123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- from __future__ import annotations
- from datetime import timedelta
- from flask_login import current_user
- from isodate import datetime_isoformat
- from marshmallow import fields, post_load, validates_schema, ValidationError
- from marshmallow.validate import OneOf, Length
- from marshmallow_polyfield import PolyField
- from timely_beliefs import BeliefsDataFrame
- import pandas as pd
- from flexmeasures.data import ma
- from flexmeasures.data.models.time_series import Sensor
- from flexmeasures.api.common.schemas.sensors import SensorField
- from flexmeasures.api.common.utils.api_utils import upsample_values
- from flexmeasures.data.models.planning.utils import initialize_index
- from flexmeasures.data.schemas import AwareDateTimeField, DurationField, SourceIdField
- from flexmeasures.data.services.data_sources import get_or_create_source
- from flexmeasures.data.services.time_series import simplify_index
- from flexmeasures.utils.time_utils import (
- decide_resolution,
- duration_isoformat,
- server_now,
- )
- from flexmeasures.utils.unit_utils import (
- convert_units,
- units_are_convertible,
- is_energy_price_unit,
- )
class SingleValueField(fields.Float):
    """A float field whose de-serialized and serialized form is a singleton list.

    Both directions validate the scalar and wrap it in a list of length 1,
    so downstream code can always assume a list of floats.
    """

    def _deserialize(self, value, attr, obj, **kwargs) -> list[float]:
        # Validate the incoming scalar, then wrap it.
        validated = self._validated(value)
        return [validated]

    def _serialize(self, value, attr, data, **kwargs) -> list[float]:
        # Serialization mirrors deserialization: always a one-element list.
        validated = self._validated(value)
        return [validated]
def select_schema_to_ensure_list_of_floats(
    values: list[float] | float, _
) -> fields.List | SingleValueField:
    """Pick a (de)serialization schema that accepts a single float or a list of floats.

    Either way, de-serialization yields a list of floats. This improves user
    experience by not requiring a list around a single item, so that:

        {"values": [3.7]}

    may also be written as:

        {"values": 3.7}

    Both de-serialize to [3.7].

    Serialization always produces a list of floats, so clients retrieving data
    are not required to handle both shapes.
    """
    # Non-list input gets the single-value field, which wraps it in a list.
    if not isinstance(values, list):
        return SingleValueField()
    # List input must contain at least one (possibly null) float.
    return fields.List(fields.Float(allow_none=True), validate=Length(min=1))
class SensorDataDescriptionSchema(ma.Schema):
    """
    Schema describing sensor data (specifically, the sensor and the timing of the data).
    """

    sensor = SensorField(required=True, entity_type="sensor", fm_scheme="fm1")
    start = AwareDateTimeField(required=True, format="iso")
    duration = DurationField(required=True)
    horizon = DurationField(required=False)
    prior = AwareDateTimeField(required=False, format="iso")
    unit = fields.Str(required=True)

    @validates_schema
    def check_schema_unit_against_sensor_unit(self, data, **kwargs):
        """Allows units compatible with that of the sensor.

        For example, a sensor with W units allows data to be posted with units:
        - W, kW, MW, etc. (i.e. units with different prefixes)
        - J/s, Nm/s, etc. (i.e. units that can be converted using some multiplier)
        - Wh, kWh, etc. (i.e. units that represent a stock delta, which knowing the duration can be converted to a flow)

        For compatible units, the SensorDataSchema converts values to the sensor's unit.
        """
        posted_unit = data["unit"]
        required_unit = data["sensor"].unit
        # An exact match is always acceptable.
        if posted_unit == required_unit:
            return
        # Otherwise the posted unit must at least be convertible to the sensor's unit.
        if units_are_convertible(posted_unit, required_unit):
            return
        raise ValidationError(
            f"Required unit for this sensor is {data['sensor'].unit}, got incompatible unit: {data['unit']}"
        )
class GetSensorDataSchema(SensorDataDescriptionSchema):
    """Schema for GET sensor-data requests: the data description plus
    optional resolution, source and message-type fields."""

    # Optional resampling frequency for the returned values
    resolution = DurationField(required=False)
    # Optionally restrict results to a single data source
    source = SourceIdField(required=False)

    # Optional field that can be used for extra validation
    type = fields.Str(
        required=False,
        validate=OneOf(
            [
                "GetSensorDataRequest",
                "GetMeterDataRequest",
                "GetPrognosisRequest",
                "GetPriceDataRequest",
            ]
        ),
    )

    @validates_schema
    def check_schema_unit_against_type(self, data, **kwargs):
        """Validate the requested unit against the (optional) message type.

        Meter data and prognoses must use a unit convertible from MW;
        price data must use an energy price unit.
        """
        requested_unit = data["unit"]
        _type = data.get("type", None)
        if _type in (
            "GetMeterDataRequest",
            "GetPrognosisRequest",
        ) and not units_are_convertible(requested_unit, "MW"):
            raise ValidationError(
                f"The unit requested for this message type should be convertible from MW, got incompatible unit: {requested_unit}"
            )
        elif _type == "GetPriceDataRequest" and not is_energy_price_unit(
            requested_unit
        ):
            raise ValidationError(
                f"The unit requested for this message type should be convertible from an energy price unit, got incompatible unit: {requested_unit}"
            )

    @staticmethod
    def load_data_and_make_response(sensor_data_description: dict) -> dict:
        """Turn the de-serialized and validated data description into a response.

        Specifically, this function:
        - queries data according to the given description
        - converts to a single deterministic belief per event
        - ensures the response respects the requested time frame
        - converts values to the requested unit
        - converts values to the requested resolution
        """
        sensor: Sensor = sensor_data_description["sensor"]
        start = sensor_data_description["start"]
        duration = sensor_data_description["duration"]
        end = sensor_data_description["start"] + duration
        unit = sensor_data_description["unit"]
        resolution = sensor_data_description.get("resolution")
        source = sensor_data_description.get("source")

        # Post-load configuration of event frequency
        if resolution is None:
            if sensor.event_resolution != timedelta(hours=0):
                # Default to the sensor's own resolution when none was requested
                resolution = sensor.event_resolution
            else:
                # For instantaneous sensors, choose a default resolution given the requested time window
                resolution = decide_resolution(start, end)

        # Post-load configuration of belief timing against message type
        horizons_at_least = sensor_data_description.get("horizon", None)
        horizons_at_most = None
        _type = sensor_data_description.get("type", None)
        if _type == "GetMeterDataRequest":
            # Meter data: only beliefs with non-positive horizons (i.e. measurements)
            horizons_at_most = timedelta(0)
        elif _type == "GetPrognosisRequest":
            if horizons_at_least is None:
                horizons_at_least = timedelta(0)
            else:
                # If the horizon field is used, ensure we still respect the minimum horizon for prognoses
                horizons_at_least = max(horizons_at_least, timedelta(0))

        # Query beliefs and collapse to one deterministic value per event
        df = simplify_index(
            sensor.search_beliefs(
                event_starts_after=start,
                event_ends_before=end,
                horizons_at_least=horizons_at_least,
                horizons_at_most=horizons_at_most,
                source=source,
                beliefs_before=sensor_data_description.get("prior", None),
                one_deterministic_belief_per_event=True,
                resolution=resolution,
                as_json=False,
            )
        )

        # Convert to desired time range (reindexing fills missing events with NaN)
        index = initialize_index(start=start, end=end, resolution=resolution)
        df = df.reindex(index)

        # Convert to desired unit
        values: pd.Series = convert_units(  # type: ignore
            df["event_value"],
            from_unit=sensor.unit,
            to_unit=unit,
        )

        # Convert NaN to None, which JSON dumps as null values
        values = values.astype(object).where(pd.notnull(values), None)

        # Form the response
        response = dict(
            values=values.tolist(),
            start=datetime_isoformat(start),
            duration=duration_isoformat(duration),
            unit=unit,
            resolution=duration_isoformat(df.event_resolution),
        )
        return response
class PostSensorDataSchema(SensorDataDescriptionSchema):
    """
    This schema includes data, so it can be used for POST requests
    or GET responses.

    TODO: For the GET use case, look at api/common/validators.py::get_data_downsampling_allowed
    (sets a resolution parameter which we can pass to the data collection function).
    """

    # Optional field that can be used for extra validation
    type = fields.Str(
        required=False,
        validate=OneOf(
            [
                "PostSensorDataRequest",
                "PostMeterDataRequest",
                "PostPrognosisRequest",
                "PostPriceDataRequest",
                "PostWeatherDataRequest",
            ]
        ),
    )
    # Accepts a single float or a list of floats; always de-serializes to a list
    values = PolyField(
        deserialization_schema_selector=select_schema_to_ensure_list_of_floats,
        serialization_schema_selector=select_schema_to_ensure_list_of_floats,
        many=False,
    )

    @validates_schema
    def check_schema_unit_against_type(self, data, **kwargs):
        """Validate the posted unit against the (optional) message type.

        Meter data and prognoses must use a unit convertible to MW;
        price data must use an energy price unit.
        """
        posted_unit = data["unit"]
        _type = data.get("type", None)
        if _type in (
            "PostMeterDataRequest",
            "PostPrognosisRequest",
        ) and not units_are_convertible(posted_unit, "MW"):
            raise ValidationError(
                f"The unit required for this message type should be convertible to MW, got incompatible unit: {posted_unit}"
            )
        elif _type == "PostPriceDataRequest" and not is_energy_price_unit(posted_unit):
            raise ValidationError(
                f"The unit required for this message type should be convertible to an energy price unit, got incompatible unit: {posted_unit}"
            )

    @validates_schema
    def check_resolution_compatibility_of_sensor_data(self, data, **kwargs):
        """Ensure event frequency is compatible with the sensor's event resolution.

        For a sensor recording instantaneous values, any event frequency is compatible.
        For a sensor recording non-instantaneous values, the event frequency must fit the sensor's event resolution.
        Currently, only upsampling is supported (e.g. converting hourly events to 15-minute events).
        """
        required_resolution = data["sensor"].event_resolution
        if required_resolution == timedelta(hours=0):
            # For instantaneous sensors, any event frequency is compatible
            return

        # The event frequency is inferred by assuming sequential, equidistant values within a time interval.
        # The event resolution is assumed to be equal to the event frequency.
        inferred_resolution = data["duration"] / len(data["values"])
        # Only exact integer multiples of the sensor resolution can be upsampled later
        if inferred_resolution % required_resolution != timedelta(hours=0):
            raise ValidationError(
                f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
            )

    @validates_schema
    def check_multiple_instantenous_values(self, data, **kwargs):
        """Ensure that we are not getting multiple instantaneous values that overlap.

        That is, two values spanning the same moment (a zero duration).
        """
        # NOTE(review): method name has a typo ("instantenous"); harmless since
        # marshmallow registers it via the decorator, but worth renaming upstream.
        if len(data["values"]) > 1 and data["duration"] / len(
            data["values"]
        ) == timedelta(0):
            raise ValidationError(
                "Cannot save multiple instantaneous values that overlap. That is, two values spanning the same moment (a zero duration). Try sending a single value or definining a non-zero duration."
            )

    @post_load()
    def post_load_sequence(self, data: dict, **kwargs) -> BeliefsDataFrame:
        """If needed, upsample and convert units, then deserialize to a BeliefsDataFrame."""
        data = self.possibly_upsample_values(data)
        data = self.possibly_convert_units(data)
        bdf = self.load_bdf(data)

        # Post-load validation against message type
        _type = data.get("type", None)
        if _type == "PostMeterDataRequest":
            # Meter data must have non-positive belief horizons (recorded after the fact)
            if any(h > timedelta(0) for h in bdf.belief_horizons):
                raise ValidationError("Meter data must lie in the past.")
        elif _type == "PostPrognosisRequest":
            # Prognoses must have non-negative belief horizons (recorded in advance)
            if any(h < timedelta(0) for h in bdf.belief_horizons):
                raise ValidationError("Prognoses must lie in the future.")

        return bdf

    @staticmethod
    def possibly_convert_units(data):
        """
        Convert values if needed, to fit the sensor's unit.
        Marshmallow runs this after validation.
        """
        # Conversion is a no-op when the posted unit already matches the sensor's;
        # the event resolution is needed for stock-delta <-> flow conversions.
        data["values"] = convert_units(
            data["values"],
            from_unit=data["unit"],
            to_unit=data["sensor"].unit,
            event_resolution=data["sensor"].event_resolution,
        )
        return data

    @staticmethod
    def possibly_upsample_values(data):
        """
        Upsample the data if needed, to fit to the sensor's resolution.
        Marshmallow runs this after validation.
        """
        required_resolution = data["sensor"].event_resolution

        if required_resolution == timedelta(hours=0):
            # For instantaneous sensors, no need to upsample
            return data

        # The event frequency is inferred by assuming sequential, equidistant values within a time interval.
        # The event resolution is assumed to be equal to the event frequency.
        inferred_resolution = data["duration"] / len(data["values"])

        # we already know resolutions are compatible (see validation)
        if inferred_resolution != required_resolution:
            data["values"] = upsample_values(
                data["values"],
                from_resolution=inferred_resolution,
                to_resolution=required_resolution,
            )
        return data

    @staticmethod
    def load_bdf(sensor_data: dict) -> BeliefsDataFrame:
        """
        Turn the de-serialized and validated data into a BeliefsDataFrame.

        The data source is derived from the current user; belief timing comes
        from the "prior" or "horizon" field, defaulting to the server's now.
        NaN values are dropped from the resulting frame.
        """
        source = get_or_create_source(current_user)
        num_values = len(sensor_data["values"])
        event_resolution = sensor_data["duration"] / num_values
        start = sensor_data["start"]
        sensor = sensor_data["sensor"]
        # Snap the start to the sensor's frequency attribute, if it has one
        if frequency := sensor.get_attribute("frequency"):
            start = pd.Timestamp(start).round(frequency)
        if event_resolution == timedelta(hours=0):
            # NOTE(review): without freq, pd.date_range defaults to daily spacing
            # for num_values > 1; earlier validation ensures instantaneous data
            # has a single value here, but confirm that assumption holds.
            dt_index = pd.date_range(
                start,
                periods=num_values,
            )
        else:
            dt_index = pd.date_range(
                start,
                periods=num_values,
                freq=event_resolution,
            )
        s = pd.Series(sensor_data["values"], index=dt_index)

        # Work out what the recording time should be
        belief_timing = {}
        if "prior" in sensor_data:
            belief_timing["belief_time"] = sensor_data["prior"]
        elif "horizon" in sensor_data:
            belief_timing["belief_horizon"] = sensor_data["horizon"]
        else:
            belief_timing["belief_time"] = server_now()
        return BeliefsDataFrame(
            s,
            source=source,
            sensor=sensor_data["sensor"],
            **belief_timing,
        ).dropna()
|