123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- import pytest
- from datetime import datetime
- from pytz import utc
- from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter
def test_reporter(app, setup_dummy_data):
    """Run a multi-step PandasReporter pipeline and inspect the report.

    The configured pipeline takes an ``xs`` cross-section (level 2) of each of
    the two inputs, merges them on ``event_start``, resamples to 2h, takes the
    mean and finally sums over axis 1.

    Checks performed on the report:
    - its length and first event start,
    - that the output sensor and the reporter data source are assigned,
    - that computing again with a later ``start`` yields a different result.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    # NOTE(review): "@name" args presumably refer to named objects known to the
    # reporter (e.g. earlier df_output frames) — confirm against PandasReporter.
    transformations = [
        {
            "df_input": "sensor_1",
            "df_output": "sensor_1_source_1",
            "method": "xs",
            "args": ["@source_1"],
            "kwargs": {"level": 2},
        },
        {
            "df_input": "sensor_2",
            "df_output": "sensor_2_source_1",
            "method": "xs",
            "args": ["@source_1"],
            "kwargs": {"level": 2},
        },
        {
            "df_output": "df_merge",
            "df_input": "sensor_1_source_1",
            "method": "merge",
            "args": ["@sensor_2_source_1"],
            "kwargs": {"on": "event_start", "suffixes": ("_sensor1", "_sensor2")},
        },
        {"method": "resample", "args": ["2h"]},
        {"method": "mean"},
        {"method": "sum", "kwargs": {"axis": 1}},
    ]
    reporter_config = {
        "required_input": [{"name": "sensor_1"}, {"name": "sensor_2"}],
        "required_output": [{"name": "df_merge"}],
        "transformations": transformations,
    }
    reporter = PandasReporter(config=reporter_config)

    window_start = datetime(2023, 4, 10, tzinfo=utc)
    window_end = datetime(2023, 4, 10, 10, tzinfo=utc)
    input_spec = [
        {"name": "sensor_1", "sensor": s1},
        {"name": "sensor_2", "sensor": s2},
    ]
    output_spec = [{"name": "df_merge", "sensor": report_sensor}]

    first_report = reporter.compute(
        start=window_start, end=window_end, input=input_spec, output=output_spec
    )
    result = first_report[0]["data"]

    assert len(result) == 5
    assert str(result.event_starts[0]) == "2023-04-10 00:00:00+00:00"

    # The output sensor must be the one requested in the output description.
    assert result.sensor == report_sensor

    # Every source of the report must be the default data source, typed as a reporter.
    expected_source_name = app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE")
    expected_source_type = "reporter"
    assert all(
        source.name == expected_source_name and source.type == expected_source_type
        for source in result.sources
    )

    # Calling compute with a different start must change the result.
    later_start = datetime(2023, 4, 10, 3, tzinfo=utc)
    second_report = reporter.compute(
        start=later_start, end=window_end, input=input_spec, output=output_spec
    )
    result2 = second_report[0]["data"]

    assert len(result2) == 4
    assert str(result2.event_starts[0]) == "2023-04-10 02:00:00+00:00"
def test_reporter_repeated(setup_dummy_data):
    """Check that computing twice with identical parameters gives identical values.

    Parameters are passed in serialized form (datetime strings and sensor ids)
    through the ``parameters`` keyword of ``compute``.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    transformations = [
        {
            "df_input": "sensor_1",
            "df_output": "sensor_1_source_1",
            "method": "xs",
            "args": ["@source_1"],
            "kwargs": {"level": 2},
        },
        {
            "df_input": "sensor_2",
            "df_output": "sensor_2_source_1",
            "method": "xs",
            "args": ["@source_1"],
            "kwargs": {"level": 2},
        },
        {
            "df_output": "df_merge",
            "df_input": "sensor_1_source_1",
            "method": "merge",
            "args": ["@sensor_2_source_1"],
            "kwargs": {"on": "event_start", "suffixes": ("_sensor1", "_sensor2")},
        },
        {"method": "resample", "args": ["2h"]},
        {"method": "mean"},
        {"method": "sum", "kwargs": {"axis": 1}},
    ]
    reporter_config = {
        "required_input": [{"name": "sensor_1"}, {"name": "sensor_2"}],
        "required_output": [{"name": "df_merge"}],
        "transformations": transformations,
    }

    # NOTE(review): the datetime strings carry a space where a "+" offset sign
    # would be expected; kept verbatim — confirm this is intentional.
    parameters = {
        "start": "2023-04-10T00:00:00 00:00",
        "end": "2023-04-10T10:00:00 00:00",
        "input": [
            {"name": "sensor_1", "sensor": s1.id},
            {"name": "sensor_2", "sensor": s2.id},
        ],
        "output": [{"name": "df_merge", "sensor": report_sensor.id}],
    }

    reporter = PandasReporter(config=reporter_config)
    report1 = reporter.compute(parameters=parameters)
    report2 = reporter.compute(parameters=parameters)

    second_values = report2[0]["data"].values
    first_values = report1[0]["data"].values
    assert all(second_values == first_values)
