123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533 |
- import pytest
- import json
- import yaml
- import os
- from datetime import datetime
- import pytz
- from sqlalchemy import select, func
- from flexmeasures import Asset
- from flexmeasures.cli.tests.utils import to_flags
- from flexmeasures.data.models.annotations import (
- Annotation,
- AccountAnnotationRelationship,
- )
- from flexmeasures.data.models.user import Account
- from flexmeasures.data.models.data_sources import DataSource
- from flexmeasures.data.models.time_series import Sensor
- from flexmeasures.cli.tests.utils import (
- check_command_ran_without_error,
- get_click_commands,
- )
- from flexmeasures.utils.time_utils import server_now
- from flexmeasures.tests.utils import get_test_sensor
- @pytest.mark.skip_github
- def test_add_annotation(app, db, setup_roles_users):
- from flexmeasures.cli.data_add import add_annotation
- cli_input = {
- "content": "Company founding day",
- "at": "2016-05-11T00:00+02:00",
- "account": 1,
- "user": 1,
- }
- runner = app.test_cli_runner()
- result = runner.invoke(add_annotation, to_flags(cli_input))
- # Check result for success
- assert "Successfully added annotation" in result.output
- # Check database for annotation entry
- assert db.session.execute(
- select(Annotation)
- .filter_by(
- content=cli_input["content"],
- start=cli_input["at"],
- )
- .join(AccountAnnotationRelationship)
- .filter_by(
- account_id=cli_input["account"],
- annotation_id=Annotation.id,
- )
- .join(DataSource)
- .filter_by(
- id=Annotation.source_id,
- user_id=cli_input["user"],
- )
- ).scalar_one_or_none()
- @pytest.mark.skip_github
- def test_add_holidays(app, db, setup_roles_users):
- from flexmeasures.cli.data_add import add_holidays
- cli_input = {
- "year": 2020,
- "country": "NL",
- "account": 1,
- }
- runner = app.test_cli_runner()
- result = runner.invoke(add_holidays, to_flags(cli_input))
- # Check result for 11 public holidays
- assert "'NL': 11" in result.output
- # Check database for 11 annotation entries
- assert (
- db.session.scalar(
- select(func.count())
- .select_from(Annotation)
- .join(AccountAnnotationRelationship)
- .filter(
- AccountAnnotationRelationship.account_id == cli_input["account"],
- AccountAnnotationRelationship.annotation_id == Annotation.id,
- )
- .join(DataSource)
- .filter(
- DataSource.id == Annotation.source_id,
- DataSource.name == "workalendar",
- DataSource.model == cli_input["country"],
- )
- )
- == 11
- )
- def test_cli_help(app):
- """Test that showing help does not throw an error."""
- from flexmeasures.cli import data_add
- runner = app.test_cli_runner()
- for cmd in get_click_commands(data_add):
- result = runner.invoke(cmd, ["--help"])
- check_command_ran_without_error(result)
- assert "Usage" in result.output
- @pytest.mark.skip_github
- def test_add_reporter(app, fresh_db, setup_dummy_data):
- """
- The reporter aggregates input data from two sensors (both have 200 data points)
- to a two-hour resolution.
- The command is run twice:
- - The first run is for ten hours, so you expect five results.
- - start and end are defined in the configuration: 2023-04-10T00:00 -> 2023-04-10T10:00
- - this step uses 10 hours of data -> outputs 5 periods of 2 hours
- - The second is run without timing params, so the rest of the data
- - start is the time of the latest report value
- - end is the time of latest input data value
- - this step uses 190 hours of data -> outputs 95 periods of 2 hours
- """
- from flexmeasures.cli.data_add import add_report
- sensor1_id, sensor2_id, report_sensor_id, _ = setup_dummy_data
- reporter_config = dict(
- required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
- required_output=[{"name": "df_agg"}],
- transformations=[
- dict(
- df_input="sensor_1",
- method="add",
- args=["@sensor_2"],
- df_output="df_agg",
- ),
- dict(method="resample_events", args=["2h"]),
- ],
- )
- # Running the command with start and end values.
- runner = app.test_cli_runner()
- cli_input_params = {
- "config": "reporter_config.yaml",
- "parameters": "parameters.json",
- "reporter": "PandasReporter",
- "start": "2023-04-10T00:00:00 00:00",
- "end": "2023-04-10T10:00:00 00:00",
- "output-file": "test.csv",
- }
- parameters = dict(
- input=[
- dict(name="sensor_1", sensor=sensor1_id),
- dict(name="sensor_2", sensor=sensor2_id),
- ],
- output=[dict(name="df_agg", sensor=report_sensor_id)],
- )
- cli_input = to_flags(cli_input_params)
- # store config into config
- cli_input.append("--save-config")
- # run test in an isolated file system
- with runner.isolated_filesystem():
- # save reporter_config to a json file
- with open("reporter_config.yaml", "w") as f:
- yaml.dump(reporter_config, f)
- with open("parameters.json", "w") as f:
- json.dump(parameters, f)
- # call command
- result = runner.invoke(add_report, cli_input)
- check_command_ran_without_error(result)
- report_sensor = fresh_db.session.get(
- Sensor, report_sensor_id
- ) # get fresh report sensor instance
- assert "Reporter PandasReporter found" in result.output
- assert f"Report computation done for sensor `{report_sensor}`." in result.output
- # Check report is saved to the database
- stored_report = report_sensor.search_beliefs(
- event_starts_after=cli_input_params.get("start").replace(" ", "+"),
- event_ends_before=cli_input_params.get("end").replace(" ", "+"),
- )
- assert (
- stored_report.values.T == [1, 2 + 3, 4 + 5, 6 + 7, 8 + 9]
- ).all() # check values
- assert os.path.exists("test.csv") # check that the file has been created
- assert (
- os.path.getsize("test.csv") > 100
- ) # bytes. Check that the file is not empty
- # Running the command without without timing params (start-offset/end-offset nor start/end).
- # This makes the command default the start time to the date of the last
- # value of the reporter sensor and the end time as the current time.
- previous_command_end = cli_input_params.get("end").replace(" ", "+")
- cli_input_params = {
- "source": stored_report.sources[0].id,
- "parameters": "parameters.json",
- "output-file": "test.csv",
- "timezone": "UTC",
- }
- cli_input = to_flags(cli_input_params)
- with runner.isolated_filesystem():
- # save reporter_config to a json file
- with open("reporter_config.json", "w") as f:
- json.dump(reporter_config, f)
- with open("parameters.json", "w") as f:
- json.dump(parameters, f)
- # call command
- result = runner.invoke(add_report, cli_input)
- check_command_ran_without_error(result)
- # Check if the report is saved to the database
- report_sensor = fresh_db.session.get(
- Sensor, report_sensor_id
- ) # get fresh report sensor instance
- assert (
- "Reporter `PandasReporter` fetched successfully from the database."
- in result.output
- )
- assert f"Report computation done for sensor `{report_sensor}`." in result.output
- stored_report = report_sensor.search_beliefs(
- event_starts_after=previous_command_end,
- event_ends_before=server_now(),
- )
- assert len(stored_report) == 95
- @pytest.mark.skip_github
- def test_add_multiple_output(app, fresh_db, setup_dummy_data):
- """ """
- from flexmeasures.cli.data_add import add_report
- sensor_1_id, sensor_2_id, report_sensor_id, report_sensor_2_id = setup_dummy_data
- reporter_config = dict(
- required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
- required_output=[{"name": "df_agg"}, {"name": "df_sub"}],
- transformations=[
- dict(
- df_input="sensor_1",
- method="add",
- args=["@sensor_2"],
- df_output="df_agg",
- ),
- dict(method="resample_events", args=["2h"]),
- dict(
- df_input="sensor_1",
- method="subtract",
- args=["@sensor_2"],
- df_output="df_sub",
- ),
- dict(method="resample_events", args=["2h"]),
- ],
- )
- # Running the command with start and end values.
- runner = app.test_cli_runner()
- cli_input_params = {
- "config": "reporter_config.yaml",
- "parameters": "parameters.json",
- "reporter": "PandasReporter",
- "start": "2023-04-10T00:00:00+00:00",
- "end": "2023-04-10T10:00:00+00:00",
- "output-file": "test-$name.csv",
- }
- parameters = dict(
- input=[
- dict(name="sensor_1", sensor=sensor_1_id),
- dict(name="sensor_2", sensor=sensor_2_id),
- ],
- output=[
- dict(name="df_agg", sensor=report_sensor_id),
- dict(name="df_sub", sensor=report_sensor_2_id),
- ],
- )
- cli_input = to_flags(cli_input_params)
- # run test in an isolated file system
- with runner.isolated_filesystem():
- # save reporter_config to a json file
- with open("reporter_config.yaml", "w") as f:
- yaml.dump(reporter_config, f)
- with open("parameters.json", "w") as f:
- json.dump(parameters, f)
- # call command
- result = runner.invoke(add_report, cli_input)
- check_command_ran_without_error(result)
- assert os.path.exists("test-df_agg.csv")
- assert os.path.exists("test-df_sub.csv")
- report_sensor = fresh_db.session.get(Sensor, report_sensor_id)
- report_sensor_2 = fresh_db.session.get(Sensor, report_sensor_2_id)
- assert "Reporter PandasReporter found" in result.output
- assert f"Report computation done for sensor `{report_sensor}`." in result.output
- assert (
- f"Report computation done for sensor `{report_sensor_2}`." in result.output
- )
- # check that the reports are saved
- assert all(
- report_sensor.search_beliefs(
- event_ends_before=datetime(2023, 4, 10, 10, tzinfo=pytz.UTC)
- ).values.flatten()
- == [1, 5, 9, 13, 17]
- )
- assert all(report_sensor_2.search_beliefs() == 0)
- @pytest.mark.skip_github
- @pytest.mark.parametrize("process_type", [("INFLEXIBLE"), ("SHIFTABLE"), ("BREAKABLE")])
- def test_add_process(
- app, process_power_sensor, process_type, add_market_prices_fresh_db, db
- ):
- """
- Schedule a 4h of consumption block at a constant power of 400kW in a day using
- the three process policies: INFLEXIBLE, SHIFTABLE and BREAKABLE.
- """
- from flexmeasures.cli.data_add import add_schedule_process
- epex_da = get_test_sensor(db)
- process_power_sensor_id = process_power_sensor
- cli_input_params = {
- "sensor": process_power_sensor_id,
- "start": "2015-01-02T00:00:00+01:00",
- "duration": "PT24H",
- "process-duration": "PT4H",
- "process-power": "0.4MW",
- "process-type": process_type,
- "consumption-price-sensor": epex_da.id,
- "forbid": '{"start" : "2015-01-02T00:00:00+01:00", "duration" : "PT2H"}',
- }
- cli_input = to_flags(cli_input_params)
- runner = app.test_cli_runner()
- # call command
- result = runner.invoke(add_schedule_process, cli_input)
- check_command_ran_without_error(result)
- process_power_sensor = db.session.get(Sensor, process_power_sensor_id)
- schedule = process_power_sensor.search_beliefs()
- # check if the schedule is not empty more detailed testing can be found
- # in data/models/planning/tests/test_process.py.
- assert (schedule == -0.4).event_value.sum() == 4
- @pytest.mark.skip_github
- @pytest.mark.parametrize(
- "event_resolution, name, success",
- [("PT20M", "ONE", True), (15, "TWO", True), ("some_string", "THREE", False)],
- )
- def test_add_sensor(app, db, setup_dummy_asset, event_resolution, name, success):
- from flexmeasures.cli.data_add import add_sensor
- asset = setup_dummy_asset
- runner = app.test_cli_runner()
- cli_input = {
- "name": name,
- "event-resolution": event_resolution,
- "unit": "kWh",
- "asset": asset,
- "timezone": "UTC",
- }
- runner = app.test_cli_runner()
- result = runner.invoke(add_sensor, to_flags(cli_input))
- sensor: Sensor = db.session.execute(
- select(Sensor).filter_by(name=name)
- ).scalar_one_or_none()
- if success:
- check_command_ran_without_error(result)
- sensor.unit == "kWh"
- else:
- assert result.exit_code == 1
- assert sensor is None
- @pytest.mark.skip_github
- @pytest.mark.parametrize(
- "name, consultancy_account_id, success",
- [
- ("Test ConsultancyClient Account", 1, False),
- ("Test CLIConsultancyClient Account", 2, True),
- ("Test Account", None, True),
- ],
- )
- def test_add_account(
- app, fresh_db, setup_accounts_fresh_db, name, consultancy_account_id, success
- ):
- """Test adding a new account."""
- from flexmeasures.cli.data_add import new_account
- cli_input = {
- "name": name,
- "roles": "TestRole",
- "consultancy": consultancy_account_id,
- }
- runner = app.test_cli_runner()
- result = runner.invoke(new_account, to_flags(cli_input))
- if success:
- assert "successfully created." in result.output
- account = fresh_db.session.execute(
- select(Account).filter_by(name=cli_input["name"])
- ).scalar_one_or_none()
- assert account.consultancy_account_id == consultancy_account_id
- else:
- # fail because "Test ConsultancyClient Account" already exists
- assert result.exit_code == 1
- @pytest.mark.skip_github
- @pytest.mark.parametrize("storage_power_capacity", ["sensor", "quantity", None])
- @pytest.mark.parametrize("storage_efficiency", ["sensor", "quantity", None])
- def test_add_storage_schedule(
- app,
- add_market_prices_fresh_db,
- storage_schedule_sensors,
- storage_power_capacity,
- storage_efficiency,
- db,
- ):
- """
- Test the 'flexmeasures add schedule for-storage' CLI command for adding storage schedules.
- This test evaluates the command's functionality in creating storage schedules for different configurations
- of power capacity and storage efficiency. It uses a combination of sensor-based and manually specified values
- for these parameters.
- The test performs the following steps:
- 1. Simulates running the `flexmeasures add toy-account` command to set up a test account.
- 2. Configures CLI input parameters for scheduling, including the start time, duration, and sensor IDs.
- The test also sets up parameters for state of charge at start and roundtrip efficiency.
- 3. Depending on the test parameters, adjusts power capacity and efficiency settings. These settings can be
- either sensor-based (retrieved from storage_schedule_sensors fixture), manually specified quantities,
- or left undefined.
- 4. Executes the 'add_schedule_for_storage' command with the configured parameters.
- 5. Verifies that the command executes successfully (exit code 0) and that the correct number of scheduled
- values (48 for a 12-hour period with 15-minute resolution) are created for the power sensor.
- """
- power_capacity_sensor, storage_efficiency_sensor = storage_schedule_sensors
- from flexmeasures.cli.data_add import add_schedule_for_storage, add_toy_account
- runner = app.test_cli_runner()
- runner.invoke(add_toy_account)
- toy_account = db.session.execute(
- select(Account).filter_by(name="Toy Account")
- ).scalar_one_or_none()
- battery = db.session.execute(
- select(Asset).filter_by(name="toy-battery", owner=toy_account)
- ).scalar_one_or_none()
- power_sensor = battery.sensors[0]
- prices = add_market_prices_fresh_db["epex_da"]
- cli_input_params = {
- "start": "2014-12-31T23:00:00+00",
- "duration": "PT12H",
- "sensor": battery.sensors[0].id,
- "consumption-price-sensor": prices.id,
- "soc-at-start": "50%",
- "roundtrip-efficiency": "90%",
- }
- if storage_power_capacity is not None:
- if storage_power_capacity == "sensor":
- cli_input_params["storage-consumption-capacity"] = (
- f"sensor:{power_capacity_sensor}"
- )
- cli_input_params["storage-production-capacity"] = (
- f"sensor:{power_capacity_sensor}"
- )
- else:
- cli_input_params["storage-consumption-capacity"] = "700kW"
- cli_input_params["storage-production-capacity"] = "700kW"
- if storage_efficiency is not None:
- if storage_efficiency == "sensor":
- cli_input_params["storage-efficiency"] = (
- f"sensor:{storage_efficiency_sensor}"
- )
- else:
- cli_input_params["storage-efficiency"] = "90%"
- cli_input = to_flags(cli_input_params)
- result = runner.invoke(add_schedule_for_storage, cli_input)
- check_command_ran_without_error(result)
- assert len(power_sensor.search_beliefs()) == 48
|