time_series.py

from __future__ import annotations

from typing import Any, Type
from datetime import datetime as datetime_type, timedelta
import json

from packaging.version import Version
from flask import current_app
import pandas as pd
from sqlalchemy import select
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy import inspect
import timely_beliefs as tb
from timely_beliefs.beliefs.probabilistic_utils import get_median_belief
import timely_beliefs.utils as tb_utils

from flexmeasures.auth.policy import AuthModelMixin, ACCOUNT_ADMIN_ROLE
from flexmeasures.data import db
from flexmeasures.data.models.data_sources import keep_latest_version
from flexmeasures.data.models.parsing_utils import parse_source_arg
from flexmeasures.data.services.annotations import prepare_annotations_for_chart
from flexmeasures.data.services.timerange import get_timerange
from flexmeasures.data.queries.utils import get_source_criteria
from flexmeasures.data.services.time_series import aggregate_values
from flexmeasures.utils.entity_address_utils import (
    EntityAddressException,
    build_entity_address,
)
from flexmeasures.utils.unit_utils import (
    is_energy_unit,
    is_power_unit,
    is_energy_price_unit,
)
from flexmeasures.data.models.annotations import (
    Annotation,
    SensorAnnotationRelationship,
    to_annotation_frame,
)
from flexmeasures.data.models.charts import chart_type_to_chart_specs
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.validation_utils import check_required_attributes
from flexmeasures.data.queries.sensors import query_sensors_by_proximity
from flexmeasures.utils.geo_utils import parse_lat_lng

class Sensor(db.Model, tb.SensorDBMixin, AuthModelMixin):
    """A sensor measures events."""

    attributes = db.Column(MutableDict.as_mutable(db.JSON), nullable=False, default={})

    generic_asset_id = db.Column(
        db.Integer,
        db.ForeignKey("generic_asset.id", ondelete="CASCADE"),
        nullable=False,
    )
    generic_asset = db.relationship(
        "GenericAsset",
        foreign_keys=[generic_asset_id],
        backref=db.backref(
            "sensors", lazy=True, cascade="all, delete-orphan", passive_deletes=True
        ),
    )
    annotations = db.relationship(
        "Annotation",
        secondary="annotations_sensors",
        backref=db.backref("sensors", lazy="dynamic"),
    )

    def get_path(self, separator: str = ">"):
        return (
            f"{self.generic_asset.get_path(separator=separator)}{separator}{self.name}"
        )

    def __init__(
        self,
        name: str,
        generic_asset: GenericAsset | None = None,
        generic_asset_id: int | None = None,
        attributes: dict | None = None,
        **kwargs,
    ):
        assert (generic_asset is None) ^ (
            generic_asset_id is None
        ), "Either generic_asset_id or generic_asset must be set."
        tb.SensorDBMixin.__init__(self, name, **kwargs)
        tb_utils.remove_class_init_kwargs(tb.SensorDBMixin, kwargs)
        if generic_asset is not None:
            kwargs["generic_asset"] = generic_asset
        else:
            kwargs["generic_asset_id"] = generic_asset_id
        if attributes is not None:
            kwargs["attributes"] = attributes
        db.Model.__init__(self, **kwargs)

    __table_args__ = (
        UniqueConstraint(
            "name",
            "generic_asset_id",
            name="sensor_name_generic_asset_id_key",
        ),
    )
    def __acl__(self):
        """
        We allow reading to whoever can read the asset.
        Editing as well as deletion is left to account admins.
        """
        return {
            "create-children": f"account:{self.generic_asset.account_id}",
            "read": self.generic_asset.__acl__()["read"],
            "update": (
                f"account:{self.generic_asset.account_id}",
                f"role:{ACCOUNT_ADMIN_ROLE}",
            ),
            "delete": (
                f"account:{self.generic_asset.account_id}",
                f"role:{ACCOUNT_ADMIN_ROLE}",
            ),
        }
    @property
    def entity_address(self) -> str:
        try:
            return build_entity_address(dict(sensor_id=self.id), "sensor")
        except EntityAddressException as eae:
            current_app.logger.warn(
                f"Problems generating entity address for sensor {self}: {eae}"
            )
            return "no entity address available"

    @property
    def location(self) -> tuple[float, float] | None:
        location = (self.get_attribute("latitude"), self.get_attribute("longitude"))
        if None not in location:
            return location

    @property
    def measures_power(self) -> bool:
        """True if this sensor's unit is measuring power."""
        return is_power_unit(self.unit)

    @property
    def measures_energy(self) -> bool:
        """True if this sensor's unit is measuring energy."""
        return is_energy_unit(self.unit)

    @property
    def measures_energy_price(self) -> bool:
        """True if this sensor's unit is measuring energy prices."""
        return is_energy_price_unit(self.unit)

    @property
    def is_strictly_non_positive(self) -> bool:
        """Return True if this sensor strictly records non-positive values."""
        return self.get_attribute("is_consumer", False) and not self.get_attribute(
            "is_producer", True
        )

    @property
    def is_strictly_non_negative(self) -> bool:
        """Return True if this sensor strictly records non-negative values."""
        return self.get_attribute("is_producer", False) and not self.get_attribute(
            "is_consumer", True
        )
    def get_attribute(self, attribute: str, default: Any = None) -> Any:
        """Looks for the attribute on the Sensor.
        If not found, looks for the attribute on the Sensor's GenericAsset (including its flex_context).
        If not found, returns the default.
        """
        if hasattr(self, attribute):
            return getattr(self, attribute)
        if attribute in self.attributes:
            return self.attributes[attribute]
        if hasattr(self.generic_asset, attribute):
            return getattr(self.generic_asset, attribute)
        if attribute in self.generic_asset.attributes:
            return self.generic_asset.attributes[attribute]
        if hasattr(self.generic_asset.flex_context, attribute):
            return getattr(self.generic_asset.flex_context, attribute)
        if (
            self.generic_asset.flex_context
            and attribute in self.generic_asset.flex_context
        ):
            return self.generic_asset.flex_context[attribute]
        return default
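    # Usage sketch for the attribute fallback chain above (hypothetical attribute
    # names and values; assumes a sensor whose asset carries a "capacity_in_mw"
    # attribute, so the lookup falls through to the GenericAsset):
    #
    #     sensor.attributes                             # -> {}
    #     sensor.generic_asset.attributes               # -> {"capacity_in_mw": 2}
    #     sensor.get_attribute("capacity_in_mw")        # -> 2
    #     sensor.get_attribute("unknown", default=0)    # -> 0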
    def has_attribute(self, attribute: str) -> bool:
        return (
            attribute in self.attributes or attribute in self.generic_asset.attributes
        )

    def set_attribute(self, attribute: str, value):
        if self.has_attribute(attribute):
            self.attributes[attribute] = value

    def check_required_attributes(
        self,
        attributes: list[str | tuple[str, Type | tuple[Type, ...]]],
    ):
        """Raises if any attribute in the list of attributes is missing, or has the wrong type.

        :param attributes: List of either an attribute name or a tuple of an attribute name and its allowed type
                           (the allowed type may also be a tuple of several allowed types)
        """
        check_required_attributes(self, attributes)
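    # Usage sketch (hypothetical attribute names; assumes these attributes are set
    # on the sensor or its asset, otherwise the validation utility raises):
    #
    #     sensor.check_required_attributes(
    #         [
    #             "market_id",                        # check presence only
    #             ("capacity_in_mw", (int, float)),   # check presence and type
    #         ]
    #     )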
    def latest_state(
        self,
        source: (
            DataSource | list[DataSource] | int | list[int] | str | list[str] | None
        ) = None,
    ) -> tb.BeliefsDataFrame:
        """Search the most recent event for this sensor, and return the most recent ex-post belief.

        :param source: search only beliefs by this source (pass the DataSource, or its name or id) or list of sources
        """
        return self.search_beliefs(
            horizons_at_most=timedelta(0),
            source=source,
            most_recent_beliefs_only=True,
            most_recent_events_only=True,
            one_deterministic_belief_per_event=True,
        )
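    # Usage sketch (assumes some measurements have already been recorded):
    #
    #     state = sensor.latest_state()
    #     # `state` is a BeliefsDataFrame holding the most recent ex-post belief
    #     # about the sensor's most recent event.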
    def search_annotations(
        self,
        annotation_starts_after: datetime_type | None = None,  # deprecated
        annotations_after: datetime_type | None = None,
        annotation_ends_before: datetime_type | None = None,  # deprecated
        annotations_before: datetime_type | None = None,
        source: (
            DataSource | list[DataSource] | int | list[int] | str | list[str] | None
        ) = None,
        include_asset_annotations: bool = False,
        include_account_annotations: bool = False,
        as_frame: bool = False,
    ) -> list[Annotation] | pd.DataFrame:
        """Return annotations assigned to this sensor, and optionally, also those assigned to the sensor's asset and the asset's account.

        :param annotations_after: only return annotations that end after this datetime (exclusive)
        :param annotations_before: only return annotations that start before this datetime (exclusive)
        """
        # todo: deprecate the 'annotation_starts_after' argument in favor of 'annotations_after' (announced v0.11.0)
        annotations_after = tb_utils.replace_deprecated_argument(
            "annotation_starts_after",
            annotation_starts_after,
            "annotations_after",
            annotations_after,
            required_argument=False,
        )

        # todo: deprecate the 'annotation_ends_before' argument in favor of 'annotations_before' (announced v0.11.0)
        annotations_before = tb_utils.replace_deprecated_argument(
            "annotation_ends_before",
            annotation_ends_before,
            "annotations_before",
            annotations_before,
            required_argument=False,
        )

        parsed_sources = parse_source_arg(source)
        query = (
            select(Annotation)
            .join(SensorAnnotationRelationship)
            .filter(
                SensorAnnotationRelationship.sensor_id == self.id,
                SensorAnnotationRelationship.annotation_id == Annotation.id,
            )
        )
        if annotations_after is not None:
            query = query.filter(
                Annotation.end > annotations_after,
            )
        if annotations_before is not None:
            query = query.filter(
                Annotation.start < annotations_before,
            )
        if parsed_sources:
            query = query.filter(
                Annotation.source.in_(parsed_sources),
            )
        annotations = db.session.scalars(query).all()
        if include_asset_annotations:
            annotations += self.generic_asset.search_annotations(
                annotations_after=annotations_after,
                annotations_before=annotations_before,
                source=source,
            )
        if include_account_annotations:
            annotations += self.generic_asset.owner.search_annotations(
                annotations_after=annotations_after,
                annotations_before=annotations_before,
                source=source,
            )
        return to_annotation_frame(annotations) if as_frame else annotations
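    # Usage sketch (hypothetical time window; assumes datetime and pytz are
    # imported in the calling context, and that annotations exist for the asset):
    #
    #     df = sensor.search_annotations(
    #         annotations_after=datetime(2024, 1, 1, tzinfo=pytz.utc),
    #         annotations_before=datetime(2024, 2, 1, tzinfo=pytz.utc),
    #         include_asset_annotations=True,
    #         as_frame=True,  # return a pandas DataFrame instead of a list
    #     )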
    def search_beliefs(
        self,
        event_starts_after: datetime_type | None = None,
        event_ends_before: datetime_type | None = None,
        beliefs_after: datetime_type | None = None,
        beliefs_before: datetime_type | None = None,
        horizons_at_least: timedelta | None = None,
        horizons_at_most: timedelta | None = None,
        source: (
            DataSource | list[DataSource] | int | list[int] | str | list[str] | None
        ) = None,
        user_source_ids: int | list[int] | None = None,
        source_types: list[str] | None = None,
        exclude_source_types: list[str] | None = None,
        use_latest_version_per_event: bool = True,
        most_recent_beliefs_only: bool = True,
        most_recent_events_only: bool = False,
        most_recent_only: bool = False,
        one_deterministic_belief_per_event: bool = False,
        one_deterministic_belief_per_event_per_source: bool = False,
        as_json: bool = False,
        resolution: str | timedelta | None = None,
    ) -> tb.BeliefsDataFrame | str:
        """Search all beliefs about events for this sensor.

        If you don't set any filters, you get the most recent beliefs about all events.

        :param event_starts_after: only return beliefs about events that start after this datetime (inclusive)
        :param event_ends_before: only return beliefs about events that end before this datetime (inclusive)
        :param beliefs_after: only return beliefs formed after this datetime (inclusive)
        :param beliefs_before: only return beliefs formed before this datetime (inclusive)
        :param horizons_at_least: only return beliefs with a belief horizon equal or greater than this timedelta (for example, use timedelta(0) to get ante knowledge time beliefs)
        :param horizons_at_most: only return beliefs with a belief horizon equal or less than this timedelta (for example, use timedelta(0) to get post knowledge time beliefs)
        :param source: search only beliefs by this source (pass the DataSource, or its name or id) or list of sources. Without this set and a most recent parameter used (see below), the results can be of any source.
        :param user_source_ids: Optional list of user source ids to query only specific user sources
        :param source_types: Optional list of source type names to query only specific source types *
        :param exclude_source_types: Optional list of source type names to exclude specific source types *
        :param use_latest_version_per_event: only return the belief from the latest version of a source, for each event
        :param most_recent_beliefs_only: only return the most recent beliefs for each event from each source (minimum belief horizon). Defaults to True.
        :param most_recent_events_only: only return (post knowledge time) beliefs for the most recent event (maximum event start). Defaults to False.
        :param most_recent_only: only return a single belief, the most recent from the most recent event. Fastest method if you only need one. Defaults to False. To use, also set most_recent_beliefs_only=False. Use with care when data uses cumulative probability (more than one belief per event_start and horizon).
        :param one_deterministic_belief_per_event: only return a single value per event (no probabilistic distribution and only 1 source)
        :param one_deterministic_belief_per_event_per_source: only return a single value per event per source (no probabilistic distribution)
        :param as_json: return beliefs in JSON format (e.g. for use in charts) rather than as BeliefsDataFrame
        :param resolution: optionally set the resolution of data being displayed
        :returns: BeliefsDataFrame or JSON string (if as_json is True)
        """
        bdf = TimedBelief.search(
            sensors=self,
            event_starts_after=event_starts_after,
            event_ends_before=event_ends_before,
            beliefs_after=beliefs_after,
            beliefs_before=beliefs_before,
            horizons_at_least=horizons_at_least,
            horizons_at_most=horizons_at_most,
            source=source,
            user_source_ids=user_source_ids,
            source_types=source_types,
            exclude_source_types=exclude_source_types,
            use_latest_version_per_event=use_latest_version_per_event,
            most_recent_beliefs_only=most_recent_beliefs_only,
            most_recent_events_only=most_recent_events_only,
            most_recent_only=most_recent_only,
            one_deterministic_belief_per_event=one_deterministic_belief_per_event,
            one_deterministic_belief_per_event_per_source=one_deterministic_belief_per_event_per_source,
            resolution=resolution,
        )
        if as_json:
            df = bdf.reset_index()
            df["sensor"] = self
            df["sensor"] = df["sensor"].apply(lambda x: x.to_dict())
            df["source"] = df["source"].apply(lambda x: x.to_dict())
            return df.to_json(orient="records")
        return bdf
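    # Usage sketch (hypothetical datetimes and source id; assumes datetime and pytz
    # are imported in the calling context and beliefs exist for this sensor):
    #
    #     bdf = sensor.search_beliefs(
    #         event_starts_after=datetime(2024, 1, 1, tzinfo=pytz.utc),
    #         event_ends_before=datetime(2024, 1, 2, tzinfo=pytz.utc),
    #         horizons_at_least=timedelta(hours=6),  # e.g. only fairly early forecasts
    #         source=1,
    #         resolution=timedelta(hours=1),
    #     )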
    def chart(
        self,
        chart_type: str = "bar_chart",
        event_starts_after: datetime_type | None = None,
        event_ends_before: datetime_type | None = None,
        beliefs_after: datetime_type | None = None,
        beliefs_before: datetime_type | None = None,
        source: (
            DataSource | list[DataSource] | int | list[int] | str | list[str] | None
        ) = None,
        most_recent_beliefs_only: bool = True,
        include_data: bool = False,
        include_sensor_annotations: bool = False,
        include_asset_annotations: bool = False,
        include_account_annotations: bool = False,
        dataset_name: str | None = None,
        resolution: str | timedelta | None = None,
        **kwargs,
    ) -> dict:
        """Create a vega-lite chart showing sensor data.

        :param chart_type: currently only "bar_chart"  # todo: where can we properly list the available chart types?
        :param event_starts_after: only return beliefs about events that start after this datetime (inclusive)
        :param event_ends_before: only return beliefs about events that end before this datetime (inclusive)
        :param beliefs_after: only return beliefs formed after this datetime (inclusive)
        :param beliefs_before: only return beliefs formed before this datetime (inclusive)
        :param source: search only beliefs by this source (pass the DataSource, or its name or id) or list of sources
        :param most_recent_beliefs_only: only return the most recent beliefs for each event from each source (minimum belief horizon)
        :param include_data: if True, include data in the chart, or if False, exclude data
        :param include_sensor_annotations: if True and include_data is True, include sensor annotations in the chart, or if False, exclude these
        :param include_asset_annotations: if True and include_data is True, include asset annotations in the chart, or if False, exclude them
        :param include_account_annotations: if True and include_data is True, include account annotations in the chart, or if False, exclude them
        :param dataset_name: optionally name the dataset used in the chart (the default name is sensor_<id>)
        :param resolution: optionally set the resolution of data being displayed
        :returns: dictionary defining vega-lite chart specs
        """
        # Set up chart specification
        if dataset_name is None:
            dataset_name = "sensor_" + str(self.id)
        self.sensor_type = self.get_attribute("sensor_type", self.name)
        if event_starts_after:
            kwargs["event_starts_after"] = event_starts_after
        if event_ends_before:
            kwargs["event_ends_before"] = event_ends_before
        chart_specs = chart_type_to_chart_specs(
            chart_type,
            sensor=self,
            dataset_name=dataset_name,
            include_annotations=include_sensor_annotations
            or include_asset_annotations
            or include_account_annotations,
            **kwargs,
        )

        if include_data:
            # Get data
            data = self.search_beliefs(
                as_json=True,
                event_starts_after=event_starts_after,
                event_ends_before=event_ends_before,
                beliefs_after=beliefs_after,
                beliefs_before=beliefs_before,
                most_recent_beliefs_only=most_recent_beliefs_only,
                source=source,
                resolution=resolution,
            )

            # Get annotations
            if include_sensor_annotations:
                annotations_df = self.search_annotations(
                    annotations_after=event_starts_after,
                    annotations_before=event_ends_before,
                    include_asset_annotations=include_asset_annotations,
                    include_account_annotations=include_account_annotations,
                    as_frame=True,
                )
            elif include_asset_annotations:
                annotations_df = self.generic_asset.search_annotations(
                    annotations_after=event_starts_after,
                    annotations_before=event_ends_before,
                    include_account_annotations=include_account_annotations,
                    as_frame=True,
                )
            elif include_account_annotations:
                annotations_df = self.generic_asset.owner.search_annotations(
                    annotations_after=event_starts_after,
                    annotations_before=event_ends_before,
                    as_frame=True,
                )
            else:
                annotations_df = to_annotation_frame([])

            # Wrap and stack annotations
            annotations_df = prepare_annotations_for_chart(annotations_df)

            # Annotations to JSON records
            annotations_df = annotations_df.reset_index()
            annotations_df["source"] = annotations_df["source"].astype(str)
            annotations_data = annotations_df.to_json(orient="records")

            # Combine chart specs, data and annotations
            chart_specs["datasets"] = {
                dataset_name: json.loads(data),
                dataset_name + "_annotations": json.loads(annotations_data),
            }
        return chart_specs
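    # Usage sketch (hypothetical time window; assumes datetime and pytz are
    # imported in the calling context and a vega-lite-capable frontend renders
    # the result):
    #
    #     specs = sensor.chart(
    #         event_starts_after=datetime(2024, 1, 1, tzinfo=pytz.utc),
    #         event_ends_before=datetime(2024, 1, 8, tzinfo=pytz.utc),
    #         include_data=True,
    #         include_sensor_annotations=True,
    #     )
    #     # `specs` is a dict of vega-lite specs, with data embedded under "datasets".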
    @property
    def timerange(self) -> dict[str, datetime_type]:
        """Time range for which sensor data exists.

        :returns: dictionary with start and end, for example:
                  {
                      'start': datetime.datetime(2020, 12, 3, 14, 0, tzinfo=pytz.utc),
                      'end': datetime.datetime(2020, 12, 3, 14, 30, tzinfo=pytz.utc)
                  }
        """
        start, end = get_timerange([self.id])
        return dict(start=start, end=end)

    def __repr__(self) -> str:
        return f"<Sensor {self.id}: {self.name}, unit: {self.unit} res.: {self.event_resolution}>"

    def __str__(self) -> str:
        return self.name

    def to_dict(self) -> dict:
        return dict(
            id=self.id,
            name=self.name,
            description=f"{self.name} ({self.generic_asset.name})",
        )
    @classmethod
    def find_closest(
        cls, generic_asset_type_name: str, sensor_name: str, n: int = 1, **kwargs
    ) -> "Sensor" | list["Sensor"] | None:
        """Returns the closest n sensors within a given asset type (as a list if n > 1).

        Parses latitude and longitude values stated in kwargs.

        Can be called with an object that has latitude and longitude properties, for example:

            sensor = Sensor.find_closest("weather station", "wind speed", object=generic_asset)

        Can also be called with latitude and longitude parameters, for example:

            sensor = Sensor.find_closest("weather station", "temperature", latitude=32, longitude=54)
            sensor = Sensor.find_closest("weather station", "temperature", lat=32, lng=54)

        Finally, pass in an account_id parameter if you want to query an account other than your own. This only works for admins. Public assets are always queried.
        """
        latitude, longitude = parse_lat_lng(kwargs)
        account_id_filter = kwargs["account_id"] if "account_id" in kwargs else None
        query = query_sensors_by_proximity(
            latitude=latitude,
            longitude=longitude,
            generic_asset_type_name=generic_asset_type_name,
            sensor_name=sensor_name,
            account_id=account_id_filter,
        )
        if n == 1:
            return db.session.scalars(query.limit(1)).first()
        else:
            return db.session.scalars(query.limit(n)).all()

    def make_hashable(self) -> tuple:
        """Returns a tuple with the properties subject to change.

        In principle all properties (except ID) of a given sensor could be changed, but not all changes are relevant to warrant reanalysis (e.g. scheduling or forecasting).
        """
        return (self.id, self.attributes, self.generic_asset.attributes)
    def search_data_sources(
        self,
        event_starts_after: datetime_type | None = None,
        event_ends_after: datetime_type | None = None,
        event_starts_before: datetime_type | None = None,
        event_ends_before: datetime_type | None = None,
        source_types: list[str] | None = None,
        exclude_source_types: list[str] | None = None,
    ) -> list[DataSource]:
        q = select(DataSource).join(TimedBelief).filter(TimedBelief.sensor == self)

        # todo: refactor to use apply_event_timing_filters from timely-beliefs
        if event_starts_after:
            q = q.filter(TimedBelief.event_start >= event_starts_after)
        if not pd.isnull(event_ends_after):
            if self.event_resolution == timedelta(0):
                # inclusive
                q = q.filter(TimedBelief.event_start >= event_ends_after)
            else:
                # exclusive
                q = q.filter(
                    TimedBelief.event_start > event_ends_after - self.event_resolution
                )
        if not pd.isnull(event_starts_before):
            if self.event_resolution == timedelta(0):
                # inclusive
                q = q.filter(TimedBelief.event_start <= event_starts_before)
            else:
                # exclusive
                q = q.filter(TimedBelief.event_start < event_starts_before)
        if event_ends_before:
            q = q.filter(
                TimedBelief.event_start
                <= pd.Timestamp(event_ends_before) - self.event_resolution
            )
        if source_types:
            q = q.filter(DataSource.type.in_(source_types))
        if exclude_source_types:
            q = q.filter(DataSource.type.not_in(exclude_source_types))

        return db.session.scalars(q).all()
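    # Usage sketch (hypothetical source type and time window; assumes beliefs from
    # several kinds of sources exist for this sensor):
    #
    #     sources = sensor.search_data_sources(
    #         event_starts_after=datetime(2024, 1, 1, tzinfo=pytz.utc),
    #         exclude_source_types=["scheduler"],
    #     )
    #     # `sources` is a list of DataSource objects that contributed beliefs
    #     # about events in the given window.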
class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
    """A timed belief holds a precisely timed record of a belief about an event.

    It also records the source of the belief, and the sensor that the event pertains to.
    """

    @declared_attr
    def source_id(cls):
        return db.Column(db.Integer, db.ForeignKey("data_source.id"), primary_key=True)

    sensor = db.relationship(
        "Sensor",
        backref=db.backref(
            "beliefs",
            lazy=True,
            cascade="merge",  # no save-update (i.e. don't auto-save time series data to session upon updating sensor)
        ),
    )
    source = db.relationship(
        "DataSource",
        backref=db.backref(
            "beliefs",
            lazy=True,
            cascade="merge",  # no save-update (i.e. don't auto-save time series data to session upon updating source)
        ),
    )

    def __init__(
        self,
        sensor: tb.DBSensor,
        source: tb.DBBeliefSource,
        **kwargs,
    ):
        # get a Sensor instance attached to the database session (input sensor is detached)
        # check out Issue #683 for more details
        inspection_obj = inspect(sensor, raiseerr=False)
        if (
            inspection_obj and inspection_obj.detached
        ):  # fetch Sensor only when it is detached
            sensor = db.session.get(Sensor, sensor.id)
        tb.TimedBeliefDBMixin.__init__(self, sensor, source, **kwargs)
        tb_utils.remove_class_init_kwargs(tb.TimedBeliefDBMixin, kwargs)
        db.Model.__init__(self, **kwargs)
    @classmethod
    def search(
        cls,
        sensors: Sensor | int | str | list[Sensor | int | str],
        sensor: Sensor = None,  # deprecated
        event_starts_after: datetime_type | None = None,
        event_ends_before: datetime_type | None = None,
        beliefs_after: datetime_type | None = None,
        beliefs_before: datetime_type | None = None,
        horizons_at_least: timedelta | None = None,
        horizons_at_most: timedelta | None = None,
        source: (
            DataSource | list[DataSource] | int | list[int] | str | list[str] | None
        ) = None,
        user_source_ids: int | list[int] | None = None,
        source_types: list[str] | None = None,
        exclude_source_types: list[str] | None = None,
        use_latest_version_per_event: bool = True,
        most_recent_beliefs_only: bool = True,
        most_recent_events_only: bool = False,
        most_recent_only: bool = False,
        one_deterministic_belief_per_event: bool = False,
        one_deterministic_belief_per_event_per_source: bool = False,
        resolution: str | timedelta | None = None,
        sum_multiple: bool = True,
    ) -> tb.BeliefsDataFrame | dict[str, tb.BeliefsDataFrame]:
        """Search all beliefs about events for the given sensors.

        If you don't set any filters, you get the most recent beliefs about all events.

        :param sensors: search only these sensors, identified by their instance or id (both unique) or name (non-unique)
        :param event_starts_after: only return beliefs about events that start after this datetime (inclusive)
        :param event_ends_before: only return beliefs about events that end before this datetime (inclusive)
        :param beliefs_after: only return beliefs formed after this datetime (inclusive)
        :param beliefs_before: only return beliefs formed before this datetime (inclusive)
        :param horizons_at_least: only return beliefs with a belief horizon equal or greater than this timedelta (for example, use timedelta(0) to get ante knowledge time beliefs)
        :param horizons_at_most: only return beliefs with a belief horizon equal or less than this timedelta (for example, use timedelta(0) to get post knowledge time beliefs)
        :param source: search only beliefs by this source (pass the DataSource, or its name or id) or list of sources
        :param user_source_ids: Optional list of user source ids to query only specific user sources
        :param source_types: Optional list of source type names to query only specific source types *
        :param exclude_source_types: Optional list of source type names to exclude specific source types *
        :param use_latest_version_per_event: only return the belief from the latest version of a source, for each event
        :param most_recent_beliefs_only: only return the most recent beliefs for each event from each source (minimum belief horizon). Defaults to True.
        :param most_recent_events_only: only return (post knowledge time) beliefs for the most recent event (maximum event start)
        :param most_recent_only: only return a single belief, the most recent from the most recent event. Fastest method if you only need one.
        :param one_deterministic_belief_per_event: only return a single value per event (no probabilistic distribution and only 1 source)
        :param one_deterministic_belief_per_event_per_source: only return a single value per event per source (no probabilistic distribution)
        :param resolution: Optional timedelta or pandas freqstr used to resample the results **
        :param sum_multiple: if True, sum over multiple sensors; otherwise, return a dictionary with sensors as key, each holding a BeliefsDataFrame as its value

        * If user_source_ids is specified, the "user" source type is automatically included (and not excluded).
          Somewhat redundant, though still allowed, is to set both source_types and exclude_source_types.

        ** Note that:
           - timely-beliefs converts string resolutions to datetime.timedelta objects (see https://github.com/SeitaBV/timely-beliefs/issues/13).
           - for sensors recording non-instantaneous data: updates both the event frequency and the event resolution
           - for sensors recording instantaneous data: updates only the event frequency (and event resolution remains 0)
        """
        # todo: deprecate the 'sensor' argument in favor of 'sensors' (announced v0.8.0)
        sensors = tb_utils.replace_deprecated_argument(
            "sensor",
            sensor,
            "sensors",
            sensors,
        )

        # convert to list
        sensors = [sensors] if not isinstance(sensors, list) else sensors

        # convert from sensor names to sensors
        sensor_names = [s for s in sensors if isinstance(s, str)]
        if sensor_names:
            sensors = [s for s in sensors if not isinstance(s, str)]
            sensors_from_names = db.session.scalars(
                select(Sensor).filter(Sensor.name.in_(sensor_names))
            ).all()
            sensors.extend(sensors_from_names)

        parsed_sources = parse_source_arg(source)
        source_criteria = get_source_criteria(
            cls, user_source_ids, source_types, exclude_source_types
        )
        custom_join_targets = [] if parsed_sources else [DataSource]

        bdf_dict = {}
        for sensor in sensors:
            bdf = cls.search_session(
                session=db.session,
                sensor=sensor,
                # Workaround (1st half) for https://github.com/FlexMeasures/flexmeasures/issues/484
                event_ends_after=event_starts_after,
                event_starts_before=event_ends_before,
                beliefs_after=beliefs_after,
                beliefs_before=beliefs_before,
                horizons_at_least=horizons_at_least,
                horizons_at_most=horizons_at_most,
                source=parsed_sources,
                most_recent_beliefs_only=most_recent_beliefs_only,
                most_recent_events_only=most_recent_events_only,
                most_recent_only=most_recent_only,
                custom_filter_criteria=source_criteria,
                custom_join_targets=custom_join_targets,
            )
            if use_latest_version_per_event:
                bdf = keep_latest_version(
                    bdf=bdf,
                    one_deterministic_belief_per_event=one_deterministic_belief_per_event,
                )
            if one_deterministic_belief_per_event:
                if (
                    bdf.lineage.number_of_sources <= 1
                    and bdf.lineage.probabilistic_depth == 1
                ):
                    # Fast track, no need to loop over beliefs
                    pass
                else:
                    # First make deterministic
                    bdf = bdf.for_each_belief(get_median_belief)
                    # Then sort each event by most recent source version and most recent belief_time
                    bdf = bdf.sort_values(
                        by=["event_start", "source", "belief_time"],
                        ascending=[True, False, False],
                        key=lambda col: (
                            col.map(
                                lambda s: Version(s.version if s.version else "0.0.0")
                            )
                            if col.name == "source"
                            else col
                        ),
                    )
                    # Finally, take the first belief for each event, thus preferring the most recent belief_time first and the latest source version second
                    bdf = bdf.groupby(level=["event_start"], group_keys=False).apply(
                        lambda x: x.head(1)
                    )
            elif one_deterministic_belief_per_event_per_source:
                if len(bdf) == 0 or bdf.lineage.probabilistic_depth == 1:
                    # Fast track, no need to loop over beliefs
                    pass
                else:
                    bdf = bdf.for_each_belief(get_median_belief)

            # NB resampling will be triggered if resolutions are not an exact match (also in case of str vs timedelta)
            if resolution is not None and resolution != bdf.event_resolution:
                bdf = bdf.resample_events(
                    resolution, keep_only_most_recent_belief=most_recent_beliefs_only
                )
                # Workaround (2nd half) for https://github.com/FlexMeasures/flexmeasures/issues/484
                if event_starts_after is not None:
                    bdf = bdf[bdf.event_starts >= event_starts_after]
                if event_ends_before is not None:
                    bdf = bdf[bdf.event_ends <= event_ends_before]
            bdf_dict[bdf.sensor] = bdf

        if sum_multiple:
            return aggregate_values(bdf_dict)
        else:
            return bdf_dict
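    # Usage sketch (hypothetical sensor ids and time window; assumes datetime and
    # pytz are imported in the calling context and both sensors record data in
    # compatible units):
    #
    #     s1 = db.session.get(Sensor, 1)
    #     s2 = db.session.get(Sensor, 2)
    #     bdf_dict = TimedBelief.search(
    #         sensors=[s1, s2],
    #         event_starts_after=datetime(2024, 1, 1, tzinfo=pytz.utc),
    #         event_ends_before=datetime(2024, 1, 2, tzinfo=pytz.utc),
    #         resolution=timedelta(minutes=15),
    #         sum_multiple=False,  # one BeliefsDataFrame per sensor, keyed by sensor
    #     )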
    @classmethod
    def add(
        cls,
        bdf: tb.BeliefsDataFrame,
        expunge_session: bool = False,
        allow_overwrite: bool = False,
        bulk_save_objects: bool = False,
        commit_transaction: bool = False,
    ):
        """Add a BeliefsDataFrame as timed beliefs in the database.

        :param bdf: the BeliefsDataFrame to be persisted
        :param expunge_session: if True, all non-flushed instances are removed from the session before adding beliefs.
                                Expunging can resolve problems you might encounter with states of objects in your session.
                                When using this option, you might want to flush newly-created objects which are not beliefs
                                (e.g. a sensor or data source object).
        :param allow_overwrite: if True, new objects are merged;
                                if False, objects are added to the session or bulk saved
        :param bulk_save_objects: if True, objects are bulk saved with session.bulk_save_objects(),
                                  which is quite fast but has several caveats, see:
                                  https://docs.sqlalchemy.org/orm/persistence_techniques.html#bulk-operations-caveats
                                  if False, objects are added to the session with session.add_all()
        :param commit_transaction: if True, the session is committed;
                                   if False, you can still add other data to the session
                                   and commit it all within an atomic transaction
        """
        return cls.add_to_session(
            session=db.session,
            beliefs_data_frame=bdf,
            expunge_session=expunge_session,
            allow_overwrite=allow_overwrite,
            bulk_save_objects=bulk_save_objects,
            commit_transaction=commit_transaction,
        )
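    # Usage sketch (assumes `bdf` is a timely-beliefs BeliefsDataFrame built for an
    # existing Sensor and DataSource):
    #
    #     TimedBelief.add(bdf, commit_transaction=False)
    #     db.session.commit()  # commit together with any other pending changes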
    def __repr__(self) -> str:
        """timely-beliefs representation of timed beliefs."""
        return tb.TimedBelief.__repr__(self)