12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- from __future__ import annotations
- from datetime import datetime, timedelta
- import pytz
- from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
- from flexmeasures.data.services.scheduling import create_scheduling_job
- from flexmeasures.data.models.planning import Scheduler
- from flexmeasures.data.services.scheduling import load_custom_scheduler
- class FailingScheduler(Scheduler):
- __author__ = "Test Organization"
- __version__ = "1"
- def compute(self):
- """
- This is a schedule that fails
- """
- raise Exception()
- def deserialize_config(self):
- """Do not care about any config sent in."""
- self.config_deserialized = True
- def test_requeue_failing_job(
- fresh_db, app, add_charging_station_assets_fresh_db, setup_fresh_test_data
- ):
- """
- Testing that failing jobs are requeued.
- This test is called with a fresh db so that previous schedules don't interfere.
- """
- tz = pytz.timezone("Europe/Amsterdam")
- start = tz.localize(datetime(2016, 1, 2))
- end = tz.localize(datetime(2016, 1, 3))
- resolution = timedelta(minutes=15)
- charging_station = add_charging_station_assets_fresh_db[
- "Test charging station"
- ].sensors[0]
- custom_scheduler = {
- "module": "flexmeasures.data.tests.test_scheduling_repeated_jobs_fresh_db",
- "class": "FailingScheduler",
- }
- # test if we can fetch the right scheduler class
- scheduler = load_custom_scheduler(custom_scheduler)(
- charging_station, start, end, resolution
- )
- assert isinstance(scheduler, FailingScheduler)
- # assigning scheduler to the sensor "Test charging station"
- charging_station.attributes["custom-scheduler"] = custom_scheduler
- # clean queue
- app.queues["scheduling"].empty()
- # calling the job twice, with the requeue argument to true
- jobs = []
- for _ in range(2):
- job = create_scheduling_job(
- asset_or_sensor=charging_station,
- start=start,
- end=end,
- resolution=resolution,
- enqueue=True,
- requeue=True,
- )
- work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
- jobs.append(job)
- job1, job2 = jobs
- print(job1.failed_job_registry, len(job1.failed_job_registry))
- assert job1.id == job2.id # equal job IDs
- assert job1.is_failed
- assert job2.is_failed
- print("JOB2: ", job2.enqueued_at)
- print("JOB1: ", job1.enqueued_at)
- # check if job2 has actually been requeued
- assert job1.enqueued_at < job2.enqueued_at
|