sensors.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. from __future__ import annotations
  2. import numbers
  3. from pytz.exceptions import UnknownTimeZoneError
  4. from flask import current_app
  5. from marshmallow import (
  6. Schema,
  7. fields,
  8. validate,
  9. validates,
  10. ValidationError,
  11. validates_schema,
  12. )
  13. from marshmallow.validate import Validator
  14. import json
  15. import re
  16. import isodate
  17. import pandas as pd
  18. from flexmeasures.data import ma, db
  19. from flexmeasures.data.models.generic_assets import GenericAsset
  20. from flexmeasures.data.models.time_series import Sensor
  21. from flexmeasures.data.schemas.utils import (
  22. FMValidationError,
  23. MarshmallowClickMixin,
  24. with_appcontext_if_needed,
  25. convert_to_quantity,
  26. )
  27. from flexmeasures.utils.unit_utils import (
  28. is_valid_unit,
  29. ur,
  30. units_are_convertible,
  31. )
  32. from flexmeasures.data.schemas.times import DurationField, AwareDateTimeField
  33. from flexmeasures.data.schemas.units import QuantityField
  34. class JSON(fields.Field):
  35. def _deserialize(self, value, attr, data, **kwargs) -> dict:
  36. try:
  37. return json.loads(value)
  38. except ValueError:
  39. raise ValidationError("Not a valid JSON string.")
  40. def _serialize(self, value, attr, data, **kwargs) -> str:
  41. return json.dumps(value)
  42. class TimedEventSchema(Schema):
  43. value = QuantityField(
  44. required=True,
  45. to_unit="dimensionless", # placeholder, overridden in __init__
  46. default_src_unit="dimensionless", # placeholder, overridden in __init__
  47. return_magnitude=True, # placeholder, overridden in __init__
  48. )
  49. datetime = AwareDateTimeField(required=False)
  50. start = AwareDateTimeField(required=False)
  51. end = AwareDateTimeField(required=False)
  52. duration = DurationField(required=False)
  53. def __init__(
  54. self,
  55. timezone: str | None = None,
  56. value_validator: Validator | None = None,
  57. to_unit: str | None = None,
  58. default_src_unit: str | None = None,
  59. return_magnitude: bool = True,
  60. *args,
  61. **kwargs,
  62. ):
  63. """A time period (or single point) with a value.
  64. :param timezone: Optionally, set a timezone to be able to interpret nominal durations.
  65. """
  66. self.timezone = timezone
  67. self.value_validator = value_validator
  68. super().__init__(*args, **kwargs)
  69. if to_unit is not None:
  70. if to_unit.startswith("/"):
  71. if len(to_unit) < 2:
  72. raise ValueError(
  73. f"Variable `to_unit='{to_unit}'` must define a denominator."
  74. )
  75. setattr(self.fields["value"], "to_unit", to_unit)
  76. if default_src_unit is not None:
  77. setattr(self.fields["value"], "default_src_unit", default_src_unit)
  78. setattr(self.fields["value"], "return_magnitude", return_magnitude)
  79. @validates("value")
  80. def validate_value(self, _value):
  81. if self.value_validator is not None:
  82. self.value_validator(_value)
  83. @validates_schema
  84. def check_time_window(self, data: dict, **kwargs):
  85. """Checks whether a complete time interval can be derived from the timing fields.
  86. The data is updated in-place, guaranteeing that the 'start' and 'end' fields are filled out.
  87. """
  88. dt = data.get("datetime")
  89. start = data.get("start")
  90. end = data.get("end")
  91. duration = data.get("duration")
  92. if dt is not None:
  93. if any([p is not None for p in (start, end, duration)]):
  94. raise ValidationError(
  95. "If using the 'datetime' field, no 'start', 'end' or 'duration' is expected."
  96. )
  97. data["start"] = dt
  98. data["end"] = dt
  99. elif duration is not None:
  100. if self.timezone is None and isinstance(duration, isodate.Duration):
  101. raise ValidationError(
  102. "Cannot interpret nominal duration used in the 'duration' field without a known timezone."
  103. )
  104. elif all([p is None for p in (start, end)]) or all(
  105. [p is not None for p in (start, end)]
  106. ):
  107. raise ValidationError(
  108. "If using the 'duration' field, either 'start' or 'end' is expected."
  109. )
  110. if start is not None:
  111. try:
  112. grounded = DurationField.ground_from(
  113. duration, pd.Timestamp(start).tz_convert(self.timezone)
  114. )
  115. except UnknownTimeZoneError:
  116. grounded = DurationField.ground_from(duration, pd.Timestamp(start))
  117. data["start"] = start
  118. data["end"] = start + grounded
  119. else:
  120. try:
  121. grounded = DurationField.ground_from(
  122. -duration, pd.Timestamp(end).tz_convert(self.timezone)
  123. )
  124. except UnknownTimeZoneError:
  125. grounded = DurationField.ground_from(-duration, pd.Timestamp(end))
  126. data["start"] = end + grounded
  127. data["end"] = end
  128. else:
  129. if any([p is None for p in (start, end)]):
  130. raise ValidationError(
  131. "Missing field(s) to describe timing: use the 'datetime' field, "
  132. "or a combination of 2 fields of 'start', 'end' and 'duration'."
  133. )
  134. data["start"] = start
  135. data["end"] = end
  136. class SensorSchemaMixin(Schema):
  137. """
  138. Base sensor schema.
  139. Here we include all fields which are implemented by timely_beliefs.SensorDBMixin
  140. All classes inheriting from timely beliefs sensor don't need to repeat these.
  141. In a while, this schema can represent our unified Sensor class.
  142. When subclassing, also subclass from `ma.SQLAlchemySchema` and add your own DB model class, e.g.:
  143. class Meta:
  144. model = Asset
  145. """
  146. id = ma.auto_field(dump_only=True)
  147. name = ma.auto_field(required=True)
  148. unit = ma.auto_field(required=True)
  149. timezone = ma.auto_field()
  150. event_resolution = DurationField(required=True)
  151. entity_address = fields.String(dump_only=True)
  152. attributes = JSON(required=False)
  153. @validates("unit")
  154. def validate_unit(self, unit: str):
  155. if not is_valid_unit(unit):
  156. raise ValidationError(f"Unit '{unit}' cannot be handled.")
  157. class SensorSchema(SensorSchemaMixin, ma.SQLAlchemySchema):
  158. """
  159. Sensor schema, with validations.
  160. """
  161. generic_asset_id = fields.Integer(required=True)
  162. @validates("generic_asset_id")
  163. def validate_generic_asset(self, generic_asset_id: int):
  164. generic_asset = db.session.get(GenericAsset, generic_asset_id)
  165. if not generic_asset:
  166. raise ValidationError(
  167. f"Generic asset with id {generic_asset_id} doesn't exist."
  168. )
  169. # Add it to context to use it for AssetAuditLog record
  170. self.context["generic_asset"] = generic_asset
  171. class Meta:
  172. model = Sensor
  173. class SensorIdField(MarshmallowClickMixin, fields.Int):
  174. """Field that deserializes to a Sensor and serializes back to an integer."""
  175. def __init__(self, *args, unit: str | ur.Quantity | None = None, **kwargs):
  176. super().__init__(*args, **kwargs)
  177. if isinstance(unit, str):
  178. self.to_unit = ur.Quantity(unit)
  179. elif isinstance(unit, ur.Quantity):
  180. self.to_unit = unit
  181. else:
  182. self.to_unit = None
  183. @with_appcontext_if_needed()
  184. def _deserialize(self, value: int, attr, obj, **kwargs) -> Sensor:
  185. """Turn a sensor id into a Sensor."""
  186. if not isinstance(value, int) and not isinstance(value, str):
  187. raise FMValidationError(
  188. f"Sensor ID has the wrong type. Got `{type(value).__name__}` but `int` was expected."
  189. )
  190. sensor = db.session.get(Sensor, value)
  191. if sensor is None:
  192. raise FMValidationError(f"No sensor found with id {value}.")
  193. # lazy loading now (sensor is somehow not in session after this)
  194. sensor.generic_asset
  195. sensor.generic_asset.generic_asset_type
  196. # if the units are defined, check if the sensor data is convertible to the target units
  197. if self.to_unit is not None and not units_are_convertible(
  198. sensor.unit, str(self.to_unit.units)
  199. ):
  200. raise FMValidationError(
  201. f"Cannot convert {sensor.unit} to {self.to_unit.units}"
  202. )
  203. return sensor
  204. def _serialize(self, sensor: Sensor, attr, data, **kwargs) -> int:
  205. """Turn a Sensor into a sensor id."""
  206. return sensor.id
  207. class VariableQuantityField(MarshmallowClickMixin, fields.Field):
  208. def __init__(
  209. self,
  210. to_unit,
  211. *args,
  212. default_src_unit: str | None = None,
  213. return_magnitude: bool = False,
  214. timezone: str | None = None,
  215. value_validator: Validator | None = None,
  216. **kwargs,
  217. ):
  218. """Field for validating, serializing and deserializing a variable quantity.
  219. A variable quantity can be represented by a sensor, time series or fixed quantity.
  220. # todo: Sensor should perhaps deserialize already to sensor data
  221. NB any value validators passed are only applied to Quantities.
  222. For example, value_validator=validate.Range(min=0) will raise a ValidationError in case of negative quantities,
  223. but will let pass any sensor that has recorded negative values.
  224. :param to_unit: Unit to which the sensor, time series or quantity should be convertible.
  225. - Sensors are checked for convertibility, but the original sensor is returned,
  226. so its values are not yet converted.
  227. - Time series and quantities are already converted to the given unit.
  228. - Units starting with '/' (e.g. '/MWh') lead to accepting any value, which will be
  229. converted to the given unit. For example,
  230. a quantity of 1 EUR/kWh with to_unit='/MWh' is deserialized to 1000 EUR/MWh.
  231. :param default_src_unit: What unit to use in case of getting a numeric value.
  232. Does not apply to time series or sensors.
  233. In case to_unit is dimensionless, default_src_unit defaults to dimensionless;
  234. as a result, numeric values are accepted.
  235. :param return_magnitude: In case of getting a time series, whether the result should include
  236. the magnitude of each quantity, or each Quantity object itself.
  237. :param timezone: Only used in case a time series is specified and one of the *timed events*
  238. in the time series uses a nominal duration, such as "P1D".
  239. """
  240. super().__init__(*args, **kwargs)
  241. if value_validator is not None:
  242. # Insert validation into self.validators so that multiple errors can be stored.
  243. value_validator = RepurposeValidatorToIgnoreSensorsAndLists(value_validator)
  244. self.validators.insert(0, value_validator)
  245. self.timezone = timezone
  246. self.value_validator = value_validator
  247. if to_unit.startswith("/") and len(to_unit) < 2:
  248. raise ValueError(
  249. f"Variable `to_unit='{to_unit}'` must define a denominator."
  250. )
  251. self.to_unit = to_unit
  252. if default_src_unit is None and units_are_convertible(
  253. self.to_unit, "dimensionless"
  254. ):
  255. default_src_unit = "dimensionless"
  256. self.default_src_unit = default_src_unit
  257. self.return_magnitude = return_magnitude
  258. @with_appcontext_if_needed()
  259. def _deserialize(
  260. self, value: dict[str, int] | list[dict] | str, attr, obj, **kwargs
  261. ) -> Sensor | list[dict] | ur.Quantity:
  262. if isinstance(value, dict):
  263. return self._deserialize_dict(value)
  264. elif isinstance(value, list):
  265. return self._deserialize_list(value)
  266. elif isinstance(value, str):
  267. return self._deserialize_str(value)
  268. elif isinstance(value, numbers.Real) and self.default_src_unit is not None:
  269. return self._deserialize_numeric(value, attr, obj, **kwargs)
  270. else:
  271. raise FMValidationError(
  272. f"Unsupported value type. `{type(value)}` was provided but only dict, list and str are supported."
  273. )
  274. def _deserialize_dict(self, value: dict[str, int]) -> Sensor:
  275. """Deserialize a sensor reference to a Sensor."""
  276. if "sensor" not in value:
  277. raise FMValidationError("Dictionary provided but `sensor` key not found.")
  278. sensor = SensorIdField(
  279. unit=self.to_unit if not self.to_unit.startswith("/") else None
  280. ).deserialize(value["sensor"], None, None)
  281. return sensor
  282. def _deserialize_list(self, value: list[dict]) -> list[dict]:
  283. """Deserialize a time series to a list of timed events."""
  284. if self.return_magnitude is True:
  285. current_app.logger.warning(
  286. "Deserialized time series will include Quantity objects in the future. Set `return_magnitude=False` to trigger the new behaviour."
  287. )
  288. field = fields.List(
  289. fields.Nested(
  290. TimedEventSchema(
  291. timezone=self.timezone,
  292. value_validator=self.value_validator,
  293. to_unit=self.to_unit,
  294. default_src_unit=self.default_src_unit,
  295. return_magnitude=self.return_magnitude,
  296. )
  297. )
  298. )
  299. return field._deserialize(value, None, None)
  300. def _deserialize_str(self, value: str) -> ur.Quantity:
  301. """Deserialize a string to a Quantity."""
  302. return convert_to_quantity(value=value, to_unit=self.to_unit)
  303. def _deserialize_numeric(
  304. self, value: numbers.Real, attr, obj, **kwargs
  305. ) -> ur.Quantity:
  306. """Try to deserialize a numeric value to a Quantity, using the default_src_unit."""
  307. return self._deserialize(
  308. f"{value} {self.default_src_unit}", attr, obj, **kwargs
  309. )
  310. def _serialize(
  311. self, value: Sensor | pd.Series | ur.Quantity, attr, data, **kwargs
  312. ) -> str | dict[str, int]:
  313. if isinstance(value, Sensor):
  314. return dict(sensor=value.id)
  315. elif isinstance(value, pd.Series):
  316. raise NotImplementedError(
  317. "Serialization of a time series from a Pandas Series is not implemented yet."
  318. )
  319. elif isinstance(value, ur.Quantity):
  320. return str(value.to(self.to_unit))
  321. else:
  322. raise FMValidationError(
  323. "Serialized quantity, sensor or time series needs to be of type int, float, Sensor or pandas.Series."
  324. )
  325. def convert(self, value, param, ctx, **kwargs):
  326. # case that the click default is defined in numeric values
  327. if not isinstance(value, str):
  328. return super().convert(value, param, ctx, **kwargs)
  329. _value = re.match(r"sensor:(\d+)", value)
  330. if _value is not None:
  331. _value = {"sensor": int(_value.groups()[0])}
  332. else:
  333. _value = value
  334. return super().convert(_value, param, ctx, **kwargs)
  335. def _get_unit(self, variable_quantity: ur.Quantity | list[dict | Sensor]) -> str:
  336. """Obtain the unit from the variable quantity."""
  337. if isinstance(variable_quantity, ur.Quantity):
  338. unit = str(variable_quantity.units)
  339. elif isinstance(variable_quantity, list):
  340. unit = str(variable_quantity[0]["value"].units)
  341. if not all(
  342. str(variable_quantity[j]["value"].units) == unit
  343. for j in range(len(variable_quantity))
  344. ):
  345. raise ValidationError(
  346. "Segments of a time series must share the same unit.",
  347. field_name=self.data_key,
  348. )
  349. elif isinstance(variable_quantity, Sensor):
  350. unit = variable_quantity.unit
  351. else:
  352. raise NotImplementedError(
  353. f"Unexpected type '{type(variable_quantity)}' for variable_quantity describing '{self.data_key}': {variable_quantity}."
  354. )
  355. return unit
  356. class RepurposeValidatorToIgnoreSensorsAndLists(validate.Validator):
  357. """Validator that executes another validator (the one you initialize it with) only on non-Sensor and non-list values."""
  358. def __init__(self, original_validator, *, error: str | None = None):
  359. self.error = error
  360. self.original_validator = original_validator
  361. def __call__(self, value):
  362. if not isinstance(value, (Sensor, list)):
  363. self.original_validator(value)
  364. return value
  365. class QuantityOrSensor(VariableQuantityField):
  366. def __init__(self, *args, **kwargs):
  367. """Deprecated class. Use `VariableQuantityField` instead."""
  368. current_app.logger.warning(
  369. "Class `TimeSeriesOrSensor` is deprecated. Use `VariableQuantityField` instead."
  370. )
  371. super().__init__(return_magnitude=False, *args, **kwargs)
  372. class TimeSeriesOrSensor(VariableQuantityField):
  373. def __init__(self, *args, **kwargs):
  374. """Deprecated class. Use `VariableQuantityField` instead."""
  375. current_app.logger.warning(
  376. "Class `TimeSeriesOrSensor` is deprecated. Use `VariableQuantityField` instead."
  377. )
  378. super().__init__(return_magnitude=True, *args, **kwargs)