123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- import pytest
- from flexmeasures.data.models.reporting.aggregator import AggregatorReporter
- from flexmeasures.data.models.data_sources import DataSource
- from datetime import datetime
- from pytz import utc, timezone
- import pandas as pd
@pytest.mark.parametrize(
    "aggregation_method, expected_value",
    [
        ("sum", 0),
        ("mean", 0),
        ("var", 2),
        ("std", 2**0.5),
        ("max", 1),
        ("min", -1),
        ("prod", -1),
        ("median", 0),
    ],
)
def test_aggregator(setup_dummy_data, aggregation_method, expected_value, db):
    """Aggregate two sensors holding 24 hourly values of 1 (s1) and -1 (s2).

    Every event therefore aggregates the pair (1, -1), whose mean is 0:
        1) sum: 0 = 1 + (-1)
        2) mean: 0 = (1 + (-1)) / 2
        3) var: 2 = ((1 - 0)^2 + (-1 - 0)^2) / (2 - 1)  (sample variance, ddof=1)
        4) std: sqrt(2) = sqrt(var)
        5) max: 1 = max(1, -1)
        6) min: -1 = min(1, -1)
        7) prod: -1 = 1 * (-1)
        8) median: even number of elements, so the mean of the two central
           elements: 0 = (1 + (-1)) / 2
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    agg_reporter = AggregatorReporter(method=aggregation_method)

    source_1 = db.session.get(DataSource, 1)
    source_2 = db.session.get(DataSource, 2)

    result = agg_reporter.compute(
        input=[dict(sensor=s1, source=source_1), dict(sensor=s2, source=source_2)],
        output=[dict(sensor=report_sensor)],
        start=datetime(2023, 5, 10, tzinfo=utc),
        end=datetime(2023, 5, 11, tzinfo=utc),
    )[0]["data"]

    # check that we got a result for each of the 24 hours of the (UTC) day
    assert len(result) == 24

    # Compare with a tolerance: "std" yields the irrational sqrt(2), for which
    # exact float equality is brittle.
    assert result.event_value.to_list() == pytest.approx([expected_value] * 24)
@pytest.mark.parametrize(
    "weight_1, weight_2, expected_result",
    [(1, 1, 0), (1, -1, 2), (2, 0, 2), (0, 2, -2)],
)
def test_aggregator_reporter_weights(
    setup_dummy_data, weight_1, weight_2, expected_result, db
):
    """Check that per-input weights are applied before summing.

    s1 holds 1 and s2 holds -1 for every hour, so each aggregated event is
    expected to equal weight_1 * 1 + weight_2 * (-1).

    The first input is named explicitly ("s1"); the second is left unnamed, so
    its weight is keyed by the default name the reporter gives that input.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    # NOTE(review): assumes unnamed inputs are keyed as "sensor_<id>" (the
    # previous hard-coded "sensor_2" key relied on this for s2). Building the
    # key from s2.id avoids depending on which id the database assigned.
    reporter_config = dict(
        method="sum",
        weights={"s1": weight_1, f"sensor_{s2.id}": weight_2},
    )

    source_1 = db.session.get(DataSource, 1)
    source_2 = db.session.get(DataSource, 2)

    agg_reporter = AggregatorReporter(config=reporter_config)

    result = agg_reporter.compute(
        input=[
            dict(name="s1", sensor=s1, source=source_1),
            dict(sensor=s2, source=source_2),
        ],
        output=[dict(sensor=report_sensor)],
        start=datetime(2023, 5, 10, tzinfo=utc),
        end=datetime(2023, 5, 11, tzinfo=utc),
    )[0]["data"]

    # check that we got a result for each of the 24 hours of the (UTC) day
    assert len(result) == 24

    # every event should equal the weighted sum of the two inputs
    assert (result == expected_result).all().event_value
def test_dst_transition(setup_dummy_data, db):
    """Report over local calendar days that contain a DST switch.

    In Europe/Amsterdam the spring-forward day (2023-03-26) lasts 23 hours and
    the fall-back day (2023-10-29) lasts 25 hours; the report for each day is
    expected to contain exactly that many events.
    """
    _, _, s3, _, report_sensor, _ = setup_dummy_data

    reporter = AggregatorReporter()
    amsterdam = timezone("Europe/Amsterdam")
    knowledge_time = amsterdam.localize(datetime(2023, 12, 1))

    def report_between(first_day, next_day):
        # Fresh input/output descriptions per call, as each compute() got its own.
        return reporter.compute(
            input=[dict(sensor=s3, source=db.session.get(DataSource, 1))],
            output=[dict(sensor=report_sensor)],
            start=amsterdam.localize(first_day),
            end=amsterdam.localize(next_day),
            belief_time=knowledge_time,
        )[0]["data"]

    # winter (CET) -> summer (CEST): one hour is skipped
    spring_forward = report_between(datetime(2023, 3, 26), datetime(2023, 3, 27))
    assert len(spring_forward) == 23

    # summer (CEST) -> winter (CET): one hour is repeated
    fall_back = report_between(datetime(2023, 10, 29), datetime(2023, 10, 30))
    assert len(fall_back) == 25
