generic_assets.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. from __future__ import annotations
  2. import json
  3. from marshmallow import validates, ValidationError, fields, validates_schema
  4. from flask_security import current_user
  5. from sqlalchemy import select
  6. from flexmeasures.data import ma, db
  7. from flexmeasures.data.models.user import Account
  8. from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
  9. from flexmeasures.data.schemas.locations import LatitudeField, LongitudeField
  10. from flexmeasures.data.schemas.utils import (
  11. FMValidationError,
  12. MarshmallowClickMixin,
  13. with_appcontext_if_needed,
  14. )
  15. from flexmeasures.auth.policy import user_has_admin_access
  16. from flexmeasures.cli import is_running as running_as_cli
  17. class JSON(fields.Field):
  18. def _deserialize(self, value, attr, data, **kwargs) -> dict:
  19. try:
  20. return json.loads(value)
  21. except ValueError:
  22. raise ValidationError("Not a valid JSON string.")
  23. def _serialize(self, value, attr, data, **kwargs) -> str:
  24. return json.dumps(value)
  25. class SensorsToShowSchema(fields.Field):
  26. """
  27. Schema for validating and deserializing the `sensors_to_show` attribute of a GenericAsset.
  28. The `sensors_to_show` attribute defines which sensors should be displayed for a particular asset.
  29. It supports various input formats, which are standardized into a list of dictionaries, each containing
  30. a `title` (optional) and a `sensors` list. The valid input formats include:
  31. - A single sensor ID (int): `42` -> `{"title": None, "sensors": [42]}`
  32. - A list of sensor IDs (list of ints): `[42, 43]` -> `{"title": None, "sensors": [42, 43]}`
  33. - A dictionary with a title and sensor: `{"title": "Temperature", "sensor": 42}` -> `{"title": "Temperature", "sensors": [42]}`
  34. - A dictionary with a title and sensors: `{"title": "Pressure", "sensors": [42, 43]}`
  35. Validation ensures that:
  36. - The input is either a list, integer, or dictionary.
  37. - If the input is a dictionary, it must contain either `sensor` (int) or `sensors` (list of ints).
  38. - All sensor IDs must be valid integers.
  39. Example Input:
  40. - `[{"title": "Test", "sensors": [1, 2]}, {"title": None, "sensors": [3, 4]}, 5]`
  41. Example Output (Standardized):
  42. - `[{"title": "Test", "sensors": [1, 2]}, {"title": None, "sensors": [3, 4]}, {"title": None, "sensors": [5]}]`
  43. """
  44. def deserialize(self, value, **kwargs) -> list:
  45. """
  46. Validate and deserialize the input value.
  47. """
  48. try:
  49. # Parse JSON if input is a string
  50. if isinstance(value, str):
  51. value = json.loads(value)
  52. # Ensure value is a list
  53. if not isinstance(value, list):
  54. raise ValidationError("sensors_to_show should be a list.")
  55. # Standardize each item in the list
  56. return [self._standardize_item(item) for item in value]
  57. except json.JSONDecodeError:
  58. raise ValidationError("Invalid JSON string.")
  59. def _standardize_item(self, item) -> dict:
  60. """
  61. Standardize different input formats to a consistent dictionary format.
  62. """
  63. if isinstance(item, int):
  64. return {"title": None, "sensors": [item]}
  65. elif isinstance(item, list):
  66. if not all(isinstance(sensor_id, int) for sensor_id in item):
  67. raise ValidationError(
  68. "All elements in a list within 'sensors_to_show' must be integers."
  69. )
  70. return {"title": None, "sensors": item}
  71. elif isinstance(item, dict):
  72. if "title" not in item:
  73. raise ValidationError("Dictionary must contain a 'title' key.")
  74. else:
  75. title = item["title"]
  76. if not isinstance(title, str) and title is not None:
  77. raise ValidationError("'title' value must be a string.")
  78. if "sensor" in item:
  79. sensor = item["sensor"]
  80. if not isinstance(sensor, int):
  81. raise ValidationError("'sensor' value must be an integer.")
  82. return {"title": title, "sensors": [sensor]}
  83. elif "sensors" in item:
  84. sensors = item["sensors"]
  85. if not isinstance(sensors, list) or not all(
  86. isinstance(sensor_id, int) for sensor_id in sensors
  87. ):
  88. raise ValidationError("'sensors' value must be a list of integers.")
  89. return {"title": title, "sensors": sensors}
  90. else:
  91. raise ValidationError(
  92. "Dictionary must contain either 'sensor' or 'sensors' key."
  93. )
  94. else:
  95. raise ValidationError(
  96. "Invalid item type in 'sensors_to_show'. Expected int, list, or dict."
  97. )
  98. @classmethod
  99. def flatten(cls, nested_list) -> list[int]:
  100. """
  101. Flatten a nested list of sensors or sensor dictionaries into a unique list of sensor IDs.
  102. This method processes the following formats, for each of the entries of the nested list:
  103. - A list of sensor IDs: `[1, 2, 3]`
  104. - A list of dictionaries where each dictionary contains a `sensors` list or a `sensor` key:
  105. `[{"title": "Temperature", "sensors": [1, 2]}, {"title": "Pressure", "sensor": 3}]`
  106. - Mixed formats: `[{"title": "Temperature", "sensors": [1, 2]}, {"title": "Pressure", "sensor": 3}, 4, 5, 1]`
  107. It extracts all sensor IDs, removes duplicates, and returns a flattened list of unique sensor IDs.
  108. Args:
  109. nested_list (list): A list containing sensor IDs, or dictionaries with `sensors` or `sensor` keys.
  110. Returns:
  111. list: A unique list of sensor IDs.
  112. """
  113. all_objects = []
  114. for s in nested_list:
  115. if isinstance(s, list):
  116. all_objects.extend(s)
  117. elif isinstance(s, dict):
  118. if "sensors" in s:
  119. all_objects.extend(s["sensors"])
  120. if "sensor" in s:
  121. all_objects.append(s["sensor"])
  122. else:
  123. all_objects.append(s)
  124. return list(dict.fromkeys(all_objects).keys())
  125. class GenericAssetSchema(ma.SQLAlchemySchema):
  126. """
  127. GenericAsset schema, with validations.
  128. """
  129. id = ma.auto_field(dump_only=True)
  130. name = fields.Str(required=True)
  131. account_id = ma.auto_field()
  132. owner = ma.Nested("AccountSchema", dump_only=True, only=("id", "name"))
  133. latitude = LatitudeField(allow_none=True)
  134. longitude = LongitudeField(allow_none=True)
  135. generic_asset_type_id = fields.Integer(required=True)
  136. generic_asset_type = ma.Nested(
  137. "GenericAssetTypeSchema", dump_only=True, only=("id", "name")
  138. )
  139. attributes = JSON(required=False)
  140. parent_asset_id = fields.Int(required=False, allow_none=True)
  141. child_assets = ma.Nested(
  142. "GenericAssetSchema",
  143. many=True,
  144. dump_only=True,
  145. only=("id", "name", "account_id", "generic_asset_type"),
  146. )
  147. sensors = ma.Nested("SensorSchema", many=True, dump_only=True, only=("id", "name"))
  148. sensors_to_show = JSON(required=False)
  149. flex_context = JSON(required=False)
  150. class Meta:
  151. model = GenericAsset
  152. @validates_schema(skip_on_field_errors=False)
  153. def validate_name_is_unique_under_parent(self, data, **kwargs):
  154. if "name" in data:
  155. asset = db.session.scalars(
  156. select(GenericAsset)
  157. .filter_by(
  158. name=data["name"],
  159. parent_asset_id=data.get("parent_asset_id"),
  160. account_id=data.get("account_id"),
  161. )
  162. .limit(1)
  163. ).first()
  164. if asset:
  165. raise ValidationError(
  166. f"An asset with the name '{data['name']}' already exists under parent asset with id={data.get('parent_asset_id')}.",
  167. "name",
  168. )
  169. @validates("generic_asset_type_id")
  170. def validate_generic_asset_type(self, generic_asset_type_id: int):
  171. generic_asset_type = db.session.get(GenericAssetType, generic_asset_type_id)
  172. if not generic_asset_type:
  173. raise ValidationError(
  174. f"GenericAssetType with id {generic_asset_type_id} doesn't exist."
  175. )
  176. @validates("parent_asset_id")
  177. def validate_parent_asset(self, parent_asset_id: int | None):
  178. if parent_asset_id is not None:
  179. parent_asset = db.session.get(GenericAsset, parent_asset_id)
  180. if not parent_asset:
  181. raise ValidationError(
  182. f"Parent GenericAsset with id {parent_asset_id} doesn't exist."
  183. )
  184. @validates("account_id")
  185. def validate_account(self, account_id: int | None):
  186. if account_id is None and (
  187. running_as_cli() or user_has_admin_access(current_user, "update")
  188. ):
  189. return
  190. account = db.session.get(Account, account_id)
  191. if not account:
  192. raise ValidationError(f"Account with Id {account_id} doesn't exist.")
  193. if not running_as_cli() and (
  194. not user_has_admin_access(current_user, "update")
  195. and account_id != current_user.account_id
  196. ):
  197. raise ValidationError(
  198. "User is not allowed to create assets for this account."
  199. )
  200. @validates("attributes")
  201. def validate_attributes(self, attributes: dict):
  202. sensors_to_show = attributes.get("sensors_to_show", [])
  203. if sensors_to_show:
  204. # Use SensorsToShowSchema to validate and deserialize sensors_to_show
  205. sensors_to_show_schema = SensorsToShowSchema()
  206. standardized_sensors = sensors_to_show_schema.deserialize(sensors_to_show)
  207. unique_sensor_ids = SensorsToShowSchema.flatten(standardized_sensors)
  208. # Check whether IDs represent accessible sensors
  209. from flexmeasures.data.schemas import SensorIdField
  210. for sensor_id in unique_sensor_ids:
  211. SensorIdField().deserialize(sensor_id)
