test_scheduling_repeated_jobs_fresh_db.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. from __future__ import annotations
  2. from datetime import datetime, timedelta
  3. import pytz
  4. from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
  5. from flexmeasures.data.services.scheduling import create_scheduling_job
  6. from flexmeasures.data.models.planning import Scheduler
  7. from flexmeasures.data.services.scheduling import load_custom_scheduler
  8. class FailingScheduler(Scheduler):
  9. __author__ = "Test Organization"
  10. __version__ = "1"
  11. def compute(self):
  12. """
  13. This is a schedule that fails
  14. """
  15. raise Exception()
  16. def deserialize_config(self):
  17. """Do not care about any config sent in."""
  18. self.config_deserialized = True
  19. def test_requeue_failing_job(
  20. fresh_db, app, add_charging_station_assets_fresh_db, setup_fresh_test_data
  21. ):
  22. """
  23. Testing that failing jobs are requeued.
  24. This test is called with a fresh db so that previous schedules don't interfere.
  25. """
  26. tz = pytz.timezone("Europe/Amsterdam")
  27. start = tz.localize(datetime(2016, 1, 2))
  28. end = tz.localize(datetime(2016, 1, 3))
  29. resolution = timedelta(minutes=15)
  30. charging_station = add_charging_station_assets_fresh_db[
  31. "Test charging station"
  32. ].sensors[0]
  33. custom_scheduler = {
  34. "module": "flexmeasures.data.tests.test_scheduling_repeated_jobs_fresh_db",
  35. "class": "FailingScheduler",
  36. }
  37. # test if we can fetch the right scheduler class
  38. scheduler = load_custom_scheduler(custom_scheduler)(
  39. charging_station, start, end, resolution
  40. )
  41. assert isinstance(scheduler, FailingScheduler)
  42. # assigning scheduler to the sensor "Test charging station"
  43. charging_station.attributes["custom-scheduler"] = custom_scheduler
  44. # clean queue
  45. app.queues["scheduling"].empty()
  46. # calling the job twice, with the requeue argument to true
  47. jobs = []
  48. for _ in range(2):
  49. job = create_scheduling_job(
  50. asset_or_sensor=charging_station,
  51. start=start,
  52. end=end,
  53. resolution=resolution,
  54. enqueue=True,
  55. requeue=True,
  56. )
  57. work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
  58. jobs.append(job)
  59. job1, job2 = jobs
  60. print(job1.failed_job_registry, len(job1.failed_job_registry))
  61. assert job1.id == job2.id # equal job IDs
  62. assert job1.is_failed
  63. assert job2.is_failed
  64. print("JOB2: ", job2.enqueued_at)
  65. print("JOB1: ", job1.enqueued_at)
  66. # check if job2 has actually been requeued
  67. assert job1.enqueued_at < job2.enqueued_at