def test_resampling(setup_dummy_data, db):
    """Resample s3 to a daily report and check it is anchored at local midnight.

    Two windows in Europe/Amsterdam are used:
    - 2023-03-27, the first full summer-time (CEST) day, i.e. the day *after*
      the winter (CET) -> summer (CEST) transition on 2023-03-26;
    - 2023-10-29, the summer (CEST) -> winter (CET) transition day itself.
    In both cases the first daily event should start at 00:00 local time.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    agg_reporter = AggregatorReporter()

    tz = timezone("Europe/Amsterdam")

    source_1 = db.session.get(DataSource, 1)

    # Day after the winter (CET) -> summer (CEST) transition (2023-03-26).
    # NOTE(review): the 23-hour transition day itself is not resampled here;
    # add a 2023-03-26 case if covering the spring transition is the intent.
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 3, 27)),
        end=tz.localize(datetime(2023, 3, 28)),
        input=[dict(sensor=s3, source=source_1)],
        output=[dict(sensor=daily_report_sensor, source=source_1)],
        belief_time=tz.localize(datetime(2023, 12, 1)),
        resolution=pd.Timedelta("1D"),
    )[0]["data"]

    assert result.event_starts[0] == pd.Timestamp(
        year=2023, month=3, day=27, tz="Europe/Amsterdam"
    )

    # transition day from summer (CEST) to winter (CET)
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 10, 29)),
        end=tz.localize(datetime(2023, 10, 30)),
        input=[dict(sensor=s3, source=source_1)],
        output=[dict(sensor=daily_report_sensor, source=source_1)],
        belief_time=tz.localize(datetime(2023, 12, 1)),
        resolution=pd.Timedelta("1D"),
    )[0]["data"]

    assert result.event_starts[0] == pd.Timestamp(
        year=2023, month=10, day=29, tz="Europe/Amsterdam"
    )
def test_source_transition(setup_dummy_data, db):
    """Check how the reporter selects beliefs when s3 has data from several sources.

    On 2023-04-24 (UTC), the first 13 hourly events (00:00 up to and including
    12:00) "belong" to Source 1 and are filled with 1.0. From 12:00 to 24:00
    there are events belonging to Source 2 with value -1, and in the last hour
    a newer version of Source 2 recorded the value 3.

    We expect the reporter to use only the values of the sources listed in the
    `sources` array of the `input` field. When more than one of those sources
    has a belief for the same event, the first source in the array wins.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    agg_reporter = AggregatorReporter()

    tz = timezone("UTC")

    ds1 = db.session.get(DataSource, 1)
    ds2 = db.session.get(DataSource, 2)

    day_start = tz.localize(datetime(2023, 4, 24))
    evening_start = tz.localize(datetime(2023, 4, 24, 18, 0))
    day_end = tz.localize(datetime(2023, 4, 25))
    belief_time = tz.localize(datetime(2023, 12, 1))

    def compute_report(start, **input_options):
        """Compute the s3 report from `start` until the end of 2023-04-24."""
        return agg_reporter.compute(
            start=start,
            end=day_end,
            input=[dict(sensor=s3, **input_options)],
            output=[dict(sensor=report_sensor)],
            belief_time=belief_time,
        )[0]["data"]

    # considering DataSource 1 and 2
    result = compute_report(day_start, sources=[ds1, ds2])
    assert len(result) == 24
    # the data from the first source is used where both sources have a belief
    assert (result[:13] == 1).all().event_value
    assert (result[13:] == -1).all().event_value

    # only considering DataSource 1
    result = compute_report(day_start, sources=[ds1])
    assert len(result) == 13
    assert (result == 1).all().event_value

    # only considering DataSource 2
    result = compute_report(day_start, sources=[ds2])
    assert len(result) == 12
    assert (result == -1).all().event_value

    # If no source is passed, the reporter should raise a ValueError,
    # as there are events with different data sources in the report time period.
    # This is important, for instance, for sensors containing power and scheduled
    # values where we could get beliefs from both sources.
    with pytest.raises(ValueError):
        compute_report(day_start)

    # The exception to the above is when a new version of the same source recorded
    # a value, in which case the latest version takes precedence. This happened in
    # the last hour of the day; the window 18:00-24:00 holds 6 hourly events.
    result = compute_report(evening_start)
    # pin the length so the slice checks below cannot pass vacuously
    assert len(result) == 6
    assert (result[:5] == -1).all().event_value  # beliefs from the older version
    assert (result[5:] == 3).all().event_value  # belief from the latest version

    # If we exclude source type "A" (source 1 is of that type) we should get the
    # same result.
    same_result = compute_report(evening_start, exclude_source_types=["A"])
    assert (same_result == result).all().event_value

    # If we exclude source type "B" (both versions of source 2 are of that type)
    # we should get an empty result.
    result = compute_report(evening_start, exclude_source_types=["B"])
    assert result.empty

    # If we set use_latest_version_per_event=False, we should get both versions of
    # source 2, and one_deterministic_belief_per_event=True kicks in to give back
    # the most recent version.
    result = compute_report(evening_start, use_latest_version_per_event=False)
    assert len(result) == 6
    assert (result[:5] == -1).all().event_value  # beliefs from the older version
    assert (result[5:] == 3).all().event_value  # belief from the latest version
|