from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any

import pandas as pd

from flexmeasures.data.models.reporting import Reporter
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.schemas.reporting.aggregation import (
    AggregatorConfigSchema,
    AggregatorParametersSchema,
)
from flexmeasures.utils.time_utils import server_now


class AggregatorReporter(Reporter):
    """This reporter applies an aggregation function to multiple sensors."""

    __version__ = "1"
    __author__ = "Seita"

    _config_schema = AggregatorConfigSchema()
    _parameters_schema = AggregatorParametersSchema()

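    # Filled from the validated reporter config: `method` names a pandas
    # aggregation function (e.g. "sum" or "mean"), and `weights` optionally
    # maps input column names to multiplication factors.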
    weights: dict
    method: str

    def _compute_report(
        self,
        start: datetime,
        end: datetime,
        input: list[dict[str, Any]],
        output: list[dict[str, Any]],
        resolution: timedelta | None = None,
        belief_time: datetime | None = None,
    ) -> list[dict[str, Any]]:
        """
        This method merges all the BeliefsDataFrames into a single one, dropping
        all index levels but event_start, and applies an aggregation function over
        the columns.
        """
        method: str = self._config.get("method")
        weights: dict = self._config.get("weights", {})

        dataframes = []

        if belief_time is None:
            belief_time = server_now()

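        # Build one column per input sensor: fetch its beliefs, validate that each
        # event has a single source, optionally apply a weight, and collect it.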
        for input_description in input:
            sensor: Sensor = input_description.pop("sensor")

            # if no name is given in the input description, use the sensor id instead
            column_name = input_description.pop("name", f"sensor_{sensor.id}")

            # accept either `source` or `sources`, and normalize to a list
            source = input_description.pop(
                "source", input_description.pop("sources", None)
            )
            if source is not None and not isinstance(source, list):
                source = [source]

            df = sensor.search_beliefs(
                event_starts_after=start,
                event_ends_before=end,
                resolution=resolution,
                beliefs_before=belief_time,
                source=source,
                one_deterministic_belief_per_event=True,
                **input_description,
            )

            # Check for multi-sourced events (i.e. multiple sources for a single event)
            if len(df.lineage.events) != len(df):
                duplicate_events = df[
                    df.index.get_level_values("event_start").duplicated()
                ]
                raise ValueError(
                    f"{len(duplicate_events)} event(s) are duplicated. First duplicate: {duplicate_events.index[0]}. Consider using (more) source filters."
                )

            # Check for multiple sources within the entire frame (excluding different versions of the same source)
            unique_sources = df.lineage.sources
            properties = [
                "name",
                "type",
                "model",
            ]  # properties to identify different versions of the same source
            if (
                len(unique_sources) > 1
                and not all(
                    getattr(source, prop) == getattr(unique_sources[0], prop)
                    for prop in properties
                    for source in unique_sources
                )
                and (source is None or len(source) == 0)
            ):
                raise ValueError(
                    "Missing attribute source or sources. The field `source` or `sources` is required when there are multiple sources within the time window."
                )

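            # Drop the belief_time, source and cumulative_probability index levels,
            # keeping only event_start, so the per-sensor frames can be concatenated
            # column-wise on a shared event_start index.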
            df = df.droplevel([1, 2, 3])

            # apply weight
            if column_name in weights:
                df *= weights[column_name]

            dataframes.append(df)

        output_df = pd.concat(dataframes, axis=1)

        # apply aggregation method
        output_df = output_df.aggregate(method, axis=1)

        # convert BeliefsSeries into a BeliefsDataFrame
        output_df = output_df.to_frame("event_value")
        output_df["belief_time"] = belief_time
        output_df["cumulative_probability"] = 0.5
        output_df["source"] = self.data_source
        output_df.sensor = output[0]["sensor"]
        output_df.event_resolution = output[0]["sensor"].event_resolution

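        # Restore the standard BeliefsDataFrame multi-index
        # (event_start, belief_time, source, cumulative_probability).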
        output_df = output_df.set_index(
            ["belief_time", "source", "cumulative_probability"], append=True
        )

        return [
            {
                "name": "aggregate",
                "column": "event_value",
                "sensor": output[0]["sensor"],
                "data": output_df,
            }
        ]
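

# Usage sketch (illustrative, not part of the reporter). It assumes a running
# FlexMeasures app context and three existing sensors; the sensor lookups, ids
# and weights below are hypothetical, and the config/parameters keys follow
# AggregatorConfigSchema and AggregatorParametersSchema as used above.
#
#     from datetime import datetime, timedelta
#     from pytz import utc
#
#     pv = Sensor.query.get(1)           # hypothetical sensor ids
#     consumption = Sensor.query.get(2)
#     net_power = Sensor.query.get(3)
#
#     reporter = AggregatorReporter(
#         config={"method": "sum", "weights": {"pv": 1, "consumption": -1}}
#     )
#     result = reporter.compute(
#         start=datetime(2023, 1, 1, tzinfo=utc),
#         end=datetime(2023, 1, 2, tzinfo=utc),
#         resolution=timedelta(minutes=15),
#         input=[
#             {"sensor": pv, "name": "pv"},
#             {"sensor": consumption, "name": "consumption"},
#         ],
#         output=[{"sensor": net_power}],
#     )
#     # result[0]["data"] is a BeliefsDataFrame that can be saved to the output sensor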