class GenericAssetTypeSchema(ma.SQLAlchemySchema):
    """
    GenericAssetType schema, with validations.

    Declares the `id`, `name` and `description` fields of a GenericAssetType.
    No custom validator methods are defined in this class itself.
    """

    # `id` and `description` are derived from the model columns via auto_field;
    # `name` is declared explicitly as a string field.
    id = ma.auto_field()
    name = fields.Str()
    description = ma.auto_field()

    class Meta:
        # Model from which the auto fields above are derived.
        model = GenericAssetType
  221. class GenericAssetIdField(MarshmallowClickMixin, fields.Int):
  222. """Field that deserializes to a GenericAsset and serializes back to an integer."""
  223. @with_appcontext_if_needed()
  224. def _deserialize(self, value, attr, obj, **kwargs) -> GenericAsset:
  225. """Turn a generic asset id into a GenericAsset."""
  226. generic_asset = db.session.get(GenericAsset, value)
  227. if generic_asset is None:
  228. raise FMValidationError(f"No asset found with id {value}.")
  229. # lazy loading now (asset is somehow not in session after this)
  230. generic_asset.generic_asset_type
  231. return generic_asset
  232. def _serialize(self, asset, attr, data, **kwargs):
  233. """Turn a GenericAsset into a generic asset id."""
  234. return asset.id