123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- from __future__ import annotations
- import json
- from marshmallow import validates, ValidationError, fields, validates_schema
- from flask_security import current_user
- from sqlalchemy import select
- from flexmeasures.data import ma, db
- from flexmeasures.data.models.user import Account
- from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
- from flexmeasures.data.schemas.locations import LatitudeField, LongitudeField
- from flexmeasures.data.schemas.utils import (
- FMValidationError,
- MarshmallowClickMixin,
- with_appcontext_if_needed,
- )
- from flexmeasures.auth.policy import user_has_admin_access
- from flexmeasures.cli import is_running as running_as_cli
- class JSON(fields.Field):
- def _deserialize(self, value, attr, data, **kwargs) -> dict:
- try:
- return json.loads(value)
- except ValueError:
- raise ValidationError("Not a valid JSON string.")
- def _serialize(self, value, attr, data, **kwargs) -> str:
- return json.dumps(value)
- class SensorsToShowSchema(fields.Field):
- """
- Schema for validating and deserializing the `sensors_to_show` attribute of a GenericAsset.
- The `sensors_to_show` attribute defines which sensors should be displayed for a particular asset.
- It supports various input formats, which are standardized into a list of dictionaries, each containing
- a `title` (optional) and a `sensors` list. The valid input formats include:
- - A single sensor ID (int): `42` -> `{"title": None, "sensors": [42]}`
- - A list of sensor IDs (list of ints): `[42, 43]` -> `{"title": None, "sensors": [42, 43]}`
- - A dictionary with a title and sensor: `{"title": "Temperature", "sensor": 42}` -> `{"title": "Temperature", "sensors": [42]}`
- - A dictionary with a title and sensors: `{"title": "Pressure", "sensors": [42, 43]}`
- Validation ensures that:
- - The input is either a list, integer, or dictionary.
- - If the input is a dictionary, it must contain either `sensor` (int) or `sensors` (list of ints).
- - All sensor IDs must be valid integers.
- Example Input:
- - `[{"title": "Test", "sensors": [1, 2]}, {"title": None, "sensors": [3, 4]}, 5]`
- Example Output (Standardized):
- - `[{"title": "Test", "sensors": [1, 2]}, {"title": None, "sensors": [3, 4]}, {"title": None, "sensors": [5]}]`
- """
- def deserialize(self, value, **kwargs) -> list:
- """
- Validate and deserialize the input value.
- """
- try:
- # Parse JSON if input is a string
- if isinstance(value, str):
- value = json.loads(value)
- # Ensure value is a list
- if not isinstance(value, list):
- raise ValidationError("sensors_to_show should be a list.")
- # Standardize each item in the list
- return [self._standardize_item(item) for item in value]
- except json.JSONDecodeError:
- raise ValidationError("Invalid JSON string.")
- def _standardize_item(self, item) -> dict:
- """
- Standardize different input formats to a consistent dictionary format.
- """
- if isinstance(item, int):
- return {"title": None, "sensors": [item]}
- elif isinstance(item, list):
- if not all(isinstance(sensor_id, int) for sensor_id in item):
- raise ValidationError(
- "All elements in a list within 'sensors_to_show' must be integers."
- )
- return {"title": None, "sensors": item}
- elif isinstance(item, dict):
- if "title" not in item:
- raise ValidationError("Dictionary must contain a 'title' key.")
- else:
- title = item["title"]
- if not isinstance(title, str) and title is not None:
- raise ValidationError("'title' value must be a string.")
- if "sensor" in item:
- sensor = item["sensor"]
- if not isinstance(sensor, int):
- raise ValidationError("'sensor' value must be an integer.")
- return {"title": title, "sensors": [sensor]}
- elif "sensors" in item:
- sensors = item["sensors"]
- if not isinstance(sensors, list) or not all(
- isinstance(sensor_id, int) for sensor_id in sensors
- ):
- raise ValidationError("'sensors' value must be a list of integers.")
- return {"title": title, "sensors": sensors}
- else:
- raise ValidationError(
- "Dictionary must contain either 'sensor' or 'sensors' key."
- )
- else:
- raise ValidationError(
- "Invalid item type in 'sensors_to_show'. Expected int, list, or dict."
- )
- @classmethod
- def flatten(cls, nested_list) -> list[int]:
- """
- Flatten a nested list of sensors or sensor dictionaries into a unique list of sensor IDs.
- This method processes the following formats, for each of the entries of the nested list:
- - A list of sensor IDs: `[1, 2, 3]`
- - A list of dictionaries where each dictionary contains a `sensors` list or a `sensor` key:
- `[{"title": "Temperature", "sensors": [1, 2]}, {"title": "Pressure", "sensor": 3}]`
- - Mixed formats: `[{"title": "Temperature", "sensors": [1, 2]}, {"title": "Pressure", "sensor": 3}, 4, 5, 1]`
- It extracts all sensor IDs, removes duplicates, and returns a flattened list of unique sensor IDs.
- Args:
- nested_list (list): A list containing sensor IDs, or dictionaries with `sensors` or `sensor` keys.
- Returns:
- list: A unique list of sensor IDs.
- """
- all_objects = []
- for s in nested_list:
- if isinstance(s, list):
- all_objects.extend(s)
- elif isinstance(s, dict):
- if "sensors" in s:
- all_objects.extend(s["sensors"])
- if "sensor" in s:
- all_objects.append(s["sensor"])
- else:
- all_objects.append(s)
- return list(dict.fromkeys(all_objects).keys())
- class GenericAssetSchema(ma.SQLAlchemySchema):
- """
- GenericAsset schema, with validations.
- """
- id = ma.auto_field(dump_only=True)
- name = fields.Str(required=True)
- account_id = ma.auto_field()
- owner = ma.Nested("AccountSchema", dump_only=True, only=("id", "name"))
- latitude = LatitudeField(allow_none=True)
- longitude = LongitudeField(allow_none=True)
- generic_asset_type_id = fields.Integer(required=True)
- generic_asset_type = ma.Nested(
- "GenericAssetTypeSchema", dump_only=True, only=("id", "name")
- )
- attributes = JSON(required=False)
- parent_asset_id = fields.Int(required=False, allow_none=True)
- child_assets = ma.Nested(
- "GenericAssetSchema",
- many=True,
- dump_only=True,
- only=("id", "name", "account_id", "generic_asset_type"),
- )
- sensors = ma.Nested("SensorSchema", many=True, dump_only=True, only=("id", "name"))
- sensors_to_show = JSON(required=False)
- flex_context = JSON(required=False)
- class Meta:
- model = GenericAsset
- @validates_schema(skip_on_field_errors=False)
- def validate_name_is_unique_under_parent(self, data, **kwargs):
- if "name" in data:
- asset = db.session.scalars(
- select(GenericAsset)
- .filter_by(
- name=data["name"],
- parent_asset_id=data.get("parent_asset_id"),
- account_id=data.get("account_id"),
- )
- .limit(1)
- ).first()
- if asset:
- raise ValidationError(
- f"An asset with the name '{data['name']}' already exists under parent asset with id={data.get('parent_asset_id')}.",
- "name",
- )
- @validates("generic_asset_type_id")
- def validate_generic_asset_type(self, generic_asset_type_id: int):
- generic_asset_type = db.session.get(GenericAssetType, generic_asset_type_id)
- if not generic_asset_type:
- raise ValidationError(
- f"GenericAssetType with id {generic_asset_type_id} doesn't exist."
- )
- @validates("parent_asset_id")
- def validate_parent_asset(self, parent_asset_id: int | None):
- if parent_asset_id is not None:
- parent_asset = db.session.get(GenericAsset, parent_asset_id)
- if not parent_asset:
- raise ValidationError(
- f"Parent GenericAsset with id {parent_asset_id} doesn't exist."
- )
- @validates("account_id")
- def validate_account(self, account_id: int | None):
- if account_id is None and (
- running_as_cli() or user_has_admin_access(current_user, "update")
- ):
- return
- account = db.session.get(Account, account_id)
- if not account:
- raise ValidationError(f"Account with Id {account_id} doesn't exist.")
- if not running_as_cli() and (
- not user_has_admin_access(current_user, "update")
- and account_id != current_user.account_id
- ):
- raise ValidationError(
- "User is not allowed to create assets for this account."
- )
- @validates("attributes")
- def validate_attributes(self, attributes: dict):
- sensors_to_show = attributes.get("sensors_to_show", [])
- if sensors_to_show:
- # Use SensorsToShowSchema to validate and deserialize sensors_to_show
- sensors_to_show_schema = SensorsToShowSchema()
- standardized_sensors = sensors_to_show_schema.deserialize(sensors_to_show)
- unique_sensor_ids = SensorsToShowSchema.flatten(standardized_sensors)
- # Check whether IDs represent accessible sensors
- from flexmeasures.data.schemas import SensorIdField
- for sensor_id in unique_sensor_ids:
- SensorIdField().deserialize(sensor_id)
- class GenericAssetTypeSchema(ma.SQLAlchemySchema):
- """
- GenericAssetType schema, with validations.
- """
- id = ma.auto_field()
- name = fields.Str()
- description = ma.auto_field()
- class Meta:
- model = GenericAssetType
- class GenericAssetIdField(MarshmallowClickMixin, fields.Int):
- """Field that deserializes to a GenericAsset and serializes back to an integer."""
- @with_appcontext_if_needed()
- def _deserialize(self, value, attr, obj, **kwargs) -> GenericAsset:
- """Turn a generic asset id into a GenericAsset."""
- generic_asset = db.session.get(GenericAsset, value)
- if generic_asset is None:
- raise FMValidationError(f"No asset found with id {value}.")
- # lazy loading now (asset is somehow not in session after this)
- generic_asset.generic_asset_type
- return generic_asset
- def _serialize(self, asset, attr, data, **kwargs):
- """Turn a GenericAsset into a generic asset id."""
- return asset.id
|