sensors.py
from __future__ import annotations

import hashlib
import json
import time
from datetime import datetime, timedelta
from functools import lru_cache

import pandas as pd
import sqlalchemy as sa
from flask import current_app
from humanize import naturaldelta
from humanize.time import precisedelta
from isodate import duration_isoformat
from timely_beliefs import BeliefsDataFrame

from flexmeasures import Sensor, Account, Asset
from flexmeasures.data import db
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.data.schemas.reporting import StatusSchema
from flexmeasures.utils.time_utils import server_now


def get_sensors(
    account: Account | list[Account] | None,
    include_public_assets: bool = False,
    sensor_id_allowlist: list[int] | None = None,
    sensor_name_allowlist: list[str] | None = None,
) -> list[Sensor]:
    """Return a list of Sensor objects that belong to the given account, and/or public sensors.

    :param account: select only sensors from this account (or list of accounts)
    :param include_public_assets: if True, include sensors that belong to a public asset
    :param sensor_id_allowlist: optionally, allow only sensors whose id is in this list
    :param sensor_name_allowlist: optionally, allow only sensors whose name is in this list
    """
    sensor_query = sa.select(Sensor)
    if isinstance(account, list):
        accounts = account
    else:
        accounts: list = [account] if account else []
    account_ids: list = [acc.id for acc in accounts]
    sensor_query = sensor_query.join(
        GenericAsset, GenericAsset.id == Sensor.generic_asset_id
    ).filter(Sensor.generic_asset_id == GenericAsset.id)
    if include_public_assets:
        sensor_query = sensor_query.filter(
            sa.or_(
                GenericAsset.account_id.in_(account_ids),
                GenericAsset.account_id.is_(None),
            )
        )
    else:
        sensor_query = sensor_query.filter(GenericAsset.account_id.in_(account_ids))
    if sensor_id_allowlist:
        sensor_query = sensor_query.filter(Sensor.id.in_(sensor_id_allowlist))
    if sensor_name_allowlist:
        sensor_query = sensor_query.filter(Sensor.name.in_(sensor_name_allowlist))
    return db.session.scalars(sensor_query).all()
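
# Usage sketch for get_sensors (hypothetical names; assumes a Flask app context
# and an existing `my_account` Account instance):
#
#     sensors = get_sensors(
#         account=my_account,
#         include_public_assets=True,
#         sensor_name_allowlist=["power", "price"],
#     )
#     print([s.name for s in sensors])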


def _get_sensor_bdfs_by_source_type(
    sensor: Sensor, staleness_search: dict
) -> dict[str, BeliefsDataFrame] | None:
    """Get the latest event, split by source type, for a given sensor with given search parameters.

    For now we use the 'demo script', 'user', 'forecaster', 'scheduler' and 'reporter' source types.
    """
    bdfs_by_source = dict()
    for source_type in ("demo script", "user", "forecaster", "scheduler", "reporter"):
        bdf = TimedBelief.search(
            sensors=sensor,
            most_recent_events_only=True,
            source_types=[source_type],
            **staleness_search,
        )
        if not bdf.empty:
            bdfs_by_source[source_type] = bdf
    return None if not bdfs_by_source else bdfs_by_source
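
# Return-shape sketch for _get_sensor_bdfs_by_source_type (illustrative only):
#
#     {"user": <BeliefsDataFrame with the most recent user-recorded event>,
#      "scheduler": <BeliefsDataFrame with the most recent scheduled event>}
#
# or None when no source type yielded any data.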


def get_staleness_start_times(
    sensor: Sensor, staleness_search: dict, now: datetime
) -> dict[str, tuple[bool, datetime | None]] | None:
    """Get staleness start times for a given sensor, by source.

    Also add whether there is any relevant data (for forecasters and schedulers this is future data).
    For scheduler and forecaster sources, the staleness start is the latest event start time.
    For other sources, the staleness start is the knowledge time of the sensor's most recent event.
    This knowledge time represents when you could have known about the event
    (specifically, when you could have formed an ex-post belief about it).
    """
    staleness_bdfs = _get_sensor_bdfs_by_source_type(
        sensor=sensor, staleness_search=staleness_search
    )
    if staleness_bdfs is None:
        return None
    start_times = dict()
    for source_type, bdf in staleness_bdfs.items():
        time_column = "knowledge_times"
        source_type = str(source_type)
        has_relevant_data = True
        if source_type in ("scheduler", "forecaster"):
            # Filter to get only future events
            bdf_filtered = bdf[bdf.event_starts > now]
            time_column = "event_starts"
            if bdf_filtered.empty:
                has_relevant_data = False
                bdf_filtered = bdf
            bdf = bdf_filtered
        start_times[source_type] = (
            has_relevant_data,
            getattr(bdf, time_column)[-1] if not bdf.empty else None,
        )
    return start_times
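
# Return-shape sketch for get_staleness_start_times (illustrative values):
#
#     {
#         "user": (True, <knowledge time of the most recent user event>),
#         "scheduler": (True, <start of the latest future scheduled event>),
#         "forecaster": (False, <start of the latest past forecast, as no future data exists>),
#     }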


def get_stalenesses(
    sensor: Sensor, staleness_search: dict, now: datetime
) -> dict[str, tuple[bool, timedelta | None]] | None:
    """Get the staleness of the sensor, split by source.

    The staleness is defined relative to the knowledge time of the most recent event, rather than to its belief time.
    Basically, that means that we don't really care when the data arrived,
    as long as the available data is about what we should be able to know by now.

    :param sensor: The sensor to compute the staleness for.
    :param staleness_search: Deserialized keyword arguments to `TimedBelief.search`.
    :param now: Datetime representing now, used both to mask future beliefs
                and to measure staleness against.
    """
    # Only consider beliefs formed before now (mask future beliefs)
    staleness_search = staleness_search.copy()  # no inplace operations
    staleness_search["beliefs_before"] = min(
        now, staleness_search.get("beliefs_before", now)
    )
    staleness_start_times = get_staleness_start_times(
        sensor=sensor, staleness_search=staleness_search, now=now
    )
    if staleness_start_times is None:
        return None
    stalenesses = dict()
    for source_type, (has_relevant_data, start_time) in staleness_start_times.items():
        stalenesses[str(source_type)] = (
            has_relevant_data,
            None if start_time is None else now - start_time,
        )
    return stalenesses
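
# Usage sketch for get_stalenesses (hypothetical sensor; assumes an app context):
#
#     stalenesses = get_stalenesses(
#         sensor=my_sensor, staleness_search={}, now=server_now()
#     )
#     # e.g. {"user": (True, timedelta(hours=3))} means the latest user data
#     # became knowable 3 hours ago.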


def get_status_specs(sensor: Sensor) -> dict:
    """Get status specs from a given sensor."""
    # Check for explicitly defined status specs
    status_specs = sensor.attributes.get("status_specs", dict())
    if status_specs:
        return status_specs
    status_specs["staleness_search"] = {}
    # Consider forecast or schedule data stale if it is less than 12 hours in the future
    status_specs["max_future_staleness"] = "-PT12H"
    # Default to status specs for economical sensors with daily updates
    if sensor.knowledge_horizon_fnc == "x_days_ago_at_y_oclock":
        status_specs["max_staleness"] = "P1D"
    else:
        # Default to status specs indicating staleness after knowledge time + 2 sensor resolutions
        status_specs["max_staleness"] = duration_isoformat(sensor.event_resolution * 2)
    return status_specs
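
# Default-output sketch for get_status_specs, for a sensor with a 15-minute
# resolution and no explicit "status_specs" attribute (illustrative):
#
#     {
#         "staleness_search": {},
#         "max_future_staleness": "-PT12H",
#         "max_staleness": "PT30M",  # 2 * PT15M
#     }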


def get_statuses(
    sensor: Sensor,
    now: datetime,
    status_specs: dict | None = None,
) -> list[dict]:
    """Get the status of the sensor, by source type.

    The main part of each result is the stale value, which is True if the sensor is stale and False otherwise.
    The other values are just context information for the stale value.
    """
    if status_specs is None:
        status_specs = get_status_specs(sensor=sensor)
    status_specs = StatusSchema().load(status_specs)
    max_staleness = status_specs.pop("max_staleness")
    max_future_staleness = status_specs.pop("max_future_staleness")
    staleness_search = status_specs.pop("staleness_search")
    stalenesses = get_stalenesses(
        sensor=sensor,
        staleness_search=staleness_search,
        now=now,
    )
    statuses = list()
    for source_type, (has_relevant_data, staleness) in (
        stalenesses or {None: (True, None)}
    ).items():
        if staleness is None or not has_relevant_data:
            staleness_since = now - staleness if not has_relevant_data else None
            stale = True
            reason = (
                "no data recorded"
                if staleness is None
                else "found no future data which this source should have"
            )
            staleness = None
        else:
            max_source_staleness = (
                max_staleness if staleness > timedelta(0) else max_future_staleness
            )
            staleness_since = now - staleness
            stale = staleness > max_source_staleness
            timeline = "old" if staleness > timedelta(0) else "in the future"
            if staleness > timedelta(0):
                reason_part = (
                    "which is not more" if not stale else "but should not be more"
                )
            else:
                reason_part = "which is not less" if not stale else "but should be more"
            staleness = staleness if staleness > timedelta(0) else -staleness
            reason = f"most recent data is {precisedelta(staleness)} {timeline}, {reason_part} than {precisedelta(max_source_staleness)} {timeline}"
        statuses.append(
            dict(
                staleness=staleness,
                stale=stale,
                staleness_since=staleness_since,
                reason=reason,
                source_type=source_type,
            )
        )
    return statuses
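
# Result sketch for get_statuses (illustrative; one dict per source type):
#
#     [{
#         "staleness": timedelta(hours=3),
#         "stale": True,
#         "staleness_since": <datetime>,
#         "reason": "most recent data is 3 hours old, but should not be more than 30 minutes old",
#         "source_type": "user",
#     }]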


def _get_sensor_asset_relation(
    asset: Asset,
    sensor: Sensor,
    inflexible_device_sensors: list[Sensor],
    context_sensors: dict[str, Sensor],
) -> str:
    """Get the relation of a sensor to an asset."""
    relations = list()
    if sensor.generic_asset_id == asset.id:
        relations.append("sensor belongs to this asset")
    inflexible_device_sensor_ids = {s.id for s in inflexible_device_sensors}
    if sensor.id in inflexible_device_sensor_ids:
        relations.append("flex context (inflexible device)")
    for field, ctxt_sensor in context_sensors.items():
        if sensor.id == ctxt_sensor.id:
            relations.append(f"flex context ({field})")
    return ";".join(relations)


def get_asset_sensors_metadata(
    asset: Asset,
    now: datetime | None = None,
) -> list[dict]:
    """
    Get the metadata of sensors for a given asset and its children.

    :param asset: Asset to get the sensors for.
    :param now: Datetime representing now, used to get the status of the sensors.
    :return: A list of dictionaries, each representing a sensor's metadata.
    """
    if not now:
        now = server_now()
    sensors = []
    sensor_ids = set()
    inflexible_device_sensors = asset.get_inflexible_device_sensors()
    context_sensors = {
        field: Sensor.query.get(asset.flex_context[field]["sensor"])
        for field in asset.flex_context
        if isinstance(asset.flex_context[field], dict)
        and field != "inflexible-device-sensors"
    }
    # Get sensors to show using the validate_sensors_to_show method
    validated_asset_sensors = asset.validate_sensors_to_show(
        suggest_default_sensors=False
    )
    sensor_groups = [
        group["sensors"] for group in validated_asset_sensors if group is not None
    ]
    sensors_to_show = sum(sensor_groups, [])
    sensors_list = [
        *inflexible_device_sensors,
        *context_sensors.values(),
        *sensors_to_show,
    ]
    for sensor in sensors_list:
        if sensor is None or sensor.id in sensor_ids:
            continue
        sensor_status = {}
        sensor_status["id"] = sensor.id
        sensor_status["name"] = sensor.name
        sensor_status["asset_name"] = sensor.generic_asset.name
        sensor_ids.add(sensor.id)
        sensors.append(sensor_status)
    return sensors
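
# Result sketch for get_asset_sensors_metadata (illustrative names and ids):
#
#     [{"id": 1, "name": "power", "asset_name": "My Battery"},
#      {"id": 2, "name": "consumption price", "asset_name": "My Building"}]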


def serialize_sensor_status_data(
    sensor: Sensor,
) -> list[dict]:
    """
    Serialize the status of a sensor belonging to an asset.

    :param sensor: Sensor to get the status of.
    :return: A list of dictionaries, each representing one status of the sensor -
             one status per data source type that stored data on that sensor.
    """
    asset = sensor.generic_asset
    sensor_statuses = get_statuses(sensor=sensor, now=server_now())
    inflexible_device_sensors = asset.get_inflexible_device_sensors()
    context_sensors = {
        field: Sensor.query.get(asset.flex_context[field]["sensor"])
        for field in asset.flex_context
        if isinstance(asset.flex_context[field], dict)
        and field != "inflexible-device-sensors"
    }
    sensors = []
    for sensor_status in sensor_statuses:
        sensor_status["id"] = sensor.id
        sensor_status["name"] = sensor.name
        sensor_status["resolution"] = naturaldelta(sensor.event_resolution)
        sensor_status["staleness"] = (
            naturaldelta(sensor_status["staleness"])
            if sensor_status["staleness"] is not None
            else None
        )
        sensor_status["staleness_since"] = (
            naturaldelta(sensor_status["staleness_since"])
            if sensor_status["staleness_since"] is not None
            else None
        )
        sensor_status["asset_name"] = asset.name
        sensor_status["relation"] = _get_sensor_asset_relation(
            asset, sensor, inflexible_device_sensors, context_sensors
        )
        sensors.append(sensor_status)
    return sensors
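
# Result sketch for serialize_sensor_status_data (illustrative, humanized values):
#
#     [{"id": 1, "name": "power", "resolution": "15 minutes",
#       "staleness": "3 hours", "staleness_since": "3 hours", "stale": True,
#       "reason": "...", "source_type": "user", "asset_name": "My Battery",
#       "relation": "sensor belongs to this asset"}]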


def build_asset_jobs_data(
    asset: Asset,
) -> list[dict]:
    """Get all jobs data for an asset.

    Returns a list of dictionaries, each containing the following keys:
    - job_id: id of a job
    - metadata: job metadata as a JSON string
    - queue: job queue (scheduling or forecasting)
    - asset_or_sensor_type: type of entity that is linked to the job (asset or sensor)
    - entity: the linked asset or sensor, by name and id
    - status: job status (e.g. finished, failed, etc.)
    - err: job error (equal to None when there was no error for a job)
    - enqueued_at: time when the job was enqueued
    - metadata_hash: hash of job metadata (internal field)
    """
    jobs_by_entity = list()
    # Try to get scheduling jobs for the asset first (only scheduling jobs can be stored by asset id)
    jobs_by_entity.append(
        (
            "scheduling",
            "asset",
            asset.id,
            asset.name,
            current_app.job_cache.get(asset.id, "scheduling", "asset"),
        )
    )
    for sensor in asset.sensors:
        jobs_by_entity.append(
            (
                "scheduling",
                "sensor",
                sensor.id,
                sensor.name,
                current_app.job_cache.get(sensor.id, "scheduling", "sensor"),
            )
        )
        jobs_by_entity.append(
            (
                "forecasting",
                "sensor",
                sensor.id,
                sensor.name,
                current_app.job_cache.get(sensor.id, "forecasting", "sensor"),
            )
        )
    jobs_data = list()
    # Build the actual return list - we also unpack lists of jobs, each to its own entry, and we add error info
    for queue, asset_or_sensor_type, entity_id, entity_name, jobs in jobs_by_entity:
        for job in jobs:
            e = job.meta.get(
                "exception",
                Exception(
                    "The job does not state why it failed. "
                    "The worker may be missing an exception handler, "
                    "or its exception handler is not storing the exception as job meta data."
                ),
            )
            job_err = (
                f"Job failed with {type(e).__name__}: {e}" if job.is_failed else None
            )
            metadata = json.dumps({**job.meta, "job_id": job.id}, default=str, indent=4)
            jobs_data.append(
                {
                    "job_id": job.id,
                    "metadata": metadata,
                    "queue": queue,
                    "asset_or_sensor_type": asset_or_sensor_type,
                    "entity": f"{asset_or_sensor_type}: {entity_name} (Id: {entity_id})",
                    "status": job.get_status(),
                    "err": job_err,
                    "enqueued_at": job.enqueued_at,
                    "metadata_hash": hashlib.sha256(metadata.encode()).hexdigest(),
                }
            )
    return jobs_data
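
# Usage sketch for build_asset_jobs_data (assumes a Flask app context with the
# job_cache extension and RQ workers configured; `my_asset` is hypothetical):
#
#     jobs_data = build_asset_jobs_data(my_asset)
#     failed = [j for j in jobs_data if j["status"] == "failed"]
#     for j in failed:
#         print(j["entity"], j["err"])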


@lru_cache()
def _get_sensor_stats(sensor: Sensor, sort_keys: bool, ttl_hash=None) -> dict:
    # The ttl_hash parameter only serves to invalidate the lru_cache entry
    # periodically (see _get_ttl_hash below).
    # Subquery for filtered aggregates
    subquery_for_filtered_aggregates = (
        sa.select(
            TimedBelief.source_id,
            sa.func.max(TimedBelief.event_value).label("max_event_value"),
            sa.func.avg(TimedBelief.event_value).label("avg_event_value"),
            sa.func.sum(TimedBelief.event_value).label("sum_event_value"),
            sa.func.min(TimedBelief.event_value).label("min_event_value"),
        )
        # Exclude NaN values from the aggregates
        # (PostgreSQL treats NaN as equal to NaN, so the inequality filters them out)
        .filter(TimedBelief.event_value != float("NaN"))
        .filter(TimedBelief.sensor_id == sensor.id)
        .group_by(TimedBelief.source_id)
        .subquery()
    )
    raw_stats = db.session.execute(
        sa.select(
            DataSource.name,
            sa.func.min(TimedBelief.event_start).label("min_event_start"),
            sa.func.max(TimedBelief.event_start).label("max_event_start"),
            sa.func.max(
                TimedBelief.event_start
                + sensor.event_resolution
                - TimedBelief.belief_horizon
            ).label("max_belief_time"),
            subquery_for_filtered_aggregates.c.min_event_value,
            subquery_for_filtered_aggregates.c.max_event_value,
            subquery_for_filtered_aggregates.c.avg_event_value,
            subquery_for_filtered_aggregates.c.sum_event_value,
            sa.func.count(TimedBelief.event_value).label("count_event_value"),
        )
        .select_from(TimedBelief)
        .join(DataSource, DataSource.id == TimedBelief.source_id)
        .join(
            subquery_for_filtered_aggregates,
            subquery_for_filtered_aggregates.c.source_id == TimedBelief.source_id,
        )
        .filter(TimedBelief.sensor_id == sensor.id)
        .group_by(
            DataSource.name,
            subquery_for_filtered_aggregates.c.min_event_value,
            subquery_for_filtered_aggregates.c.max_event_value,
            subquery_for_filtered_aggregates.c.avg_event_value,
            subquery_for_filtered_aggregates.c.sum_event_value,
        )
    ).fetchall()
    stats = dict()
    for row in raw_stats:
        (
            data_source,
            min_event_start,
            max_event_start,
            max_belief_time,
            min_value,
            max_value,
            mean_value,
            sum_values,
            count_values,
        ) = row
        first_event_start = (
            pd.Timestamp(min_event_start).tz_convert(sensor.timezone).isoformat()
        )
        last_event_end = (
            pd.Timestamp(max_event_start + sensor.event_resolution)
            .tz_convert(sensor.timezone)
            .isoformat()
        )
        last_belief_time = (
            pd.Timestamp(max_belief_time).tz_convert(sensor.timezone).isoformat()
        )
        stats[data_source] = {
            "First event start": first_event_start,
            "Last event end": last_event_end,
            "Last recorded": last_belief_time,
            "Min value": min_value,
            "Max value": max_value,
            "Mean value": mean_value,
            "Sum over values": sum_values,
            "Number of values": count_values,
        }
        if sort_keys is False:
            # Hand over the entries as items(), preserving this key order
            # for consumers that would otherwise sort dict keys
            stats[data_source] = stats[data_source].items()
    return stats


def _get_ttl_hash(seconds=120) -> int:
    """Return the same value within a "seconds"-long time period.

    This is needed to turn the LRU cache into a TTL cache
    (lru_cache returns a cached result when call arguments are the same;
    here we ensure that the call arguments stay the same for a "seconds"-long period of time).
    """
    return round(time.time() / seconds)
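
# TTL sketch: two calls within the same 120-second window produce the same hash,
# so lru_cache serves the cached stats; a later call falls into a new window and
# recomputes (hash values are illustrative):
#
#     _get_ttl_hash(120)  # e.g. 14549280
#     time.sleep(120)
#     _get_ttl_hash(120)  # e.g. 14549281 -> cache miss, stats recomputed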


def get_sensor_stats(sensor: Sensor, sort_keys: bool = True) -> dict:
    """Get stats for a sensor."""
    return _get_sensor_stats(sensor, sort_keys, ttl_hash=_get_ttl_hash())
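
# Result sketch for get_sensor_stats (illustrative, keyed by data source name):
#
#     {"forecaster XYZ": {
#         "First event start": "2024-01-01T00:00:00+01:00",
#         "Last event end": "2024-01-02T00:00:00+01:00",
#         "Last recorded": "2024-01-01T18:00:00+01:00",
#         "Min value": 0.0, "Max value": 4.2, "Mean value": 2.1,
#         "Sum over values": 201.6, "Number of values": 96,
#     }}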