from __future__ import annotations import pytest from datetime import datetime, timedelta from random import random import pandas as pd import numpy as np from flask_sqlalchemy import SQLAlchemy from statsmodels.api import OLS from flexmeasures import AssetType, Asset, Sensor import timely_beliefs as tb from sqlalchemy import select from flexmeasures.data.models.reporting import Reporter from flexmeasures.data.schemas.reporting import ReporterParametersSchema from flexmeasures.data.models.annotations import Annotation from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import TimedBelief from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType from flexmeasures.data.models.forecasting import model_map from flexmeasures.data.models.forecasting.model_spec_factory import ( create_initial_model_specs, ) from flexmeasures.utils.time_utils import as_server_time from marshmallow import fields from marshmallow import Schema @pytest.fixture(scope="module") def setup_test_data( db, app, add_market_prices, setup_assets, setup_generic_asset_types, ): """ Adding a few forecasting jobs (based on data made in flexmeasures.conftest). """ print("Setting up data for data tests on %s" % db.engine) add_test_weather_sensor_and_forecasts(db, setup_generic_asset_types) print("Done setting up data for data tests") return setup_assets @pytest.fixture(scope="function") def setup_fresh_test_data( fresh_db, setup_markets_fresh_db, setup_accounts_fresh_db, setup_assets_fresh_db, setup_generic_asset_types_fresh_db, app, ) -> dict[str, GenericAsset]: add_test_weather_sensor_and_forecasts(fresh_db, setup_generic_asset_types_fresh_db) return setup_assets_fresh_db def add_test_weather_sensor_and_forecasts(db: SQLAlchemy, setup_generic_asset_types): """one day of test data (one complete sine curve) for two sensors""" data_source = db.session.execute( select(DataSource).filter_by(name="Seita", type="demo script") ).scalar_one_or_none() weather_station = GenericAsset( name="Test weather station farther away", generic_asset_type=setup_generic_asset_types["weather_station"], latitude=100, longitude=100, ) for sensor_name, unit in (("irradiance", "kW/m²"), ("wind speed", "m/s")): sensor = Sensor(name=sensor_name, generic_asset=weather_station, unit=unit) db.session.add(sensor) time_slots = pd.date_range( datetime(2015, 1, 1), datetime(2015, 1, 2, 23, 45), freq="15min" ) values = [random() * (1 + np.sin(x / 15)) for x in range(len(time_slots))] if sensor_name == "temperature": values = [value * 17 for value in values] if sensor_name == "wind speed": values = [value * 45 for value in values] if sensor_name == "irradiance": values = [value * 600 for value in values] for dt, val in zip(time_slots, values): db.session.add( TimedBelief( sensor=sensor, event_start=as_server_time(dt), event_value=val, belief_horizon=timedelta(hours=6), source=data_source, ) ) @pytest.fixture(scope="module", autouse=True) def add_failing_test_model(db): """Add a test model specs to the lookup which should fail due to missing data. It falls back to linear OLS (which falls back to naive).""" def test_specs(**args): """Customize initial specs with OLS and too early training start.""" model_specs = create_initial_model_specs(**args) model_specs.set_model(OLS) model_specs.start_of_training = model_specs.start_of_training - timedelta( days=365 ) model_identifier = "failing-test model v1" return model_specs, model_identifier, "linear-OLS" model_map["failing-test"] = test_specs @pytest.fixture(scope="module") def add_nearby_weather_sensors(db, add_weather_sensors) -> dict[str, Sensor]: temp_sensor_location = add_weather_sensors["temperature"].generic_asset.location weather_station_type = db.session.execute( select(GenericAssetType).filter_by(name="weather station") ).scalar_one_or_none() farther_weather_station = GenericAsset( name="Test weather station farther away", generic_asset_type=weather_station_type, latitude=temp_sensor_location[0], longitude=temp_sensor_location[1] + 0.1, ) db.session.add(farther_weather_station) farther_temp_sensor = Sensor( name="temperature", generic_asset=farther_weather_station, event_resolution=timedelta(minutes=5), unit="°C", ) db.session.add(farther_temp_sensor) even_farther_weather_station = GenericAsset( name="Test weather station even farther away", generic_asset_type=weather_station_type, latitude=temp_sensor_location[0], longitude=temp_sensor_location[1] + 0.2, ) db.session.add(even_farther_weather_station) even_farther_temp_sensor = Sensor( name="temperature", generic_asset=even_farther_weather_station, event_resolution=timedelta(minutes=5), unit="°C", ) db.session.add(even_farther_temp_sensor) add_weather_sensors["farther_temperature"] = farther_temp_sensor add_weather_sensors["even_farther_temperature"] = even_farther_temp_sensor return add_weather_sensors @pytest.fixture(scope="module") def setup_annotations( db, battery_soc_sensor, setup_sources, app, ): """Set up an annotation for an account, an asset and a sensor.""" sensor = battery_soc_sensor asset = sensor.generic_asset account = asset.owner source = setup_sources["Seita"] annotation = Annotation( content="Dutch new year", start=pd.Timestamp("2020-01-01 00:00+01"), end=pd.Timestamp("2020-01-02 00:00+01"), source=source, type="holiday", ) account.annotations.append(annotation) asset.annotations.append(annotation) sensor.annotations.append(annotation) db.session.flush() return dict( annotation=annotation, account=account, asset=asset, sensor=sensor, ) @pytest.fixture(scope="module") def test_reporter(app, db, add_nearby_weather_sensors): class TestReporterConfigSchema(Schema): a = fields.Str() class TestReporterParametersSchema(ReporterParametersSchema): b = fields.Str(required=False) class TestReporter(Reporter): _config_schema = TestReporterConfigSchema() _parameters_schema = TestReporterParametersSchema() def _compute_report(self, **kwargs) -> list: start = kwargs.get("start") end = kwargs.get("end") sensor = kwargs["output"][0]["sensor"] resolution = sensor.event_resolution index = pd.date_range(start=start, end=end, freq=resolution) r = pd.DataFrame() r["event_start"] = index r["belief_time"] = index r["source"] = self.data_source r["cumulative_probability"] = 0.5 r["event_value"] = 0 bdf = tb.BeliefsDataFrame(r, sensor=sensor) return [{"data": bdf, "sensor": sensor}] app.data_generators["reporter"].update({"TestReporter": TestReporter}) config = dict(a="b") ds = TestReporter(config=config).data_source assert ds.name == app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE") db.session.add(ds) db.session.commit() return ds @pytest.fixture(scope="function") def smart_building_types(app, fresh_db, setup_generic_asset_types_fresh_db): site = AssetType(name="site") building = AssetType(name="building") ev = AssetType(name="ev") heat_buffer = AssetType(name="heat buffer") fresh_db.session.add_all([site, building, ev, heat_buffer]) fresh_db.session.flush() return ( site, setup_generic_asset_types_fresh_db["solar"], building, setup_generic_asset_types_fresh_db["battery"], ev, heat_buffer, ) @pytest.fixture(scope="function") def smart_building(app, fresh_db, smart_building_types): """ Topology of the sytstem: +---------+ | | +------------------ Site +--------------+------------------+ | | | | | | +-+----+--+ | | | | | | | | | | | | | +----+ +--+ | | | | | | | +----+----+ +------+-----+ +--+---+ +------+------+ +------+------+ | | | | | | | | | | | Solar | | Building | | EV | | Battery | | Heat Buffer | | | | | | | | | | | +---------+ +------------+ +------+ +-------------+ +-------------+ Diagram created with: https://textik.com/#924f8a2112551f92 """ site, solar, building, battery, ev, heat_buffer = smart_building_types coordinates = {"latitude": 0, "longitude": 0} test_site = Asset(name="Test Site", generic_asset_type_id=site.id, **coordinates) fresh_db.session.add(test_site) fresh_db.session.flush() test_building = Asset( name="Test Building", generic_asset_type_id=building.id, parent_asset_id=test_site.id, **coordinates, ) test_solar = Asset( name="Test Solar", generic_asset_type_id=solar.id, parent_asset_id=test_site.id, **coordinates, ) test_battery = Asset( name="Test Battery", generic_asset_type_id=battery.id, parent_asset_id=test_site.id, **coordinates, ) test_ev = Asset( name="Test EV", generic_asset_type_id=ev.id, parent_asset_id=test_site.id, **coordinates, ) test_battery_1h = Asset( name="Test Battery 1h", generic_asset_type_id=battery.id, parent_asset_id=test_site.id, **coordinates, ) test_heat_buffer = Asset( name="Test Heat Buffer", generic_asset_type_id=heat_buffer.id, parent_asset_id=test_site.id, **coordinates, ) assets = ( test_site, test_building, test_solar, test_battery, test_ev, test_battery_1h, test_heat_buffer, ) fresh_db.session.add_all(assets) fresh_db.session.flush() power_sensors = [] soc_sensors = [] for asset in assets: # Add power sensor sensor = Sensor( name="power", unit="MW", event_resolution=( timedelta(hours=1) if asset.name == "Test Battery 1h" else timedelta(minutes=15) ), generic_asset=asset, timezone="Europe/Amsterdam", ) power_sensors.append(sensor) # Add SOC sensors sensor = Sensor( "state of charge", unit="MWh", event_resolution=timedelta(hours=0), generic_asset=asset, timezone="Europe/Amsterdam", ) soc_sensors.append(sensor) fresh_db.session.add_all(power_sensors) fresh_db.session.add_all(soc_sensors) fresh_db.session.flush() asset_names = [asset.name for asset in assets] return ( dict(zip(asset_names, assets)), dict(zip(asset_names, power_sensors)), dict(zip(asset_names, soc_sensors)), ) @pytest.fixture(scope="function") def flex_description_sequential( smart_building, setup_markets_fresh_db, add_market_prices_fresh_db ): """Set up a flex-context and a partially deserialized flex-model. Specifically, the main flex model is deserialized, while the sensors' individual flex models are still serialized. """ assets, sensors, soc_sensors = smart_building flex_model = [ { "sensor": sensors["Test EV"], "sensor_flex_model": { "consumption-capacity": "5kW", "production-capacity": "0kW", "power-capacity": "5kW", "soc-at-start": 0.00, # 0 kWh "soc-unit": "MWh", "soc-min": 0.0, "soc-max": 0.05, # 50 kWh "soc-targets": [ { "start": "2015-01-03T00:00:00+01:00", "end": "2015-01-03T05:00:00+01:00", "value": 0.0, }, { "datetime": "2015-01-03T07:45:00+01:00", "value": 0.0125, }, # 12.5 kWh {"datetime": "2015-01-03T17:45:00+01:00", "value": 0.025}, # 25 kWh { "datetime": "2015-01-03T23:45:00+01:00", "value": 0.0375, }, # 37.5 kWh ], }, }, { "sensor": sensors["Test Battery"], "sensor_flex_model": { "consumption-capacity": "0kW", "production-capacity": "5kW", "power-capacity": "5kW", "soc-at-start": 0.1, # 100 kWh "soc-unit": "MWh", "soc-min": 0.0, "soc-max": 0.1, # 100 kWh "soc-targets": [ { "datetime": "2015-01-03T03:00:00+01:00", "value": 0.094, } # 6 kWh discharge ], }, }, ] flex_context = { "consumption-price-sensor": setup_markets_fresh_db["epex_da"].id, "production-price-sensor": setup_markets_fresh_db["epex_da"].id, "inflexible-device-sensors": [ sensors["Test Solar"].id, sensors["Test Building"].id, ], "site-production-capacity": "2kW", "site-consumption-capacity": "5kW", } return dict(flex_model=flex_model, flex_context=flex_context)