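"""Tests for FlexMeasures' argument hashing and scheduling job caching.

Covered here: hash_function_arguments (consistent hashing of call arguments),
create_scheduling_job (identical triggers reuse the same queued job), and the
@job_cache decorator (failed jobs can be re-triggered; force_new_job_creation
bypasses the cache).
"""
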
from __future__ import annotations

from datetime import datetime, timedelta
import copy
import logging

import pytz
import pytest
from rq.job import Job, JobStatus
from sqlalchemy import select

from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.services.utils import hash_function_arguments, job_cache
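
# Assumed caching semantics, inferred from the tests below rather than from the
# decorator's implementation: @job_cache("scheduling") hashes the decorated
# function's arguments with hash_function_arguments and, if a job with that hash
# was already created, returns it instead of making a new one. Passing
# force_new_job_creation=True bypasses this lookup, and the flag itself is
# excluded from the hash.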


@pytest.mark.parametrize(
    "args_modified, kwargs_modified, equal",
    [
        # identical args and kwargs -> equal hashes
        (
            [1, 2, "1"],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            True,
        ),
        # reordering the keys of a nested dict -> equal hashes
        (
            [1, 2, "1"],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key2_nested": 2, "key1_nested": 1},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            True,
        ),
        # changing an argument's type (int 1 vs str "1") -> different hashes
        (
            [1, 2, 1],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        # changing a keyword argument's value -> different hashes
        (
            [1, 2, "1"],
            {
                "key1": "different",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        # renaming a keyword argument -> different hashes
        (
            [1, 2, "1"],
            {
                "different": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        # reordering positional arguments -> different hashes
        (
            ["1", 1, 2],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        # changing a nested value -> different hashes
        (
            [1, 2, "1"],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": "different", "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        # reordering keys inside a serialized (string) value -> different hashes
        (
            [1, 2, "1"],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key2_nested": 2, "serialized_key1_nested": 1}',
            },
            False,
        ),
    ],
)
def test_hashing_simple(args_modified: list, kwargs_modified: dict, equal: bool):
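    """Check that hash_function_arguments is sensitive to the order and types of
    positional arguments and to the content of keyword arguments (including
    nested dicts), but insensitive to dict key order. Serialized (string) values
    are compared verbatim, so reordering keys inside a JSON string changes the hash.
    """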
    args = [1, 2, "1"]
    kwargs = {
        "key1": "value1",
        "key2": "value2",
        "key3": 3,
        "key4": {"key1_nested": 1, "key2_nested": 2},
        "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
    }

    hash_original = hash_function_arguments(args, kwargs)
    hash_modified = hash_function_arguments(args_modified, kwargs_modified)

    if equal:
        assert hash_original == hash_modified
    else:
        assert hash_original != hash_modified


def test_hashing(db, app, add_charging_station_assets, setup_test_data):
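    """Check that hash_function_arguments yields stable hashes for scheduling job
    arguments: the same hash across different runtime calls (pinned below), the
    same hash for deep copies, and a different hash when an argument changes.
    """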
    soc_at_start = 1
    target_soc = 5
    duration_until_target = timedelta(hours=2)

    # Here, we need to obtain the object through a db query; otherwise we run
    # into session issues with deepcopy later on:
    # charging_station = add_charging_station_assets["Test charging station"].sensors[0]
    charging_station = db.session.execute(
        select(Sensor)
        .join(GenericAsset, Sensor.generic_asset_id == GenericAsset.id)
        .filter(Sensor.name == "power")
        .filter(GenericAsset.name == "Test charging station")
    ).scalar_one_or_none()
    assert charging_station is not None

    tz = pytz.timezone("Europe/Amsterdam")
    start = tz.localize(datetime(2015, 1, 2))
    end = tz.localize(datetime(2015, 1, 3))
    target_datetime = start + duration_until_target
    resolution = timedelta(minutes=15)
    soc_targets = [dict(datetime=target_datetime.isoformat(), value=target_soc)]

    kwargs = dict(
        sensor=charging_station,
        start=start,
        end=end,
        belief_time=start,
        resolution=resolution,
        flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets},
    )
    args = []

    hash = hash_function_arguments(args, kwargs)
    print("RIGHT HASH: ", hash)

    # Check that hashes are consistent between different runtime calls.
    # This assertion needs to be updated in case of a version upgrade.
    assert hash == "oAZ8tzzq50zl3I+7oFeabrj1QeH709mZdXWbpkn0krA="

    # Check that hashes are consistent within the same runtime.
    kwargs2 = copy.deepcopy(kwargs)
    args2 = copy.deepcopy(args)
    hash2 = hash_function_arguments(args2, kwargs2)
    assert hash2 == hash

    # Check that different arguments yield different hashes.
    kwargs2["resolution"] = timedelta(minutes=12)
    hash3 = hash_function_arguments(args2, kwargs2)
    assert hash != hash3


def test_scheduling_multiple_triggers(
    caplog, db, app, add_charging_station_assets, setup_test_data
):
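    """Check that triggering the same scheduling job twice yields the same job ID
    (the cached job is reused), while a trigger with different arguments (here, a
    different target SoC) yields a new job.
    """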
    caplog.set_level(logging.INFO)  # set the logging level of the log capture fixture

    duration_until_target = timedelta(hours=2)
    charging_station = add_charging_station_assets["Test charging station"].sensors[0]
    tz = pytz.timezone("Europe/Amsterdam")
    start = tz.localize(datetime(2015, 1, 2))
    end = tz.localize(datetime(2015, 1, 3))
    resolution = timedelta(minutes=15)
    target_datetime = start + duration_until_target
    soc_start = 2.5

    # Make sure the scheduler data source isn't there yet
    assert (
        db.session.execute(
            select(DataSource).filter_by(name="FlexMeasures", type="scheduling script")
        ).scalar_one_or_none()
        is None
    )

    # clear logs
    caplog.clear()

    # create and enqueue the jobs; the first two triggers use identical arguments
    jobs = []
    for target_soc in [1, 1, 4]:
        soc_targets = [dict(datetime=target_datetime.isoformat(), value=target_soc)]
        job = create_scheduling_job(
            asset_or_sensor=charging_station,
            start=start,
            end=end,
            belief_time=start,
            resolution=resolution,
            flex_model={"soc-at-start": soc_start, "soc-targets": soc_targets},
            enqueue=False,
        )
        app.queues["scheduling"].enqueue_job(job)
        jobs.append(job)

    # work the queue
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

    job1, job2, job3 = jobs
    print(job1.id, job2.id, job3.id)

    # check that jobs 1 & 2 have the same job ID (identical arguments, so the job is cached)
    assert job1.id == job2.id

    # check that job 3 has a different ID (different target SoC)
    assert job3.id != job1.id


def failing_function(*args, **kwargs):
    """A job function that always fails when worked on."""
    raise Exception()


def test_allow_trigger_failed_jobs(
    caplog, db, app, add_charging_station_assets, setup_test_data
):
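    """Check that a failed job does not block re-triggering: calling the cached
    job-creating function again with the same arguments returns a job with the
    same ID as the failed one.
    """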
    @job_cache("scheduling")
    def create_failing_job(
        arg1: int,
        kwarg1: int | None = None,
        kwarg2: int | None = None,
    ) -> Job:
        """Create and enqueue a job that will fail when worked on."""
        job = Job.create(
            failing_function,
            kwargs=dict(kwarg1=kwarg1, kwarg2=kwarg2),
            connection=app.queues["scheduling"].connection,
        )
        app.queues["scheduling"].enqueue_job(job)
        return job

    job1 = create_failing_job(1, 1, 1)  # this job will fail when worked on
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
    assert job1.get_status() == JobStatus.FAILED  # check that the job fails

    job2 = create_failing_job(1, 1, 1)
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

    # same arguments, so the cached (failed) job is returned again
    assert job1.id == job2.id


def successful_function(*args, **kwargs):
    """A job function that trivially succeeds."""
    pass


def test_force_new_job_creation(db, app, add_charging_station_assets, setup_test_data):
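    """Check that force_new_job_creation bypasses the job cache: with the flag off,
    identical arguments return the cached job; with the flag on, a new job is
    created even for identical arguments. The flag itself does not enter the hash.
    """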
    @job_cache("scheduling")
    def create_successful_job(
        arg1: int,
        kwarg1: int | None = None,
        kwarg2: int | None = None,
        force_new_job_creation: bool = False,
    ) -> Job:
        """Create and enqueue a job that succeeds."""
        job = Job.create(
            successful_function,
            kwargs=dict(kwarg1=kwarg1, kwarg2=kwarg2),
            connection=app.queues["scheduling"].connection,
        )
        app.queues["scheduling"].enqueue_job(job)
        return job

    job1 = create_successful_job(1, 1, 1, force_new_job_creation=True)
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

    job2 = create_successful_job(1, 1, 1, force_new_job_creation=False)
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

    # check that `force_new_job_creation` doesn't affect the hash: the cached job is returned
    assert job1.id == job2.id

    job3 = create_successful_job(1, 1, 1, force_new_job_creation=True)
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

    # check that `force_new_job_creation=True` actually triggers a new job creation
    assert job2.id != job3.id