__init__.py 19 KB


  1. from __future__ import annotations
  2. from marshmallow import (
  3. Schema,
  4. fields,
  5. validate,
  6. validates_schema,
  7. ValidationError,
  8. pre_load,
  9. post_dump,
  10. )
  11. from flexmeasures import Sensor
  12. from flexmeasures.data.schemas.generic_assets import GenericAssetIdField
  13. from flexmeasures.data.schemas.sensors import (
  14. VariableQuantityField,
  15. SensorIdField,
  16. )
  17. from flexmeasures.data.schemas.utils import FMValidationError
  18. from flexmeasures.data.schemas.times import AwareDateTimeField, PlanningDurationField
  19. from flexmeasures.utils.flexmeasures_inflection import p
  20. from flexmeasures.utils.unit_utils import (
  21. ur,
  22. units_are_convertible,
  23. is_capacity_price_unit,
  24. is_energy_price_unit,
  25. is_power_unit,
  26. is_energy_unit,
  27. )
  class FlexContextSchema(Schema):
      """This schema defines fields that provide context to the portfolio to be optimized.

      Besides declaring the fields, the schema validates that:

      - at most one consumption price and at most one production price is given
        (either the deprecated ``*-price-sensor`` field or the ``*-price`` field);
      - the newer price/relaxation fields are not combined with the deprecated
        ``*-price-sensor`` fields;
      - all price fields share one currency (and are put on the same scale).

      After validation, ``data["shared_currency_unit"]`` holds the shared currency
      unit (or ``None`` if no price field was passed).
      """

      # Device commitments
      consumption_breach_price = VariableQuantityField(
          "/MW",
          data_key="consumption-breach-price",
          required=False,
          value_validator=validate.Range(min=0),
          default=None,
      )
      production_breach_price = VariableQuantityField(
          "/MW",
          data_key="production-breach-price",
          required=False,
          value_validator=validate.Range(min=0),
          default=None,
      )
      soc_minima_breach_price = VariableQuantityField(
          "/MWh",
          data_key="soc-minima-breach-price",
          required=False,
          value_validator=validate.Range(min=0),
          default=None,
      )
      soc_maxima_breach_price = VariableQuantityField(
          "/MWh",
          data_key="soc-maxima-breach-price",
          required=False,
          value_validator=validate.Range(min=0),
          default=None,
      )

      # Dev fields
      relax_soc_constraints = fields.Bool(
          data_key="relax-soc-constraints", load_default=False
      )
      relax_capacity_constraints = fields.Bool(
          data_key="relax-capacity-constraints", load_default=False
      )

      # Energy commitments
      ems_power_capacity_in_mw = VariableQuantityField(
          "MW",
          required=False,
          data_key="site-power-capacity",
          value_validator=validate.Range(min=0),
      )
      # todo: deprecated since flexmeasures==0.23
      consumption_price_sensor = SensorIdField(data_key="consumption-price-sensor")
      production_price_sensor = SensorIdField(data_key="production-price-sensor")
      consumption_price = VariableQuantityField(
          "/MWh",
          required=False,
          data_key="consumption-price",
          return_magnitude=False,
      )
      production_price = VariableQuantityField(
          "/MWh",
          required=False,
          data_key="production-price",
          return_magnitude=False,
      )

      # Capacity breach commitments
      ems_production_capacity_in_mw = VariableQuantityField(
          "MW",
          required=False,
          data_key="site-production-capacity",
          value_validator=validate.Range(min=0),
      )
      ems_consumption_capacity_in_mw = VariableQuantityField(
          "MW",
          required=False,
          data_key="site-consumption-capacity",
          value_validator=validate.Range(min=0),
      )
      ems_consumption_breach_price = VariableQuantityField(
          "/MW",
          data_key="site-consumption-breach-price",
          required=False,
          value_validator=validate.Range(min=0),
          default=None,
      )
      ems_production_breach_price = VariableQuantityField(
          "/MW",
          data_key="site-production-breach-price",
          required=False,
          value_validator=validate.Range(min=0),
          default=None,
      )

      # Peak consumption commitment
      ems_peak_consumption_in_mw = VariableQuantityField(
          "MW",
          required=False,
          data_key="site-peak-consumption",
          value_validator=validate.Range(min=0),
          default="0 kW",
      )
      ems_peak_consumption_price = VariableQuantityField(
          "/MW",
          data_key="site-peak-consumption-price",
          required=False,
          value_validator=validate.Range(min=0),
          default=None,
      )

      # Peak production commitment
      ems_peak_production_in_mw = VariableQuantityField(
          "MW",
          required=False,
          data_key="site-peak-production",
          value_validator=validate.Range(min=0),
          default="0 kW",
      )
      ems_peak_production_price = VariableQuantityField(
          "/MW",
          data_key="site-peak-production-price",
          required=False,
          value_validator=validate.Range(min=0),
          default=None,
      )
      # todo: group by month start (MS), something like a commitment resolution, or a list of datetimes representing splits of the commitments

      inflexible_device_sensors = fields.List(
          SensorIdField(), data_key="inflexible-device-sensors"
      )

      def set_default_breach_prices(
          self, data: dict, fields: list[str], price: ur.Quantity
      ):
          """Fill in default breach prices.

          The price is converted to the shared currency unit that
          _try_to_convert_price_units stores under ``data["shared_currency_unit"]``.
          If no shared currency is known (no price field was passed, or the
          conversion step did not run), the currency of ``price`` itself is used,
          rather than failing on a missing unit.

          :param data:   The (deserialized) flex-context; updated in place.
          :param fields: Names of the schema fields to fill in.
          :param price:  The default price to set on each of the given fields.
          :returns:      The updated data.
          """
          currency_unit = data.get("shared_currency_unit")
          for field in fields:
              # use the same denominator as defined in the field
              denominator_unit = self.declared_fields[field].to_unit.split("/")[-1]
              if currency_unit is None:
                  # Derive the currency from the default price itself, the same way
                  # _try_to_convert_price_units derives a shared currency
                  # (assumes the currency is a base unit of the registry).
                  currency_unit = str(
                      (price * ur.Quantity(f"1 {denominator_unit}"))
                      .to_base_units()
                      .units
                  )
              data[field] = price.to(f"{currency_unit}/{denominator_unit}")
          return data

      @validates_schema
      def check_prices(self, data: dict, **kwargs):
          """Check assumptions about prices.

          1. The flex-context must contain at most 1 consumption price and at most 1 production price field.
          2. All prices must share the same currency.

          Also fills in default breach prices when asked to relax constraints,
          but only for breach prices that were not passed explicitly.

          :raises ValidationError: If one of the assumptions does not hold.
          """
          # The flex-context must contain at most 1 consumption price and at most 1 production price field
          for kind in ("consumption", "production"):
              if f"{kind}_price_sensor" in data and f"{kind}_price" in data:
                  raise ValidationError(
                      f"Must pass either {kind}-price or {kind}-price-sensor."
                  )

          # New price fields can only be used after updating to the new consumption-price and production-price fields
          field_map = {
              field.data_key: field_var
              for field_var, field in self.declared_fields.items()
          }
          new_fields = (
              "soc-minima-breach-price",
              "soc-maxima-breach-price",
              "site-consumption-breach-price",
              "site-production-breach-price",
              "site-peak-consumption-price",
              "site-peak-production-price",
              "relax-soc-constraints",
              "relax-capacity-constraints",
              "consumption-breach-price",
              "production-breach-price",
          )
          if any(data.get(field_map[field]) for field in new_fields):
              for kind in ("consumption", "production"):
                  sensor_field = field_map[f"{kind}-price-sensor"]
                  if sensor_field in data:
                      sensor_id = data[sensor_field].id
                      raise ValidationError(
                          f"""Please switch to using `{kind}-price: {{"sensor": {sensor_id}}}`."""
                      )

          # make sure that the prices fields are valid price units

          # All prices must share the same unit
          data = self._try_to_convert_price_units(data)

          # Fill in default soc breach prices when asked to relax SoC constraints.
          # Using .get, because the flag may be absent (e.g. when loading partially).
          if data.get("relax_soc_constraints"):
              self.set_default_breach_prices(
                  data,
                  fields=self._fields_without_value(
                      data, ["soc_minima_breach_price", "soc_maxima_breach_price"]
                  ),
                  price=ur.Quantity("1000 EUR/kWh"),
              )

          # Fill in default capacity breach prices when asked to relax capacity constraints.
          if data.get("relax_capacity_constraints"):
              self.set_default_breach_prices(
                  data,
                  fields=self._fields_without_value(
                      data, ["consumption_breach_price", "production_breach_price"]
                  ),
                  price=ur.Quantity("100 EUR/kW"),
              )

          return data

      @staticmethod
      def _fields_without_value(data: dict, fields: list[str]) -> list[str]:
          """Select the fields that were not set explicitly, so defaults never overwrite user input."""
          return [field for field in fields if data.get(field) is None]

      def _try_to_convert_price_units(self, data):
          """Convert price units to the same unit and scale if they can (incl. same currency).

          The first price field found in the data determines the shared currency unit,
          which is stored under ``data["shared_currency_unit"]`` (``None`` if the data
          contains no price fields).

          :raises ValidationError: If a price uses a currency that cannot be converted to
                                   the shared currency, or if a sensor records prices on a
                                   different scale than the shared currency unit.
          """
          shared_currency_unit = None
          previous_field_name = None
          for field, price_field in self.declared_fields.items():
              if not field.endswith("price") or field not in data:
                  continue
              value = data[field]
              price_unit = price_field._get_unit(value)
              # Strip the field's denominator (e.g. "/MWh") from the price unit to get the currency
              currency_unit = str(
                  (
                      ur.Quantity(price_unit) / ur.Quantity(f"1{price_field.to_unit}")
                  ).units
              )

              if shared_currency_unit is None:
                  shared_currency_unit = str(
                      ur.Quantity(currency_unit).to_base_units().units
                  )
                  previous_field_name = price_field.data_key

              if not units_are_convertible(currency_unit, shared_currency_unit):
                  field_name = price_field.data_key
                  raise ValidationError(
                      f"Prices must share the same monetary unit. '{field_name}' uses '{currency_unit}', but '{previous_field_name}' used '{shared_currency_unit}'.",
                      field_name=field_name,
                  )

              # Make sure all compatible currency units are on the same scale (e.g. not kEUR mixed with EUR)
              if currency_unit == shared_currency_unit:
                  continue
              denominator_unit = str(ur.Unit(currency_unit) / ur.Unit(price_unit))
              target_unit = f"{shared_currency_unit}/({denominator_unit})"
              if isinstance(value, ur.Quantity):
                  data[field] = value.to(target_unit)
              elif isinstance(value, list):
                  # A time series specification: convert the value of each segment
                  for segment in value:
                      segment["value"] = segment["value"].to(target_unit)
              elif isinstance(value, Sensor):
                  # Sensor data cannot be rescaled here, so the other prices need to be adapted instead
                  raise ValidationError(
                      f"Please convert all flex-context prices to the unit of the {value} sensor ({price_unit})."
                  )

          data["shared_currency_unit"] = shared_currency_unit
          return data
  class DBFlexContextSchema(FlexContextSchema):
      """Flex-context schema with the extra restrictions that apply to flex-context fields saved in the db.

      On top of the checks of the FlexContextSchema:

      - time series specifications (lists of segments) are not allowed;
      - fixed consumption and production prices are not allowed;
      - price, power and inflexible-device fields must have a fitting unit.
      """

      # Maps each field name to its data key (e.g. "consumption_price" -> "consumption-price").
      # Built from a single schema instance, rather than instantiating the schema for each field.
      mapped_schema_keys = {
          field_var: field.data_key
          for field_var, field in FlexContextSchema().declared_fields.items()
      }

      @validates_schema
      def forbid_time_series_specs(self, data: dict, **kwargs):
          """Do not allow time series specs for the flex-context fields saved in the db.

          :raises ValidationError: If a VariableQuantityField holds a list (of segments).
          """
          for field_var, field in self.declared_fields.items():
              # Only fields of type VariableQuantity can hold time series specs
              if not isinstance(field, VariableQuantityField):
                  continue
              if isinstance(data.get(field_var), list):
                  raise ValidationError(
                      "A time series specification (listing segments) is not supported when storing flex-context fields. Use a fixed quantity or a sensor reference instead.",
                      field_name=field.data_key,
                  )

      @validates_schema
      def validate_fields_unit(self, data: dict, **kwargs):
          """Check that each field value has a valid unit."""
          self._validate_price_fields(data)
          self._validate_power_fields(data)
          self._validate_inflexible_device_sensors(data)

      def _validate_price_fields(self, data: dict):
          """Validate price fields.

          Energy prices and capacity prices are each checked against their own unit validator.
          """
          energy_price_fields = [
              "consumption_price",
              "production_price",
          ]
          capacity_price_fields = [
              "ems_consumption_breach_price",
              "ems_production_breach_price",
              "ems_peak_consumption_price",
              "ems_peak_production_price",
          ]

          # Check that consumption and production prices are Sensors
          self._forbid_fixed_prices(data)

          for field in energy_price_fields:
              if field in data:
                  self._validate_field(data, "energy price", field, is_energy_price_unit)
          for field in capacity_price_fields:
              if field in data:
                  self._validate_field(
                      data, "capacity price", field, is_capacity_price_unit
                  )

      def _validate_power_fields(self, data: dict):
          """Validate power fields."""
          power_fields = [
              "ems_power_capacity_in_mw",
              "ems_production_capacity_in_mw",
              "ems_consumption_capacity_in_mw",
              "ems_peak_consumption_in_mw",
              "ems_peak_production_in_mw",
          ]
          for field in power_fields:
              if field in data:
                  self._validate_field(data, "power", field, is_power_unit)

      def _validate_field(self, data: dict, field_type: str, field: str, unit_validator):
          """Validate fields based on type and unit validator.

          :param data:           The (deserialized) flex-context; must contain the field.
          :param field_type:     Human-readable type of field, used in the error message (e.g. "power").
          :param field:          Name of the schema field to validate.
          :param unit_validator: Callable that takes a unit (as a string) and returns whether it is valid.
          :raises ValidationError: If the unit of a fixed quantity or of a sensor is not valid.
          """
          value = data[field]
          if isinstance(value, ur.Quantity):
              unit = str(value.units)
          elif isinstance(value, Sensor):
              unit = value.unit
          else:
              # Other types of values carry no unit that can be checked here
              return
          if not unit_validator(unit):
              data_key = self.mapped_schema_keys[field]
              raise ValidationError(
                  f"{field_type.capitalize()} field '{data_key}' must have {p.a(field_type)} unit.",
                  field_name=data_key,
              )

      def _validate_inflexible_device_sensors(self, data: dict):
          """Validate inflexible device sensors.

          :raises ValidationError: If a sensor has neither a power unit nor an energy unit.
          """
          for sensor in data.get("inflexible_device_sensors", []):
              if not (is_power_unit(sensor.unit) or is_energy_unit(sensor.unit)):
                  raise ValidationError(
                      f"Inflexible device sensor '{sensor.id}' must have a power or energy unit.",
                      field_name="inflexible-device-sensors",
                  )

      def _forbid_fixed_prices(self, data: dict, **kwargs):
          """Do not allow fixed consumption price or fixed production price in the flex-context fields saved in the db.

          This is a temporary restriction as future iterations will allow fixed prices on these fields as well.

          :raises ValidationError: If the consumption price or production price is a fixed quantity.
          """
          for kind in ("consumption", "production"):
              if isinstance(data.get(f"{kind}_price"), ur.Quantity):
                  raise ValidationError(
                      f"Fixed prices are not currently supported for {kind}-price in flex-context fields in the DB.",
                      field_name=f"{kind}-price",
                  )
  class MultiSensorFlexModelSchema(Schema):
      """Flex-model of a single sensor, as listed in a multi-sensor flex-model.

      This schema is agnostic to the underlying type of flex-model, which is governed by the chosen Scheduler instead.
      Therefore, the underlying type of flex-model is not deserialized.

      So:

          {
              "sensor": 1,
              "soc-at-start": "10 kWh"
          }

      becomes:

          {
              "sensor": <Sensor 1>,
              "sensor_flex_model": {
                  "soc-at-start": "10 kWh"
              }
          }
      """

      sensor = SensorIdField(required=True)
      # it's up to the Scheduler to deserialize the underlying flex-model
      sensor_flex_model = fields.Dict(data_key="sensor-flex-model")

      @pre_load
      def unwrap_envelope(self, data, **kwargs):
          """Move every key that is not a field of this schema into the sensor's flex-model."""
          known_fields = self.fields
          own = {key: value for key, value in data.items() if key in known_fields}
          flex_model = {
              key: value for key, value in data.items() if key not in known_fields
          }
          return {"sensor-flex-model": flex_model, **own}

      @post_dump
      def wrap_with_envelope(self, data, **kwargs):
          """Lift the contents of the 'sensor-flex-model' field to the top level of the dumped data."""
          nested = data.pop("sensor-flex-model", {})
          # dict(**a, **b) is used on purpose: unlike {**a, **b}, it refuses colliding keys
          flattened = dict(**data, **nested)
          return flattened
  class AssetTriggerSchema(Schema):
      """Schema for triggering a schedule for an asset, using a flex-model per sensor.

      Example:

          {
              "start": "2025-01-21T15:00+01",
              "flex-model": [
                  {
                      "sensor": 1,
                      "soc-at-start": "10 kWh"
                  },
                  {
                      "sensor": 2,
                      "soc-at-start": "20 kWh"
                  },
              ]
          }
      """

      asset = GenericAssetIdField(data_key="id")
      start_of_schedule = AwareDateTimeField(
          data_key="start", format="iso", required=True
      )
      belief_time = AwareDateTimeField(format="iso", data_key="prior")
      duration = PlanningDurationField(load_default=PlanningDurationField.load_default)
      flex_model = fields.List(
          fields.Nested(MultiSensorFlexModelSchema()),
          data_key="flex-model",
      )
      flex_context = fields.Dict(required=False, data_key="flex-context")

      @validates_schema
      def check_flex_model_sensors(self, data, **kwargs):
          """Verify that the flex-model's sensors live under the asset for which a schedule is triggered.

          :raises FMValidationError: If a sensor occurs more than once in the flex-model, or
                                     if a sensor belongs neither to the asset nor to one of its offspring.
          """
          # The flex-model is an optional field: without it, there are no sensors to verify
          flex_model = data.get("flex_model")
          if not flex_model:
              return data

          asset = data["asset"]
          # Look up the offspring only once, rather than for each sensor
          allowed_assets = [asset] + asset.offspring
          seen_sensors = []
          for sensor_flex_model in flex_model:
              sensor = sensor_flex_model["sensor"]
              if sensor in seen_sensors:
                  raise FMValidationError(
                      f"Sensor {sensor_flex_model['sensor'].id} should not occur more than once in the flex-model"
                  )
              if sensor.generic_asset not in allowed_assets:
                  raise FMValidationError(
                      f"Sensor {sensor_flex_model['sensor'].id} does not belong to asset {asset.id} (or to one of its offspring)"
                  )
              seen_sensors.append(sensor)
          return data