def test_reporter_empty(setup_dummy_data):
    """Check that a report over a period without data comes back empty.

    A pass-through reporter (no transformations) is first run on a period for
    which the report is expected to be non-empty, then on a period for which
    it is expected to be empty.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    config = {
        "required_input": [{"name": "sensor_1"}],
        "required_output": [{"name": "sensor_1"}],
        "transformations": [],
    }
    reporter = PandasReporter(config=config)

    # Period with data available: the report must contain something.
    populated_report = reporter.compute(
        start=datetime(2023, 4, 10, tzinfo=utc),
        end=datetime(2023, 4, 10, 10, tzinfo=utc),
        input=[{"name": "sensor_1", "sensor": s1}],
        output=[{"name": "sensor_1", "sensor": report_sensor}],
    )
    assert not populated_report[0]["data"].empty

    # Period with no data available: the report must be empty.
    vacant_report = reporter.compute(
        sensor=report_sensor,
        start=datetime(2021, 4, 10, tzinfo=utc),
        end=datetime(2021, 4, 10, 10, tzinfo=utc),
        input=[{"name": "sensor_1", "sensor": s1}],
        output=[{"name": "sensor_1", "sensor": report_sensor}],
    )
    assert vacant_report[0]["data"].empty
def test_pandas_reporter_unit_conversion(app, setup_dummy_data):
    """Exercise the unit conversion of reporter inputs and outputs.

    Cases covered:
    - kW -> kW
    - kW -> MW
    - kW -> MWh
    - kW -> W -> kW
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    required_input = [
        {"name": "sensor_4"},
        {"name": "sensor_4_kw"},
        {"name": "sensor_4_mw", "unit": "MW"},
        {"name": "sensor_4_mwh", "unit": "MWh"},
    ]
    required_output = [
        {"name": "sensor_4_kw"},
        {"name": "sensor_4_mw"},
        {"name": "sensor_4_mwh"},
        # Pretend the internal operations yielding sensor_4_output_w produce values in "W".
        {"name": "sensor_4_output_w", "unit": "W"},
    ]
    reporter_config = {
        "required_input": required_input,
        "required_output": required_output,
        "transformations": [
            {"df_input": "sensor_4", "method": "copy", "df_output": "sensor_4_output_w"}
        ],
    }
    reporter = PandasReporter(config=reporter_config)

    # All inputs and outputs are bound to the same sensor (s4).
    input_names = ("sensor_4", "sensor_4_kw", "sensor_4_mw", "sensor_4_mwh")
    output_names = ("sensor_4_kw", "sensor_4_mw", "sensor_4_mwh", "sensor_4_output_w")
    input_spec = [{"name": name, "sensor": s4} for name in input_names]
    output_spec = [{"name": name, "sensor": s4} for name in output_names]

    report = reporter.compute(
        start=datetime(2023, 1, 1, tzinfo=utc),
        end=datetime(2023, 1, 2, tzinfo=utc),
        input=input_spec,
        output=output_spec,
    )

    # Results come back in the order of the output description.
    kw_values = report[0]["data"].event_value.values
    mw_values = report[1]["data"].event_value.values
    mwh_values = report[2]["data"].event_value.values
    output_w_values = report[3]["data"].event_value.values

    # MW = kW / 1000
    assert (mw_values == kw_values / 1000).all()

    # MWh = MW * 0.25 (resolution = 15 min)
    assert (mwh_values == mw_values * 0.25).all()

    # The input is in kW; the copied values are declared to be in W and are then
    # converted to the output sensor unit (kW):
    # 1 kW -(copy the values)-> 1 W -> 0.001 kW
    assert (output_w_values == kw_values * 0.001).all()
@pytest.mark.parametrize("shortcut", [True, False])
def test_pandas_reporter_valid_range(app, setup_dummy_data, shortcut):
    """Check that values outside an open range are dropped from the report.

    Two equivalent pipelines are tested:
    - ``shortcut=True``: fewer transformations, building the mask with ``eval``;
    - ``shortcut=False``: separate ``gt``/``lt`` masks applied one after another.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
    lower, upper = 3, 7

    # The trailing steps omit "df_input" (defaults to the previous df_output)
    # and "df_output" (defaults to the current df_input), so they keep
    # operating on "ranged values".
    if not shortcut:
        transformations = [
            {
                "df_input": "any values",
                "method": "gt",
                "args": [lower],
                "df_output": "gt_value",
            },
            {
                "df_input": "any values",
                "method": "lt",
                "args": [upper],
                "df_output": "lt_value",
            },
            {
                "df_input": "any values",
                "method": "where",
                "args": ["@gt_value"],
                "df_output": "ranged values",
            },
            {
                "method": "where",
                "args": ["@lt_value"],
            },
            {
                "method": "dropna",
            },
        ]
    else:
        transformations = [
            {
                "df_input": "any values",
                "method": "eval",
                "args": [f"event_value > {lower} & event_value < {upper}"],
                "df_output": "mask",
            },
            {
                "df_input": "any values",
                "method": "where",
                "args": ["@mask"],
                "df_output": "ranged values",
            },
            {
                "method": "dropna",
            },
        ]

    reporter_config = {
        "required_input": [{"name": "any values"}],
        "required_output": [{"name": "ranged values"}],
        "transformations": transformations,
    }
    reporter = PandasReporter(config=reporter_config)

    start = datetime(2023, 4, 10, tzinfo=utc)
    end = datetime(2023, 4, 11, tzinfo=utc)
    report = reporter.compute(
        start=start,
        end=end,
        input=[{"name": "any values", "sensor": s1}],
        output=[{"name": "ranged values", "sensor": s1}],
    )
    result = report[0]["data"]

    # Sanity check: the source data must contain values outside the range,
    # otherwise the assertions below would prove nothing.
    original_values = s1.search_beliefs(
        event_starts_after=start,
        event_ends_before=end,
    )
    source_values = original_values.event_value.values
    assert not (source_values > lower).all()
    assert not (source_values < upper).all()

    # After the pipeline, every remaining value lies strictly inside the range.
    assert (result.event_value.values > lower).all()
    assert (result.event_value.values < upper).all()
|