123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- import pytest
- from datetime import datetime, timedelta
- from pytz import utc
- import pandas as pd
- from flexmeasures.data.models.planning.utils import initialize_index
- from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
- from flexmeasures.data.models.data_sources import DataSource
- from flexmeasures.data.models.time_series import Sensor, TimedBelief
- @pytest.fixture(scope="module")
- def generic_report(db, app):
- report_asset_type = GenericAssetType(name="ReportAssetType")
- db.session.add(report_asset_type)
- generic_report = GenericAsset(
- name="GenericReport", generic_asset_type=report_asset_type
- )
- db.session.add(generic_report)
- return generic_report
- @pytest.fixture(scope="module")
- def profit_report(db, app, generic_report, add_market_prices, setup_sources):
- device_type = GenericAssetType(name="Device")
- db.session.add(device_type)
- electricity_device = GenericAsset(
- name="Electricity Consuming Device", generic_asset_type=device_type
- )
- db.session.add(electricity_device)
- power_sensor = Sensor(
- "power",
- generic_asset=electricity_device,
- event_resolution=timedelta(minutes=15),
- unit="MW",
- timezone="Europe/Amsterdam",
- )
- energy_sensor = Sensor(
- "energy",
- generic_asset=electricity_device,
- event_resolution=timedelta(minutes=15),
- unit="MWh",
- timezone="Europe/Amsterdam",
- )
- profit_sensor_hourly = Sensor(
- "profit hourly",
- generic_asset=generic_report,
- event_resolution=timedelta(hours=1),
- unit="EUR",
- timezone="Europe/Amsterdam",
- )
- profit_sensor_daily = Sensor(
- "profit daily",
- generic_asset=generic_report,
- event_resolution=timedelta(hours=24),
- unit="EUR",
- timezone="Europe/Amsterdam",
- )
- db.session.add_all(
- [profit_sensor_hourly, profit_sensor_daily, energy_sensor, power_sensor]
- )
- time_slots = initialize_index(
- start=pd.Timestamp("2015-01-03").tz_localize("Europe/Amsterdam"),
- end=pd.Timestamp("2015-01-04").tz_localize("Europe/Amsterdam"),
- resolution="15min",
- )
- def save_values(sensor, values):
- beliefs = [
- TimedBelief(
- event_start=dt,
- belief_horizon=timedelta(hours=0),
- event_value=val,
- source=setup_sources["Seita"],
- sensor=sensor,
- )
- for dt, val in zip(time_slots, values)
- ]
- db.session.add_all(beliefs)
- # periodic pattern of producing 100kW for 4h and consuming 100kW for 4h:
- # i.e. [0.1 0.1 0.1 0.1 ... -0.1 -0.1 -0.1 -0.1]
- save_values(power_sensor, ([0.1] * 16 + [-0.1] * 16) * 3)
- # creating the same pattern as above but with energy
- # a flat consumption / production rate of 100kW is equivalent to consume / produce 25kWh
- # every 15min block for 1h
- save_values(energy_sensor, ([0.025] * 16 + [-0.025] * 16) * 3)
- db.session.commit()
- yield profit_sensor_hourly, profit_sensor_daily, power_sensor, energy_sensor
- @pytest.fixture(scope="module")
- def setup_dummy_data(db, app, generic_report):
- """
- Create 2 Sensors, 1 Asset and 1 AssetType
- """
- dummy_asset_type = GenericAssetType(name="DummyGenericAssetType")
- db.session.add(dummy_asset_type)
- dummy_asset = GenericAsset(
- name="DummyGenericAsset", generic_asset_type=dummy_asset_type
- )
- db.session.add(dummy_asset)
- sensor1 = Sensor(
- "sensor 1",
- generic_asset=dummy_asset,
- event_resolution=timedelta(hours=1),
- unit="kW",
- )
- db.session.add(sensor1)
- sensor2 = Sensor(
- "sensor 2", generic_asset=dummy_asset, event_resolution=timedelta(hours=1)
- )
- db.session.add(sensor2)
- sensor3 = Sensor(
- "sensor 3",
- generic_asset=dummy_asset,
- event_resolution=timedelta(hours=1),
- timezone="Europe/Amsterdam",
- )
- db.session.add(sensor3)
- sensor4 = Sensor(
- "sensor 4",
- generic_asset=dummy_asset,
- event_resolution=timedelta(minutes=15),
- timezone="Europe/Amsterdam",
- unit="kW",
- )
- db.session.add(sensor4)
- report_sensor = Sensor(
- "report sensor",
- generic_asset=generic_report,
- event_resolution=timedelta(hours=1),
- )
- db.session.add(report_sensor)
- daily_report_sensor = Sensor(
- "daily report sensor",
- generic_asset=generic_report,
- event_resolution=timedelta(days=1),
- timezone="Europe/Amsterdam",
- )
- db.session.add(daily_report_sensor)
- """
- Create 3 DataSources
- """
- source1 = DataSource("source1", type="A")
- source2 = DataSource("source2", type="B")
- source2v02 = DataSource("source2", type="B", version="0.2")
- """
- Create TimedBeliefs
- """
- beliefs = []
- for sensor in [sensor1, sensor2]:
- for si, source in enumerate([source1, source2]):
- for t in range(10):
- beliefs.append(
- TimedBelief(
- event_start=datetime(2023, 4, 10, tzinfo=utc)
- + timedelta(hours=t + si),
- belief_horizon=timedelta(hours=24),
- event_value=t,
- sensor=sensor,
- source=source,
- )
- )
- # add simple data for testing the AggregatorReporter:
- # 24 hourly events with value 1 for sensor1 and value -1 for sensor2
- for sensor, source, value in zip([sensor1, sensor2], [source1, source2], [1, -1]):
- for t in range(24):
- beliefs.append(
- TimedBelief(
- event_start=datetime(2023, 5, 10, tzinfo=utc) + timedelta(hours=t),
- belief_horizon=timedelta(hours=24),
- event_value=value,
- sensor=sensor,
- source=source,
- )
- )
- # add simple data for testing DST transition
- for t in range(24 * 4): # create data for 4 days
- # UTC+1 -> UTC+2
- beliefs.append(
- TimedBelief(
- event_start=datetime(2023, 3, 24, tzinfo=utc) + timedelta(hours=t),
- belief_horizon=timedelta(hours=24),
- event_value=t,
- sensor=sensor3,
- source=source1,
- )
- )
- # UTC+2 -> UTC+1
- beliefs.append(
- TimedBelief(
- event_start=datetime(2023, 10, 27, tzinfo=utc) + timedelta(hours=t),
- belief_horizon=timedelta(hours=24),
- event_value=t,
- sensor=sensor3,
- source=source1,
- )
- )
- # Add data source transition, from DataSource 1 to DataSource 2
- # At 12:00, there is one event from both of the sources
- for t in range(12): # create data for 4 days
- # 00:00 -> 12:00
- beliefs.append(
- TimedBelief(
- event_start=datetime(2023, 4, 24, tzinfo=utc) + timedelta(hours=t),
- belief_horizon=timedelta(hours=24),
- event_value=1,
- sensor=sensor3,
- source=source1,
- )
- )
- # 12:00 -> 24:00
- beliefs.append(
- TimedBelief(
- event_start=datetime(2023, 4, 24, tzinfo=utc) + timedelta(hours=t + 12),
- belief_horizon=timedelta(hours=24),
- event_value=-1,
- sensor=sensor3,
- source=source2,
- )
- )
- # add a belief belonging to Source 1 in the second half of the day
- beliefs.append(
- TimedBelief(
- event_start=datetime(2023, 4, 24, tzinfo=utc) + timedelta(hours=12),
- belief_horizon=timedelta(hours=24),
- event_value=1,
- sensor=sensor3,
- source=source1,
- )
- )
- # add a belief belonging to version 0.2 of Source 2 around the end of the day, recorded 25 instead of 24 hours in advance
- beliefs.append(
- TimedBelief(
- event_start=datetime(2023, 4, 24, tzinfo=utc) + timedelta(hours=23),
- belief_horizon=timedelta(hours=25),
- event_value=3,
- sensor=sensor3,
- source=source2v02,
- )
- )
- # add data for sensor 4
- for t in range(24 * 3):
- beliefs.append(
- TimedBelief(
- event_start=datetime(2023, 1, 1, tzinfo=utc) + timedelta(hours=t),
- belief_horizon=timedelta(hours=24),
- event_value=1,
- sensor=sensor4,
- source=source1,
- )
- )
- db.session.add_all(beliefs)
- db.session.commit()
- yield sensor1, sensor2, sensor3, sensor4, report_sensor, daily_report_sensor
|