# aggregator.py
  1. from __future__ import annotations
  2. from datetime import datetime, timedelta
  3. from typing import Any
  4. import pandas as pd
  5. from flexmeasures.data.models.reporting import Reporter
  6. from flexmeasures.data.models.time_series import Sensor
  7. from flexmeasures.data.schemas.reporting.aggregation import (
  8. AggregatorConfigSchema,
  9. AggregatorParametersSchema,
  10. )
  11. from flexmeasures.utils.time_utils import server_now
  12. class AggregatorReporter(Reporter):
  13. """This reporter applies an aggregation function to multiple sensors"""
  14. __version__ = "1"
  15. __author__ = "Seita"
  16. _config_schema = AggregatorConfigSchema()
  17. _parameters_schema = AggregatorParametersSchema()
  18. weights: dict
  19. method: str
  20. def _compute_report(
  21. self,
  22. start: datetime,
  23. end: datetime,
  24. input: list[dict[str, Any]],
  25. output: list[dict[str, Any]],
  26. resolution: timedelta | None = None,
  27. belief_time: datetime | None = None,
  28. ) -> list[dict[str, Any]]:
  29. """
  30. This method merges all the BeliefDataFrames into a single one, dropping
  31. all indexes but event_start, and applies an aggregation function over the
  32. columns.
  33. """
  34. method: str = self._config.get("method")
  35. weights: dict = self._config.get("weights", {})
  36. dataframes = []
  37. if belief_time is None:
  38. belief_time = server_now()
  39. for input_description in input:
  40. sensor: Sensor = input_description.pop("sensor")
  41. # if name is not in belief_search_config, using the Sensor id instead
  42. column_name = input_description.pop("name", f"sensor_{sensor.id}")
  43. source = input_description.pop(
  44. "source", input_description.pop("sources", None)
  45. )
  46. if source is not None and not isinstance(source, list):
  47. source = [source]
  48. df = sensor.search_beliefs(
  49. event_starts_after=start,
  50. event_ends_before=end,
  51. resolution=resolution,
  52. beliefs_before=belief_time,
  53. source=source,
  54. one_deterministic_belief_per_event=True,
  55. **input_description,
  56. )
  57. # Check for multi-sourced events (i.e. multiple sources for a single event)
  58. if len(df.lineage.events) != len(df):
  59. duplicate_events = df[
  60. df.index.get_level_values("event_start").duplicated()
  61. ]
  62. raise ValueError(
  63. f"{len(duplicate_events)} event(s) are duplicate. First duplicate: {duplicate_events[0]}. Consider using (more) source filters."
  64. )
  65. # Check for multiple sources within the entire frame (excluding different versions of the same source)
  66. unique_sources = df.lineage.sources
  67. properties = [
  68. "name",
  69. "type",
  70. "model",
  71. ] # properties to identify different versions of the same source
  72. if (
  73. len(unique_sources) > 1
  74. and not all(
  75. getattr(source, prop) == getattr(unique_sources[0], prop)
  76. for prop in properties
  77. for source in unique_sources
  78. )
  79. and (source is None or len(source) == 0)
  80. ):
  81. raise ValueError(
  82. "Missing attribute source or sources. The fields `source` or `sources` is required when having multiple sources within the time window."
  83. )
  84. df = df.droplevel([1, 2, 3])
  85. # apply weight
  86. if column_name in weights:
  87. df *= weights[column_name]
  88. dataframes.append(df)
  89. output_df = pd.concat(dataframes, axis=1)
  90. # apply aggregation method
  91. output_df = output_df.aggregate(method, axis=1)
  92. # convert BeliefsSeries into a BeliefsDataFrame
  93. output_df = output_df.to_frame("event_value")
  94. output_df["belief_time"] = belief_time
  95. output_df["cumulative_probability"] = 0.5
  96. output_df["source"] = self.data_source
  97. output_df.sensor = output[0]["sensor"]
  98. output_df.event_resolution = output[0]["sensor"].event_resolution
  99. output_df = output_df.set_index(
  100. ["belief_time", "source", "cumulative_probability"], append=True
  101. )
  102. return [
  103. {
  104. "name": "aggregate",
  105. "column": "event_value",
  106. "sensor": output[0]["sensor"],
  107. "data": output_df,
  108. }
  109. ]