sensor_data.py

from __future__ import annotations

from datetime import timedelta

from flask_login import current_user
from isodate import datetime_isoformat
from marshmallow import fields, post_load, validates_schema, ValidationError
from marshmallow.validate import OneOf, Length
from marshmallow_polyfield import PolyField
from timely_beliefs import BeliefsDataFrame
import pandas as pd

from flexmeasures.data import ma
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.api.common.schemas.sensors import SensorField
from flexmeasures.api.common.utils.api_utils import upsample_values
from flexmeasures.data.models.planning.utils import initialize_index
from flexmeasures.data.schemas import AwareDateTimeField, DurationField, SourceIdField
from flexmeasures.data.services.data_sources import get_or_create_source
from flexmeasures.data.services.time_series import simplify_index
from flexmeasures.utils.time_utils import (
    decide_resolution,
    duration_isoformat,
    server_now,
)
from flexmeasures.utils.unit_utils import (
    convert_units,
    units_are_convertible,
    is_energy_price_unit,
)


class SingleValueField(fields.Float):
    """Field that both de-serializes and serializes a single value to a list of floats (length 1)."""

    def _deserialize(self, value, attr, obj, **kwargs) -> list[float]:
        return [self._validated(value)]

    def _serialize(self, value, attr, data, **kwargs) -> list[float]:
        return [self._validated(value)]


def select_schema_to_ensure_list_of_floats(
    values: list[float] | float, _
) -> fields.List | SingleValueField:
    """Allows both a single float and a list of floats. Always returns a list of floats.

    Meant to improve user experience by not requiring a list for a single item, such that:

        {
            "values": [3.7]
        }

    can be written as:

        {
            "values": 3.7
        }

    Either will be de-serialized to [3.7].

    Note that serialization always results in a list of floats.
    This ensures that we are not requiring the same flexibility from users who are retrieving data.
    """
    if isinstance(values, list):
        return fields.List(fields.Float(allow_none=True), validate=Length(min=1))
    else:
        return SingleValueField()
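
# Illustrative sketch (not part of the original module): both request shapes
# below pass through the PolyField that uses this selector, and de-serialize
# to the same list.
#
#     {"values": 3.7}    ->  [3.7]
#     {"values": [3.7]}  ->  [3.7]
#     {"values": []}     ->  ValidationError (a posted list must be non-empty)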


class SensorDataDescriptionSchema(ma.Schema):
    """
    Schema describing sensor data (specifically, the sensor and the timing of the data).
    """

    sensor = SensorField(required=True, entity_type="sensor", fm_scheme="fm1")
    start = AwareDateTimeField(required=True, format="iso")
    duration = DurationField(required=True)
    horizon = DurationField(required=False)
    prior = AwareDateTimeField(required=False, format="iso")
    unit = fields.Str(required=True)

    @validates_schema
    def check_schema_unit_against_sensor_unit(self, data, **kwargs):
        """Allows units compatible with that of the sensor.

        For example, a sensor with W units allows data to be posted with units:
        - W, kW, MW, etc. (i.e. units with different prefixes)
        - J/s, Nm/s, etc. (i.e. units that can be converted using some multiplier)
        - Wh, kWh, etc. (i.e. units that represent a stock delta, which, knowing the duration, can be converted to a flow)

        For compatible units, the schemas below convert values to the sensor's unit.
        """
        posted_unit = data["unit"]
        required_unit = data["sensor"].unit

        if posted_unit != required_unit and not units_are_convertible(
            posted_unit, required_unit
        ):
            raise ValidationError(
                f"Required unit for this sensor is {data['sensor'].unit}, got incompatible unit: {data['unit']}"
            )
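
# Illustrative sketch (assumption, not part of the original module): the kind
# of verdicts units_are_convertible gives for a sensor recording power in W.
#
#     units_are_convertible("kW", "W")       # True: only the prefix differs
#     units_are_convertible("kWh", "W")      # True: a stock delta, convertible to a flow given the duration
#     units_are_convertible("EUR/MWh", "W")  # False: a price is not convertible to a power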


class GetSensorDataSchema(SensorDataDescriptionSchema):
    resolution = DurationField(required=False)
    source = SourceIdField(required=False)

    # Optional field that can be used for extra validation
    type = fields.Str(
        required=False,
        validate=OneOf(
            [
                "GetSensorDataRequest",
                "GetMeterDataRequest",
                "GetPrognosisRequest",
                "GetPriceDataRequest",
            ]
        ),
    )

    @validates_schema
    def check_schema_unit_against_type(self, data, **kwargs):
        requested_unit = data["unit"]
        _type = data.get("type", None)
        if _type in (
            "GetMeterDataRequest",
            "GetPrognosisRequest",
        ) and not units_are_convertible(requested_unit, "MW"):
            raise ValidationError(
                f"The unit requested for this message type should be convertible from MW, got incompatible unit: {requested_unit}"
            )
        elif _type == "GetPriceDataRequest" and not is_energy_price_unit(
            requested_unit
        ):
            raise ValidationError(
                f"The unit requested for this message type should be convertible from an energy price unit, got incompatible unit: {requested_unit}"
            )
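
    # Illustrative sketch (assumption, not part of the original module):
    # a GetMeterDataRequest may ask for "kW" (convertible from MW), while a
    # GetPriceDataRequest may ask for "EUR/MWh"; swapping the two units
    # raises a ValidationError.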

    @staticmethod
    def load_data_and_make_response(sensor_data_description: dict) -> dict:
        """Turn the de-serialized and validated data description into a response.

        Specifically, this function:
        - queries data according to the given description
        - converts to a single deterministic belief per event
        - ensures the response respects the requested time frame
        - converts values to the requested unit
        - converts values to the requested resolution
        """
        sensor: Sensor = sensor_data_description["sensor"]
        start = sensor_data_description["start"]
        duration = sensor_data_description["duration"]
        end = sensor_data_description["start"] + duration
        unit = sensor_data_description["unit"]
        resolution = sensor_data_description.get("resolution")
        source = sensor_data_description.get("source")

        # Post-load configuration of event frequency
        if resolution is None:
            if sensor.event_resolution != timedelta(hours=0):
                resolution = sensor.event_resolution
            else:
                # For instantaneous sensors, choose a default resolution given the requested time window
                resolution = decide_resolution(start, end)

        # Post-load configuration of belief timing against message type
        horizons_at_least = sensor_data_description.get("horizon", None)
        horizons_at_most = None
        _type = sensor_data_description.get("type", None)
        if _type == "GetMeterDataRequest":
            horizons_at_most = timedelta(0)
        elif _type == "GetPrognosisRequest":
            if horizons_at_least is None:
                horizons_at_least = timedelta(0)
            else:
                # If the horizon field is used, ensure we still respect the minimum horizon for prognoses
                horizons_at_least = max(horizons_at_least, timedelta(0))

        df = simplify_index(
            sensor.search_beliefs(
                event_starts_after=start,
                event_ends_before=end,
                horizons_at_least=horizons_at_least,
                horizons_at_most=horizons_at_most,
                source=source,
                beliefs_before=sensor_data_description.get("prior", None),
                one_deterministic_belief_per_event=True,
                resolution=resolution,
                as_json=False,
            )
        )

        # Convert to desired time range
        index = initialize_index(start=start, end=end, resolution=resolution)
        df = df.reindex(index)

        # Convert to desired unit
        values: pd.Series = convert_units(  # type: ignore
            df["event_value"],
            from_unit=sensor.unit,
            to_unit=unit,
        )

        # Convert NaN to None, which JSON dumps as null values
        values = values.astype(object).where(pd.notnull(values), None)

        # Form the response
        response = dict(
            values=values.tolist(),
            start=datetime_isoformat(start),
            duration=duration_isoformat(duration),
            unit=unit,
            resolution=duration_isoformat(df.event_resolution),
        )
        return response
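
# Illustrative sketch (assumption, not part of the original module): the shape
# of the response dict built by load_data_and_make_response, for a two-hour
# window on an hourly MW sensor with one missing event.
#
#     {
#         "values": [2.1, None],
#         "start": "2024-01-01T00:00:00+00:00",
#         "duration": "PT2H",
#         "unit": "MW",
#         "resolution": "PT1H",
#     }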


class PostSensorDataSchema(SensorDataDescriptionSchema):
    """
    This schema includes data, so it can be used for POST requests
    or GET responses.

    TODO: For the GET use case, look at api/common/validators.py::get_data_downsampling_allowed
          (sets a resolution parameter which we can pass to the data collection function).
    """

    # Optional field that can be used for extra validation
    type = fields.Str(
        required=False,
        validate=OneOf(
            [
                "PostSensorDataRequest",
                "PostMeterDataRequest",
                "PostPrognosisRequest",
                "PostPriceDataRequest",
                "PostWeatherDataRequest",
            ]
        ),
    )
    values = PolyField(
        deserialization_schema_selector=select_schema_to_ensure_list_of_floats,
        serialization_schema_selector=select_schema_to_ensure_list_of_floats,
        many=False,
    )
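
    # Illustrative payload (assumption, not part of the original module; the
    # sensor entity address is hypothetical) that this schema accepts.
    # "values" may also be a single float, thanks to the PolyField selector.
    #
    #     {
    #         "type": "PostSensorDataRequest",
    #         "sensor": "ea1.2021-01.io.flexmeasures:fm1.42",
    #         "start": "2024-01-01T00:00:00+01:00",
    #         "duration": "PT1H",
    #         "unit": "kW",
    #         "values": [10.0, 12.5, 11.0, 9.5],
    #     }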

    @validates_schema
    def check_schema_unit_against_type(self, data, **kwargs):
        posted_unit = data["unit"]
        _type = data.get("type", None)
        if _type in (
            "PostMeterDataRequest",
            "PostPrognosisRequest",
        ) and not units_are_convertible(posted_unit, "MW"):
            raise ValidationError(
                f"The unit required for this message type should be convertible to MW, got incompatible unit: {posted_unit}"
            )
        elif _type == "PostPriceDataRequest" and not is_energy_price_unit(posted_unit):
            raise ValidationError(
                f"The unit required for this message type should be convertible to an energy price unit, got incompatible unit: {posted_unit}"
            )

    @validates_schema
    def check_resolution_compatibility_of_sensor_data(self, data, **kwargs):
        """Ensure event frequency is compatible with the sensor's event resolution.

        For a sensor recording instantaneous values, any event frequency is compatible.
        For a sensor recording non-instantaneous values, the event frequency must fit the sensor's event resolution.
        Currently, only upsampling is supported (e.g. converting hourly events to 15-minute events).
        """
        required_resolution = data["sensor"].event_resolution

        if required_resolution == timedelta(hours=0):
            # For instantaneous sensors, any event frequency is compatible
            return

        # The event frequency is inferred by assuming sequential, equidistant values within a time interval.
        # The event resolution is assumed to be equal to the event frequency.
        inferred_resolution = data["duration"] / len(data["values"])
        if inferred_resolution % required_resolution != timedelta(hours=0):
            raise ValidationError(
                f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
            )
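
    # Worked example (illustrative, not part of the original module): 2 values
    # over PT2H infer a PT1H resolution, which fits a 15-minute sensor
    # (PT1H % PT15M == 0) and is later upsampled to 8 values. 3 values over
    # PT2H infer 40 minutes, which does not fit and raises a ValidationError.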

    @validates_schema
    def check_multiple_instantaneous_values(self, data, **kwargs):
        """Ensure that we are not getting multiple instantaneous values that overlap.

        That is, two values spanning the same moment (a zero duration).
        """
        if len(data["values"]) > 1 and data["duration"] / len(
            data["values"]
        ) == timedelta(0):
            raise ValidationError(
                "Cannot save multiple instantaneous values that overlap. That is, two values spanning the same moment (a zero duration). Try sending a single value or defining a non-zero duration."
            )

    @post_load()
    def post_load_sequence(self, data: dict, **kwargs) -> BeliefsDataFrame:
        """If needed, upsample and convert units, then deserialize to a BeliefsDataFrame."""
        data = self.possibly_upsample_values(data)
        data = self.possibly_convert_units(data)
        bdf = self.load_bdf(data)

        # Post-load validation against message type
        _type = data.get("type", None)
        if _type == "PostMeterDataRequest":
            if any(h > timedelta(0) for h in bdf.belief_horizons):
                raise ValidationError("Meter data must lie in the past.")
        elif _type == "PostPrognosisRequest":
            if any(h < timedelta(0) for h in bdf.belief_horizons):
                raise ValidationError("Prognoses must lie in the future.")
        return bdf

    @staticmethod
    def possibly_convert_units(data):
        """
        Convert values if needed, to fit the sensor's unit.
        Marshmallow runs this after validation.
        """
        data["values"] = convert_units(
            data["values"],
            from_unit=data["unit"],
            to_unit=data["sensor"].unit,
            event_resolution=data["sensor"].event_resolution,
        )
        return data
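
    # Illustrative sketch (assumption, not part of the original module): for a
    # sensor recording MW, values posted as [1000.0] in kW become [1.0] in MW;
    # Wh-like units are turned into a flow using the sensor's event resolution.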

    @staticmethod
    def possibly_upsample_values(data):
        """
        Upsample the data if needed, to fit to the sensor's resolution.
        Marshmallow runs this after validation.
        """
        required_resolution = data["sensor"].event_resolution

        if required_resolution == timedelta(hours=0):
            # For instantaneous sensors, no need to upsample
            return data

        # The event frequency is inferred by assuming sequential, equidistant values within a time interval.
        # The event resolution is assumed to be equal to the event frequency.
        inferred_resolution = data["duration"] / len(data["values"])

        # we already know resolutions are compatible (see validation)
        if inferred_resolution != required_resolution:
            data["values"] = upsample_values(
                data["values"],
                from_resolution=inferred_resolution,
                to_resolution=required_resolution,
            )
        return data
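
    # Illustrative sketch (an assumption about upsample_values, not part of the
    # original module): upsampling [1.0, 2.0] from PT1H to PT30M repeats each
    # value, yielding [1.0, 1.0, 2.0, 2.0].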

    @staticmethod
    def load_bdf(sensor_data: dict) -> BeliefsDataFrame:
        """
        Turn the de-serialized and validated data into a BeliefsDataFrame.
        """
        source = get_or_create_source(current_user)
        num_values = len(sensor_data["values"])
        event_resolution = sensor_data["duration"] / num_values
        start = sensor_data["start"]
        sensor = sensor_data["sensor"]
        if frequency := sensor.get_attribute("frequency"):
            start = pd.Timestamp(start).round(frequency)
        if event_resolution == timedelta(hours=0):
            dt_index = pd.date_range(
                start,
                periods=num_values,
            )
        else:
            dt_index = pd.date_range(
                start,
                periods=num_values,
                freq=event_resolution,
            )
        s = pd.Series(sensor_data["values"], index=dt_index)

        # Work out what the recording time should be
        belief_timing = {}
        if "prior" in sensor_data:
            belief_timing["belief_time"] = sensor_data["prior"]
        elif "horizon" in sensor_data:
            belief_timing["belief_horizon"] = sensor_data["horizon"]
        else:
            belief_timing["belief_time"] = server_now()
        return BeliefsDataFrame(
            s,
            source=source,
            sensor=sensor_data["sensor"],
            **belief_timing,
        ).dropna()
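
# Illustrative sketch (not part of the original module): how load_bdf resolves
# belief timing. A posted "prior" takes precedence over "horizon"; if neither
# is given, the server's current time is recorded.
#
#     {"prior": "2024-01-01T00:00:00+00:00", ...}  ->  belief_time = prior
#     {"horizon": "PT6H", ...}                     ->  belief_horizon = horizon
#     {...}                                        ->  belief_time = server_now()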