from flask import url_for
import pytest
from isodate import parse_datetime
from rq.job import Job
from sqlalchemy import select

from flexmeasures.api.common.responses import unknown_schedule, unrecognized_event
from flexmeasures.api.tests.utils import check_deprecation
from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.data.services.scheduling import handle_scheduling_exception
from flexmeasures.tests.utils import get_test_sensor
from flexmeasures.utils.unit_utils import ur


@pytest.mark.parametrize(
    "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
)
def test_get_schedule_wrong_job_id(
    app,
    add_market_prices,
    add_battery_assets,
    battery_soc_sensor,
    add_charging_station_assets,
    keep_scheduling_queue_empty,
    requesting_user,
):
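    """Requesting a schedule with a non-existing job ID should return a 400 with an unrecognized-event response."""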
    wrong_job_id = 9999
    sensor = add_battery_assets["Test battery"].sensors[0]
    with app.test_client() as client:
        get_schedule_response = client.get(
            url_for("SensorAPI:get_schedule", id=sensor.id, uuid=wrong_job_id),
        )
    print("Server responded with:\n%s" % get_schedule_response.json)
    check_deprecation(get_schedule_response, deprecation=None, sunset=None)
    assert get_schedule_response.status_code == 400
    assert get_schedule_response.json == unrecognized_event(wrong_job_id, "job")[0]


@pytest.mark.parametrize(
    "message, field, sent_value, err_msg",
    [
        (message_for_trigger_schedule(), "soc-minn", 3, "Unknown field"),
        (
            message_for_trigger_schedule(),
            "soc-min",
            "not-a-float",
            "Cannot convert value 'not-a-float' to a valid quantity.",
        ),
        (message_for_trigger_schedule(), "soc-unit", "MWH", "Must be one of"),
        # todo: add back test in case we stop grandfathering ignoring too-far-into-the-future targets, or amend otherwise
        # (
        #     message_for_trigger_schedule(
        #         with_targets=True, too_far_into_the_future_targets=True
        #     ),
        #     "soc-targets",
        #     None,
        #     "Target datetime exceeds",
        # ),
    ],
)
@pytest.mark.parametrize(
    "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
)
def test_trigger_schedule_with_invalid_flexmodel(
    app,
    add_battery_assets,
    keep_scheduling_queue_empty,
    message,
    field,
    sent_value,
    err_msg,
    requesting_user,
):
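    """Triggering a schedule with an invalid flex-model should be rejected with a 422,
    reporting the offending field either as a marshmallow ValidationError or within a ValueError message.
    """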
    sensor = add_battery_assets["Test battery"].sensors[0]
    with app.test_client() as client:
        if sent_value:  # if None, field is a term we expect in the response, not more
            message["flex-model"][field] = sent_value

        trigger_schedule_response = client.post(
            url_for("SensorAPI:trigger_schedule", id=sensor.id),
            json=message,
        )
        print("Server responded with:\n%s" % trigger_schedule_response.json)
        check_deprecation(trigger_schedule_response, deprecation=None, sunset=None)
        assert trigger_schedule_response.status_code == 422
        assert field in trigger_schedule_response.json["message"]["json"]
        if isinstance(trigger_schedule_response.json["message"]["json"], str):
            # ValueError
            assert err_msg in trigger_schedule_response.json["message"]["json"]
        else:
            # ValidationError (marshmallow)
            assert (
                err_msg in trigger_schedule_response.json["message"]["json"][field][0]
            )


@pytest.mark.parametrize("message", [message_for_trigger_schedule(unknown_prices=True)])
@pytest.mark.parametrize(
    "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
)
def test_trigger_and_get_schedule_with_unknown_prices(
    app,
    client,
    add_market_prices,
    add_battery_assets,
    battery_soc_sensor,
    add_charging_station_assets,
    keep_scheduling_queue_empty,
    message,
    requesting_user,
    db,
):
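    """Trigger a schedule for a period with unknown prices: the scheduling job should fail,
    no scheduler data source should be created, and retrieving the schedule should return
    a 400 response mentioning that prices are unknown.
    """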
    sensor = add_battery_assets["Test battery"].sensors[0]

    # trigger a schedule through the /sensors/<id>/schedules/trigger [POST] api endpoint
    trigger_schedule_response = client.post(
        url_for("SensorAPI:trigger_schedule", id=sensor.id),
        json=message,
    )
    print("Server responded with:\n%s" % trigger_schedule_response.json)
    check_deprecation(trigger_schedule_response, deprecation=None, sunset=None)
    assert trigger_schedule_response.status_code == 200
    job_id = trigger_schedule_response.json["schedule"]

    # look for scheduling jobs in queue
    assert (
        len(app.queues["scheduling"]) == 1
    )  # only 1 schedule should be made for 1 asset
    job = app.queues["scheduling"].jobs[0]
    assert job.kwargs["asset_or_sensor"]["id"] == sensor.id
    assert job.kwargs["start"] == parse_datetime(message["start"])
    assert job.id == job_id

    # process the scheduling queue
    work_on_rq(app.queues["scheduling"], exc_handler=handle_scheduling_exception)
    assert (
        Job.fetch(job_id, connection=app.queues["scheduling"].connection).is_failed
        is True
    )

    # check results are not in the database
    scheduler_source = db.session.execute(
        select(DataSource).filter_by(name="Seita", type="scheduler")
    ).scalar_one_or_none()
    assert (
        scheduler_source is None
    )  # Make sure the scheduler data source is still not there

    # try to retrieve the schedule through the /sensors/<id>/schedules/<job_id> [GET] api endpoint
    get_schedule_response = client.get(
        url_for("SensorAPI:get_schedule", id=sensor.id, uuid=job_id),
    )
    print("Server responded with:\n%s" % get_schedule_response.json)
    check_deprecation(get_schedule_response, deprecation=None, sunset=None)
    assert get_schedule_response.status_code == 400
    assert get_schedule_response.json["status"] == unknown_schedule()[0]["status"]
    assert "prices unknown" in get_schedule_response.json["message"].lower()


@pytest.mark.parametrize(
    "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
)
def test_get_schedule_fallback(
    app,
    add_battery_assets,
    add_market_prices,
    battery_soc_sensor,
    add_charging_station_assets,
    keep_scheduling_queue_empty,
    requesting_user,
    db,
):
    """
    Test if the fallback job is created after a failing StorageScheduler call. This test
    is based on flexmeasures/data/models/planning/tests/test_solver.py
    """
    assert app.config["FLEXMEASURES_FALLBACK_REDIRECT"] is False
    app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = True

    target_soc = 9
    charging_station_name = "Test charging station"
    start = "2015-01-02T00:00:00+01:00"
    epex_da = get_test_sensor(db)
    charging_station = add_charging_station_assets[charging_station_name].sensors[0]
    capacity = charging_station.get_attribute(
        "capacity_in_mw",
        ur.Quantity(charging_station.get_attribute("site-power-capacity"))
        .to("MW")
        .magnitude,
    )
    assert capacity == 2
    assert charging_station.get_attribute("consumption-price") == {"sensor": epex_da.id}

    # check that no Fallback schedule has been saved before
    models = [
        source.model for source in charging_station.search_beliefs().sources.unique()
    ]
    assert "StorageFallbackScheduler" not in models

    # create a scenario that yields an infeasible problem (unreachable target SOC at 2am)
    message = {
        "start": start,
        "duration": "PT24H",
        "flex-model": {
            "soc-at-start": 10,
            "soc-min": charging_station.get_attribute("min_soc_in_mwh", 0),
            "soc-max": charging_station.get_attribute("max-soc-in-mwh", target_soc),
            "roundtrip-efficiency": charging_station.get_attribute(
                "roundtrip-efficiency", 1
            ),
            "storage-efficiency": charging_station.get_attribute(
                "storage-efficiency", 1
            ),
            "soc-targets": [
                {"value": target_soc, "datetime": "2015-01-02T02:00:00+01:00"}
            ],
        },
    }
    with app.test_client() as client:
        # trigger storage scheduler
        trigger_schedule_response = client.post(
            url_for("SensorAPI:trigger_schedule", id=charging_station.id),
            json=message,
        )

        # check that the call is successful
        assert trigger_schedule_response.status_code == 200
        job_id = trigger_schedule_response.json["schedule"]

        # look for scheduling jobs in queue
        assert (
            len(app.queues["scheduling"]) == 1
        )  # only 1 schedule should be made for 1 asset
        job = app.queues["scheduling"].jobs[0]
        assert job.kwargs["asset_or_sensor"]["id"] == charging_station.id
        assert job.kwargs["start"] == parse_datetime(message["start"])
        assert job.id == job_id

        # process only the job that runs the storage scheduler (max_jobs=1)
        work_on_rq(
            app.queues["scheduling"],
            exc_handler=handle_scheduling_exception,
            max_jobs=1,
        )

        # check that the job is failing
        assert Job.fetch(
            job_id, connection=app.queues["scheduling"].connection
        ).is_failed

        # the callback creates the fallback job which is still pending
        assert len(app.queues["scheduling"]) == 1

        fallback_job_id = Job.fetch(
            job_id, connection=app.queues["scheduling"].connection
        ).meta.get("fallback_job_id")

        # check that the fallback_job_id is stored on the metadata of the original job
        assert app.queues["scheduling"].get_job_ids()[0] == fallback_job_id
        assert fallback_job_id != job_id

        get_schedule_response = client.get(
            url_for("SensorAPI:get_schedule", id=charging_station.id, uuid=job_id),
        )

        # requesting the original job redirects to the fallback job
        assert (
            get_schedule_response.status_code == 303
        )  # Status code for redirect ("See Other")
        assert (
            get_schedule_response.json["message"]
            == "Scheduling job failed with InfeasibleProblemException: . StorageScheduler was used."
        )
        assert get_schedule_response.json["status"] == "UNKNOWN_SCHEDULE"
        assert get_schedule_response.json["result"] == "Rejected"

        # check that the redirection location points to the fallback job
        assert (
            get_schedule_response.headers["location"]
            == f"http://localhost/api/v3_0/sensors/{charging_station.id}/schedules/{fallback_job_id}"
        )

        # run the fallback job
        work_on_rq(
            app.queues["scheduling"],
            exc_handler=handle_scheduling_exception,
            max_jobs=1,
        )

        # check that the queue is empty
        assert len(app.queues["scheduling"]) == 0

        # get the fallback schedule
        fallback_schedule = client.get(
            get_schedule_response.headers["location"],
            json={"duration": "PT24H"},
        ).json

        # check that the fallback schedule has the right status and start dates
        assert fallback_schedule["status"] == "PROCESSED"
        assert parse_datetime(fallback_schedule["start"]) == parse_datetime(start)
        models = [
            source.model
            for source in charging_station.search_beliefs().sources.unique()
        ]
        assert "StorageFallbackScheduler" in models

    app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = False


@pytest.mark.parametrize(
    "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
)
def test_get_schedule_fallback_not_redirect(
    app,
    add_battery_assets,
    add_market_prices,
    battery_soc_sensor,
    add_charging_station_assets,
    keep_scheduling_queue_empty,
    requesting_user,
    db,
):
    """
    Test if the fallback schedule is returned directly (no redirect) after a failing StorageScheduler call.
    This test is based on flexmeasures/data/models/planning/tests/test_solver.py
    """
    app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = False

    target_soc = 9
    charging_station_name = "Test charging station"
    start = "2015-01-02T00:00:00+01:00"
    epex_da = get_test_sensor(db)
    charging_station = add_charging_station_assets[charging_station_name].sensors[0]
    capacity = charging_station.get_attribute(
        "capacity_in_mw",
        ur.Quantity(charging_station.get_attribute("site-power-capacity"))
        .to("MW")
        .magnitude,
    )
    assert capacity == 2
    assert charging_station.get_attribute("consumption-price") == {"sensor": epex_da.id}

    # create a scenario that yields an infeasible problem (unreachable target SOC at 2am)
    message = {
        "start": start,
        "duration": "PT24H",
        "flex-model": {
            "soc-at-start": 10,
            "soc-min": charging_station.get_attribute("min_soc_in_mwh", 0),
            "soc-max": charging_station.get_attribute("max-soc-in-mwh", target_soc),
            "roundtrip-efficiency": charging_station.get_attribute(
                "roundtrip-efficiency", 1
            ),
            "storage-efficiency": charging_station.get_attribute(
                "storage-efficiency", 1
            ),
            "soc-targets": [
                {
                    "value": target_soc,
                    "start": "2015-01-02T02:00:00+01:00",
                    "duration": "PT0H",
                }
            ],
        },
    }
    with app.test_client() as client:
        # trigger storage scheduler
        trigger_schedule_response = client.post(
            url_for("SensorAPI:trigger_schedule", id=charging_station.id),
            json=message,
        )

        # check that the call is successful
        assert trigger_schedule_response.status_code == 200
        job_id = trigger_schedule_response.json["schedule"]

        # look for scheduling jobs in queue
        assert (
            len(app.queues["scheduling"]) == 1
        )  # only 1 schedule should be made for 1 asset
        job = app.queues["scheduling"].jobs[0]
        assert job.kwargs["asset_or_sensor"]["id"] == charging_station.id
        assert job.kwargs["start"] == parse_datetime(message["start"])
        assert job.id == job_id

        # process only the job that runs the storage scheduler (max_jobs=1)
        work_on_rq(
            app.queues["scheduling"],
            exc_handler=handle_scheduling_exception,
            max_jobs=1,
        )

        # check that the job is failing
        assert Job.fetch(
            job_id, connection=app.queues["scheduling"].connection
        ).is_failed

        # the callback creates the fallback job which is still pending
        assert len(app.queues["scheduling"]) == 1

        fallback_job_id = Job.fetch(
            job_id, connection=app.queues["scheduling"].connection
        ).meta.get("fallback_job_id")

        # check that the fallback_job_id is stored on the metadata of the original job
        assert app.queues["scheduling"].get_job_ids()[0] == fallback_job_id
        assert fallback_job_id != job_id

        get_schedule_response = client.get(
            url_for("SensorAPI:get_schedule", id=charging_station.id, uuid=job_id),
        )

        work_on_rq(
            app.queues["scheduling"],
            exc_handler=handle_scheduling_exception,
            max_jobs=1,
        )

        get_schedule_response = client.get(
            url_for("SensorAPI:get_schedule", id=charging_station.id, uuid=job_id),
        )
        assert get_schedule_response.status_code == 200
        schedule = get_schedule_response.json

        # check that the fallback schedule has the right status and start dates
        assert schedule["status"] == "PROCESSED"
        assert parse_datetime(schedule["start"]) == parse_datetime(start)
        assert schedule["scheduler_info"]["scheduler"] == "StorageFallbackScheduler"

    app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = False