data_sources.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. from __future__ import annotations
  2. import json
  3. from typing import TYPE_CHECKING, Any, ClassVar
  4. from sqlalchemy.ext.mutable import MutableDict
  5. import pandas as pd
  6. import timely_beliefs as tb
  7. from packaging.version import Version
  8. from flexmeasures.data import db
  9. from flask import current_app
  10. import hashlib
  11. from marshmallow import Schema
  12. if TYPE_CHECKING:
  13. from flexmeasures.data.models.user import User
  14. class DataGenerator:
  15. __data_generator_base__: str | None = None
  16. _data_source: DataSource | None = None
  17. _config: dict = None
  18. _parameters: dict = None
  19. _parameters_schema: Schema | None = None
  20. _config_schema: Schema | None = None
  21. _save_config: bool = True
  22. _save_parameters: bool = False
  23. def __init__(
  24. self,
  25. config: dict | None = None,
  26. save_config=True,
  27. save_parameters=False,
  28. **kwargs,
  29. ) -> None:
  30. """Base class for the Schedulers, Reporters and Forecasters.
  31. The configuration `config` stores static parameters, parameters that, if
  32. changed, trigger the creation of a new DataSource. Dynamic parameters, such as
  33. the start date, can go into the `parameters`. See docstring of the method `DataGenerator.compute` for
  34. more details. Nevertheless, the parameter `save_parameters` can be set to True if some `parameters` need
  35. to be saved to the DB. In that case, the method `_clean_parameters` is called to remove any field that is not
  36. to be persisted, e.g. time parameters which are already contained in the TimedBelief.
  37. Create a new DataGenerator with a certain configuration. There are two alternatives
  38. to define the parameters:
  39. 1. Serialized through the keyword argument `config`.
  40. 2. Deserialized, passing each parameter as keyword arguments.
  41. The configuration is validated using the schema `_config_schema`, to be defined by the subclass.
  42. `config` cannot contain the key `config` at its top level, otherwise it could conflict with the constructor keyword argument `config`
  43. when passing the config as deserialized attributes.
  44. Example:
  45. The configuration requires two parameters for the PV and consumption sensors.
  46. Option 1:
  47. dg = DataGenerator(config = {
  48. "sensor_pv" : 1,
  49. "sensor_consumption" : 2
  50. })
  51. Option 2:
  52. sensor_pv = Sensor.query.get(1)
  53. sensor_consumption = Sensor.query.get(2)
  54. dg = DataGenerator(sensor_pv = sensor_pv,
  55. sensor_consumption = sensor_consumption)
  56. :param config: serialized `config` parameters, defaults to None
  57. :param save_config: whether to save the config into the data source attributes
  58. :param save_parameters: whether to save the parameters into the data source attributes
  59. """
  60. self._save_config = save_config
  61. self._save_parameters = save_parameters
  62. if config is None and len(kwargs) > 0:
  63. self._config = kwargs
  64. DataGenerator.validate_deserialized(self._config, self._config_schema)
  65. elif config is not None:
  66. self._config = self._config_schema.load(config)
  67. elif len(kwargs) == 0:
  68. self._config = self._config_schema.load({})
  69. def _compute(self, **kwargs) -> list[dict[str, Any]]:
  70. raise NotImplementedError()
  71. def compute(self, parameters: dict | None = None, **kwargs) -> list[dict[str, Any]]:
  72. """The configuration `parameters` stores dynamic parameters, parameters that, if
  73. changed, DO NOT trigger the creation of a new DataSource. Static parameters, such as
  74. the topology of an energy system, can go into `config`.
  75. `parameters` cannot contain the key `parameters` at its top level, otherwise it could conflict with keyword argument `parameters`
  76. of the method compute when passing the `parameters` as deserialized attributes.
  77. :param parameters: serialized `parameters` parameters, defaults to None
  78. """
  79. if self._parameters is None:
  80. self._parameters = {}
  81. if parameters is None:
  82. self._parameters.update(self._parameters_schema.dump(kwargs))
  83. else:
  84. self._parameters.update(parameters)
  85. self._parameters = self._parameters_schema.load(self._parameters)
  86. return self._compute(**self._parameters)
  87. @staticmethod
  88. def validate_deserialized(values: dict, schema: Schema) -> bool:
  89. schema.load(schema.dump(values))
  90. @classmethod
  91. def get_data_source_info(cls: type) -> dict:
  92. """
  93. Create and return the data source info, from which a data source lookup/creation is possible.
  94. See for instance get_data_source_for_job().
  95. """
  96. source_info = dict(
  97. source=current_app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE")
  98. ) # default
  99. source_info["source_type"] = cls.__data_generator_base__
  100. source_info["model"] = cls.__name__
  101. return source_info
  102. @property
  103. def data_source(self) -> "DataSource":
  104. """DataSource property derived from the `source_info`: `source_type` (scheduler, forecaster or reporter), `model` (e.g AggregatorReporter)
  105. and `attributes`. It looks for a data source in the database the marges the `source_info` and, in case of not finding any, it creates a new one.
  106. This property gets created once and it's cached for the rest of the lifetime of the DataGenerator object.
  107. """
  108. from flexmeasures.data.services.data_sources import get_or_create_source
  109. if self._data_source is None:
  110. data_source_info = self.get_data_source_info()
  111. attributes = {"data_generator": {}}
  112. if self._save_config:
  113. attributes["data_generator"]["config"] = self._config_schema.dump(
  114. self._config
  115. )
  116. if self._save_parameters:
  117. attributes["data_generator"]["parameters"] = self._clean_parameters(
  118. self._parameters_schema.dump(self._parameters)
  119. )
  120. data_source_info["attributes"] = attributes
  121. self._data_source = get_or_create_source(**data_source_info)
  122. return self._data_source
  123. def _clean_parameters(self, parameters: dict) -> dict:
  124. """Use this function to clean up the parameters dictionary from the
  125. fields that are not to be persisted to the DB as data source attributes (when save_parameters=True),
  126. e.g. because they are already stored as TimedBelief properties, or otherwise.
  127. Example:
  128. An DataGenerator has the following parameters: ["start", "end", "field1", "field2"] and we want just "field1" and "field2"
  129. to be persisted.
  130. Parameters provided to the `compute` method (input of the method `_clean_parameters`):
  131. parameters = {
  132. "start" : "2023-01-01T00:00:00+02:00",
  133. "end" : "2023-01-02T00:00:00+02:00",
  134. "field1" : 1,
  135. "field2" : 2
  136. }
  137. Parameters persisted to the DB (output of the method `_clean_parameters`):
  138. parameters = {"field1" : 1,"field2" : 2}
  139. """
  140. raise NotImplementedError()
  141. class DataSource(db.Model, tb.BeliefSourceDBMixin):
  142. """Each data source is a data-providing entity."""
  143. __tablename__ = "data_source"
  144. __table_args__ = (
  145. db.UniqueConstraint("name", "user_id", "model", "version", "attributes_hash"),
  146. )
  147. # The type of data source (e.g. user, forecaster or scheduler)
  148. type = db.Column(db.String(80), default="")
  149. # The id of the user source (can link e.g. to fm_user table)
  150. user_id = db.Column(
  151. db.Integer, db.ForeignKey("fm_user.id"), nullable=True, unique=True
  152. )
  153. user = db.relationship("User", backref=db.backref("data_source", lazy=True))
  154. attributes = db.Column(MutableDict.as_mutable(db.JSON), nullable=False, default={})
  155. attributes_hash = db.Column(db.LargeBinary(length=256))
  156. # The model and version of a script source
  157. model = db.Column(db.String(80), nullable=True)
  158. version = db.Column(
  159. db.String(17), # length supports up to version 999.999.999dev999
  160. nullable=True,
  161. )
  162. sensors = db.relationship(
  163. "Sensor",
  164. secondary="timed_belief",
  165. backref=db.backref("data_sources", lazy="select"),
  166. viewonly=True,
  167. )
  168. _data_generator: ClassVar[DataGenerator | None] = None
  169. def __init__(
  170. self,
  171. name: str | None = None,
  172. type: str | None = None,
  173. user: User | None = None,
  174. attributes: dict | None = None,
  175. **kwargs,
  176. ):
  177. if user is not None:
  178. name = user.username
  179. type = "user"
  180. self.user = user
  181. elif user is None and type == "user":
  182. raise TypeError("A data source cannot have type 'user' but no user set.")
  183. self.type = type
  184. if attributes is not None:
  185. self.attributes = attributes
  186. self.attributes_hash = hashlib.sha256(
  187. json.dumps(attributes).encode("utf-8")
  188. ).digest()
  189. tb.BeliefSourceDBMixin.__init__(self, name=name)
  190. db.Model.__init__(self, **kwargs)
  191. @property
  192. def data_generator(self) -> DataGenerator:
  193. if self._data_generator:
  194. return self._data_generator
  195. data_generator = None
  196. if self.type not in ["scheduler", "forecaster", "reporter"]:
  197. raise NotImplementedError(
  198. "Only the classes Scheduler, Forecaster and Reporters are DataGenerator's."
  199. )
  200. if not self.model:
  201. raise NotImplementedError(
  202. "There's no DataGenerator class defined in this DataSource."
  203. )
  204. types = current_app.data_generators
  205. if all(
  206. [self.model not in current_app.data_generators[_type] for _type in types]
  207. ):
  208. raise NotImplementedError(
  209. "DataGenerator `{self.model}` not registered in this FlexMeasures instance."
  210. )
  211. # fetch DataGenerator details
  212. data_generator_details = self.attributes.get("data_generator", {})
  213. config = data_generator_details.get("config", {})
  214. parameters = data_generator_details.get("parameters", {})
  215. # create DataGenerator class and add the parameters
  216. data_generator = current_app.data_generators[self.type][self.model](
  217. config=config
  218. )
  219. data_generator._parameters = parameters
  220. # assign the current DataSource (self) as its source
  221. data_generator._data_source = self
  222. self._data_generator = data_generator
  223. return self._data_generator
  224. @property
  225. def label(self):
  226. """Human-readable label (preferably not starting with a capital letter, so it can be used in a sentence)."""
  227. if self.type == "user":
  228. return f"data entered by user {self.user.username}" # todo: give users a display name
  229. elif self.type == "forecaster":
  230. return f"forecast by {self.name}" # todo: give DataSource an optional db column to persist versioned models separately to the name of the data source?
  231. elif self.type == "scheduler":
  232. return f"schedule by {self.name}"
  233. elif self.type == "reporter":
  234. return f"report by {self.name}"
  235. elif self.type == "crawling script":
  236. return f"data retrieved from {self.name}"
  237. elif self.type in ("demo script", "CLI script"):
  238. return f"demo data entered by {self.name}"
  239. else:
  240. return f"data from {self.name}"
  241. @property
  242. def description(self):
  243. """Extended description.
  244. For example:
  245. >>> DataSource("Seita", type="forecaster", model="naive", version="1.2").description
  246. "Seita's naive forecaster v1.2"
  247. >>> DataSource("Seita", type="scheduler", model="StorageScheduler", version="2").description
  248. "Seita's StorageScheduler model v2"
  249. """
  250. descr = self.name
  251. if self.model:
  252. descr += f"'s {self.model} "
  253. # Mention the data source type unless the model name already mentions it
  254. descr += (
  255. self.type if self.type.lower() not in self.model.lower() else "model"
  256. )
  257. if self.version:
  258. descr += f" v{self.version}"
  259. return descr
  260. def __repr__(self) -> str:
  261. return "<Data source %r (%s)>" % (self.id, self.description)
  262. def __str__(self) -> str:
  263. return self.description
  264. def to_dict(self) -> dict:
  265. model_incl_version = self.model if self.model else ""
  266. if self.model and self.version:
  267. model_incl_version += f" (v{self.version})"
  268. if "forecast" in self.type.lower():
  269. _type = "forecaster" # e.g. 'forecaster' or 'forecasting script'
  270. elif "schedul" in self.type.lower(): # e.g. 'scheduler' or 'scheduling script'
  271. _type = "scheduler"
  272. else:
  273. _type = "other"
  274. return dict(
  275. id=self.id,
  276. name=self.name,
  277. model=model_incl_version,
  278. type=_type,
  279. description=self.description,
  280. )
  281. @staticmethod
  282. def hash_attributes(attributes: dict) -> str:
  283. return hashlib.sha256(json.dumps(attributes).encode("utf-8")).digest()
  284. def get_attribute(self, attribute: str, default: Any = None) -> Any:
  285. """Looks for the attribute in the DataSource's attributes column."""
  286. return self.attributes.get(attribute, default)
  287. def has_attribute(self, attribute: str) -> bool:
  288. return attribute in self.attributes
  289. def set_attribute(self, attribute: str, value):
  290. self.attributes[attribute] = value
  291. def keep_latest_version(
  292. bdf: tb.BeliefsDataFrame,
  293. one_deterministic_belief_per_event: bool = False,
  294. ) -> tb.BeliefsDataFrame:
  295. """Filters the BeliefsDataFrame to keep the latest version of each source, for each event.
  296. The function performs the following steps:
  297. 1. Resets the index to flatten the DataFrame.
  298. 2. Adds columns for the source's name, type, model, and version.
  299. 3. Sorts the rows by event_start and source.version in descending order.
  300. 4. Removes duplicates based on event_start, source.name, source.type, and source.model, keeping the latest version.
  301. 5. Drops the temporary columns added for source attributes.
  302. 6. Restores the original index.
  303. Parameters:
  304. -----------
  305. bdf : tb.BeliefsDataFrame
  306. The input BeliefsDataFrame containing event_start and source information.
  307. Returns:
  308. --------
  309. tb.BeliefsDataFrame
  310. A new BeliefsDataFrame containing only the latest version of each source
  311. for each event_start, with the original index restored.
  312. """
  313. if bdf.empty:
  314. return bdf
  315. # Remember the original index, then reset it
  316. index_levels = bdf.index.names
  317. bdf = bdf.reset_index()
  318. belief_column = "belief_time"
  319. if belief_column not in index_levels:
  320. belief_column = "belief_horizon"
  321. event_column = "event_start"
  322. if event_column not in index_levels:
  323. event_column = "event_end"
  324. # Add source-related columns using vectorized operations for clarity
  325. bdf[["source.name", "source.type", "source.model", "source.version"]] = bdf[
  326. "source"
  327. ].apply(
  328. lambda s: pd.Series(
  329. {
  330. "source.name": s.name,
  331. "source.type": s.type,
  332. "source.model": s.model,
  333. "source.version": Version(
  334. s.version if s.version is not None else "0.0.0"
  335. ),
  336. }
  337. )
  338. )
  339. # Sort by event_start and version, keeping only the latest version
  340. bdf = bdf.sort_values(by=[event_column, "source.version"], ascending=[True, False])
  341. # Drop duplicates based on event_start and source identifiers, keeping the latest version
  342. unique_columns = [
  343. event_column,
  344. "cumulative_probability",
  345. "source.name",
  346. "source.type",
  347. "source.model",
  348. ]
  349. if not one_deterministic_belief_per_event:
  350. unique_columns += [belief_column]
  351. bdf = bdf.drop_duplicates(unique_columns)
  352. # Remove temporary columns and restore the original index
  353. bdf = bdf.drop(
  354. columns=["source.name", "source.type", "source.model", "source.version"]
  355. )
  356. bdf = bdf.set_index(index_levels)
  357. return bdf