123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- from __future__ import annotations
- import pytest
- from flexmeasures.data.models.reporting import Reporter
- from flexmeasures.data.models.data_sources import keep_latest_version, DataSource
- from datetime import datetime
- from pytz import UTC
- import numpy as np
- import timely_beliefs as tb
- def test_get_reporter_from_source(db, app, test_reporter, add_nearby_weather_sensors):
- reporter = test_reporter.data_generator
- reporter_sensor = add_nearby_weather_sensors.get("farther_temperature")
- assert isinstance(reporter, Reporter)
- assert reporter.__class__.__name__ == "TestReporter"
- res = reporter.compute(
- input=[{"sensor": reporter_sensor}],
- output=[{"sensor": reporter_sensor}],
- start=datetime(2023, 1, 1, tzinfo=UTC),
- end=datetime(2023, 1, 2, tzinfo=UTC),
- )[0]["data"]
- assert res.lineage.sources[0] == reporter.data_source
- with pytest.raises(AttributeError):
- reporter.compute(
- input=[{"sensor": reporter_sensor}],
- output=[{"sensor": reporter_sensor}],
- start=datetime(2023, 1, 1, tzinfo=UTC),
- end="not a date",
- )
- def test_data_source(db, app, test_reporter):
- # get TestReporter class from the data_generators registry
- TestReporter = app.data_generators["reporter"].get("TestReporter")
- reporter1 = TestReporter(config={"a": "1"})
- db.session.add(reporter1.data_source)
- reporter2 = TestReporter(config={"a": "1"})
- # reporter1 and reporter2 have the same data_source because they share the same config
- assert reporter1.data_source == reporter2.data_source
- assert reporter1.data_source.attributes.get("data_generator").get(
- "config"
- ) == reporter2.data_source.attributes.get("data_generator").get("config")
- reporter3 = TestReporter(config={"a": "2"})
- # reporter3 and reporter2 have different data sources because they have different config values
- assert reporter3.data_source != reporter2.data_source
- assert reporter3.data_source.attributes.get("data_generator").get(
- "config"
- ) != reporter2.data_source.attributes.get("data_generator").get("config")
- # recreate reporter3 from its data source
- reporter4 = reporter3.data_source.data_generator
- # check that reporter3 and reporter4 share the same config values
- assert reporter4._config == reporter3._config
- def test_data_generator_save_config(db, app, test_reporter, add_nearby_weather_sensors):
- TestReporter = app.data_generators["reporter"].get("TestReporter")
- reporter_sensor = add_nearby_weather_sensors.get("farther_temperature")
- reporter = TestReporter(config={"a": "1"})
- res = reporter.compute(
- input=[{"sensor": reporter_sensor}],
- output=[{"sensor": reporter_sensor}],
- start=datetime(2023, 1, 1, tzinfo=UTC),
- end=datetime(2023, 1, 2, tzinfo=UTC),
- )[0]["data"]
- assert res.lineage.sources[0].attributes.get("data_generator").get("config") == {
- "a": "1"
- }
- reporter = TestReporter(config={"a": "1"}, save_config=False)
- res = reporter.compute(
- input=[{"sensor": reporter_sensor}],
- output=[{"sensor": reporter_sensor}],
- start=datetime(2023, 1, 1, tzinfo=UTC),
- end=datetime(2023, 1, 2, tzinfo=UTC),
- )[0]["data"]
- # check that the data_generator is not saving the config in the data_source attributes
- assert res.lineage.sources[0].attributes.get("data_generator") == dict()
- def test_data_generator_save_parameters(
- db, app, test_reporter, add_nearby_weather_sensors
- ):
- TestReporter = app.data_generators["reporter"].get("TestReporter")
- reporter_sensor = add_nearby_weather_sensors.get("farther_temperature")
- reporter = TestReporter(config={"a": "1"}, save_parameters=True)
- parameters = {
- "input": [{"sensor": reporter_sensor.id}],
- "output": [{"sensor": reporter_sensor.id}],
- "start": "2023-01-01T00:00:00+00:00",
- "end": "2023-01-02T00:00:00+00:00",
- "b": "test",
- }
- parameters_without_start_end = {
- "input": [{"sensor": reporter_sensor.id}],
- "output": [{"sensor": reporter_sensor.id}],
- "b": "test",
- }
- res = reporter.compute(parameters=parameters)[0]["data"]
- assert res.lineage.sources[0].attributes.get("data_generator").get("config") == {
- "a": "1"
- }
- assert (
- res.lineage.sources[0].attributes.get("data_generator").get("parameters")
- == parameters_without_start_end
- )
- dg2 = reporter.data_source.data_generator
- parameters_2 = {
- "start": "2023-01-01T10:00:00+00:00",
- "end": "2023-01-02T00:00:00+00:00",
- "b": "test2",
- }
- res = dg2.compute(parameters=parameters_2)[0]["data"]
- # check that compute gets data stored in the DB (i.e. `input`/`output`) and updated data
- # from the method call (e.g. field `b``)
- assert dg2._parameters["b"] == parameters_2["b"]
- assert dg2._parameters["start"].isoformat() == parameters_2["start"]
- def test_keep_last_version():
- s1 = DataSource(name="s1", model="model 1", type="forecaster", version="0.1.0")
- s2 = DataSource(name="s1", model="model 1", type="forecaster")
- s3 = DataSource(name="s1", model="model 2", type="forecaster")
- s4 = DataSource(name="s1", model="model 2", type="scheduler")
- def create_dummy_frame(sources: list[DataSource]) -> tb.BeliefsDataFrame:
- sensor = tb.Sensor("A")
- beliefs = [
- tb.TimedBelief(
- sensor=sensor,
- event_start=datetime(2023, 1, 1, tzinfo=UTC),
- belief_time=datetime(2023, 1, 1, tzinfo=UTC),
- event_value=1,
- source=s,
- )
- for s in sources
- ]
- bdf = tb.BeliefsDataFrame(beliefs)
- bdf["source.name"] = (
- bdf.index.get_level_values("source").map(lambda x: x.name).values
- )
- bdf["source.model"] = (
- bdf.index.get_level_values("source").map(lambda x: x.model).values
- )
- bdf["source.type"] = (
- bdf.index.get_level_values("source").map(lambda x: x.type).values
- )
- bdf["source.version"] = (
- bdf.index.get_level_values("source").map(lambda x: x.version).values
- )
- return bdf
- # the data source with no version is assumed to have version 0.0.0
- bdf = create_dummy_frame([s1, s2])
- np.testing.assert_array_equal(keep_latest_version(bdf).sources, [s1])
- # sources with different models are preserved
- bdf = create_dummy_frame([s1, s2, s3])
- np.testing.assert_array_equal(keep_latest_version(bdf).sources, [s1, s3])
- # two sources with the same model but different types
- bdf = create_dummy_frame([s3, s4])
- np.testing.assert_array_equal(keep_latest_version(bdf).sources, [s3, s4])
- # repeated source
- bdf = create_dummy_frame([s1, s1])
- np.testing.assert_array_equal(keep_latest_version(bdf).sources, [s1])
|