test_scheduling_sequential.py

from unittest.mock import patch

import pandas as pd
from rq.job import Job

from flexmeasures.data.models.planning.exceptions import InfeasibleProblemException
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.services.scheduling import (
    create_sequential_scheduling_job,
    handle_scheduling_exception,
)
from flexmeasures.data.tests.utils import work_on_rq


def test_create_sequential_jobs(db, app, flex_description_sequential, smart_building):
    """Test sequential scheduling capabilities.

    It schedules the "Test Site", which contains two flexible devices and two inflexible devices.
    We verify that the pipeline creates the right number of jobs: two corresponding to the flexible devices,
    plus an extra one corresponding to the success callback job.
    """
    assets, sensors, soc_sensors = smart_building

    queue = app.queues["scheduling"]

    start = pd.Timestamp("2015-01-03").tz_localize("Europe/Amsterdam")
    end = pd.Timestamp("2015-01-04").tz_localize("Europe/Amsterdam")
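    # scheduler_specs points each device subjob at the scheduler class to use (here, the StorageScheduler).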
    scheduler_specs = {
        "module": "flexmeasures.data.models.planning.storage",
        "class": "StorageScheduler",
    }
    flex_description_sequential["start"] = start
    flex_description_sequential["end"] = end

    create_sequential_scheduling_job(
        asset=assets["Test Site"],
        scheduler_specs=scheduler_specs,
        enqueue=True,
        **flex_description_sequential,
    )
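    # The subjobs are chained: each depends on the previous one having finished, which is why
    # only the first device's job gets queued right away and the later jobs start out deferred.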

    # There should be 3 jobs:
    # 2 jobs scheduling the 2 flexible devices in the flex-model, plus 1 'done job' to wrap things up
    queued_jobs = app.queues["scheduling"].jobs
    deferred_jobs = [
        Job.fetch(job_id, connection=queue.connection)
        for job_id in app.queues["scheduling"].deferred_job_registry.get_job_ids()
    ]
    # Sort deferred_jobs by their created_at attribute
    deferred_jobs = sorted(deferred_jobs, key=lambda job: job.created_at)
    assert (
        len(queued_jobs) == 1
    ), "Only the job for scheduling the first device sequentially should be queued."
    assert (
        len(deferred_jobs) == 2
    ), "The job for scheduling the second device, and the wrap-up job, should be deferred."

    # The EV is scheduled first.
    assert queued_jobs[0].kwargs["asset_or_sensor"] == {
        "id": sensors["Test EV"].id,
        "class": "Sensor",
    }

    # It uses only the inflexible-device-sensors defined in the flex-context.
    assert queued_jobs[0].kwargs["flex_context"]["inflexible-device-sensors"] == [
        sensors["Test Solar"].id,
        sensors["Test Building"].id,
    ]

    # The battery is scheduled second (i.e. it is the first deferred job).
    assert deferred_jobs[0].kwargs["asset_or_sensor"] == {
        "id": sensors["Test Battery"].id,
        "class": "Sensor",
    }

    # In addition to the inflexible devices already present in the flex-context (PV and Building),
    # the power sensor of the EV is included.
    assert deferred_jobs[0].kwargs["flex_context"]["inflexible-device-sensors"] == [
        sensors["Test Solar"].id,
        sensors["Test Building"].id,
        sensors["Test EV"].id,
    ]
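    # This is the core of the sequential approach: each later device takes the schedules of
    # devices planned before it as given, by treating their power sensors as inflexible.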

    ev_power = sensors["Test EV"].search_beliefs()
    battery_power = sensors["Test Battery"].search_beliefs()

    # Sensors are empty before running the schedule
    assert ev_power.empty
    assert battery_power.empty

    # Work on jobs
    queued_jobs[0].perform()
    work_on_rq(queue, handle_scheduling_exception)
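    # perform() runs the first job in-process, so any scheduling exception would surface directly
    # in the test; work_on_rq then has an RQ worker work the queue, picking up the dependent jobs
    # as they become ready.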

    # Check that the jobs completed successfully
    assert queued_jobs[0].get_status() == "finished"
    assert deferred_jobs[0].get_status() == "finished"
    assert deferred_jobs[1].get_status() == "finished"

    # Check results
    ev_power = sensors["Test EV"].search_beliefs()
    assert ev_power.sources.unique()[0].model == "StorageScheduler"
    ev_power = ev_power.droplevel([1, 2, 3])
    battery_power = sensors["Test Battery"].search_beliefs()
    assert battery_power.sources.unique()[0].model == "StorageScheduler"
    battery_power = battery_power.droplevel([1, 2, 3])
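    # Note: the belief frames are indexed by (event_start, belief_time, source, cumulative_probability);
    # dropping the last three levels leaves plain time series indexed by event start.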

    start_charging = start + pd.Timedelta(hours=8)
    end_charging = start + pd.Timedelta(hours=10) - sensors["Test EV"].event_resolution

    assert (ev_power.loc[start_charging:end_charging] == -0.005).values.all()  # 5 kW
    assert (
        battery_power.loc[start_charging:end_charging] == 0.005
    ).values.all()  # 5 kW
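    # Values are stored in MW with consumption negative: -0.005 MW means the EV charges at 5 kW,
    # while the battery discharges at 5 kW over the same window.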

    # Get price data
    price_sensor_id = flex_description_sequential["flex_context"][
        "consumption-price-sensor"
    ]
    price_sensor = db.session.get(Sensor, price_sensor_id)
    prices = price_sensor.search_beliefs(
        event_starts_after=start - pd.Timedelta(hours=1), event_ends_before=end
    )
    prices = prices.droplevel([1, 2, 3])
    prices.index = prices.index.tz_convert("Europe/Amsterdam")

    # Resample prices to match power resolution
    prices = prices.resample("15min").ffill()

    # Calculate costs
    resolution = sensors["Test EV"].event_resolution.total_seconds() / 3600
    ev_costs = (-ev_power * prices * resolution).sum().item()
    battery_costs = (-battery_power * prices * resolution).sum().item()
    total_cost = ev_costs + battery_costs
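    # Cost per step = -power [MW] * price [presumably EUR/MWh] * resolution [h]; the minus sign
    # flips the consumption-is-negative convention, so charging shows up as a cost and discharging as revenue.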

    # Assert costs
    assert ev_costs == 2.2375, f"EV cost should be 2.2375 €, got {ev_costs} €"
    assert (
        battery_costs == -4.415
    ), f"Battery cost should be -4.415 €, got {battery_costs} €"
    assert total_cost == -2.1775, f"Total cost should be -2.1775 €, got {total_cost} €"


def test_create_sequential_jobs_fallback(
    db, app, flex_description_sequential, smart_building
):
    """Test fallback scheduler in a chain of sequential scheduling (sub)jobs.

    Checks execution of a sequential scheduling job, where one of the subjobs is set up to fail and trigger its fallback.
    Even though that first subjob fails, the deferred subjobs should still succeed once the fallback succeeds.
    """
    assets, sensors, _ = smart_building
    queue = app.queues["scheduling"]

    start = pd.Timestamp("2015-01-03").tz_localize("Europe/Amsterdam")
    end = pd.Timestamp("2015-01-04").tz_localize("Europe/Amsterdam")

    scheduler_specs = {
        "module": "flexmeasures.data.models.planning.storage",
        "class": "StorageScheduler",
    }

    flex_description_sequential["start"] = start
    flex_description_sequential["end"] = end

    storage_module = "flexmeasures.data.models.planning.storage"
    with patch(f"{storage_module}.StorageScheduler.persist_flex_model"):
        with patch(f"{storage_module}.StorageFallbackScheduler.persist_flex_model"):
            with patch(
                f"{storage_module}.StorageScheduler.compute",
                side_effect=iter([InfeasibleProblemException(), [], []]),
            ):
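                # The patched compute raises on its first call (failing the first subjob, which should
                # trigger its fallback) and returns an empty schedule on each of the two calls after that.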
                create_sequential_scheduling_job(
                    asset=assets["Test Site"],
                    scheduler_specs=scheduler_specs,
                    enqueue=True,
                    force_new_job_creation=True,  # otherwise the cache might kick in due to sub-jobs already created in other tests
                    **flex_description_sequential,
                )

                # There should be 3 jobs:
                # 2 jobs scheduling the 2 flexible devices in the flex-model, plus 1 'done job' to wrap things up
                queued_jobs = app.queues["scheduling"].jobs
                deferred_jobs = [
                    Job.fetch(job_id, connection=queue.connection)
                    for job_id in app.queues[
                        "scheduling"
                    ].deferred_job_registry.get_job_ids()
                ]
                # Sort deferred_jobs by their created_at attribute
                deferred_jobs = sorted(deferred_jobs, key=lambda job: job.created_at)
                assert (
                    len(queued_jobs) == 1
                ), "Only the job for scheduling the first device sequentially should be queued."
                assert (
                    len(deferred_jobs) == 2
                ), "The job for scheduling the second device, and the wrap-up job, should be deferred."

                # Work on jobs
                work_on_rq(queue, exc_handler=handle_scheduling_exception)
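                # handle_scheduling_exception is expected to enqueue the failing subjob's fallback job
                # and to record that fallback's id in the failed job's meta, which the assertions below rely on.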

    # Refresh jobs so that the fallback_job_id (which should be set by now) can be read
    for job in queued_jobs:
        job.refresh()

    finished_jobs = queue.finished_job_registry.get_job_ids()
    failed_jobs = queue.failed_job_registry.get_job_ids()

    # Original job failed
    assert queued_jobs[0].id in failed_jobs

    # The fallback job ran successfully
    assert queued_jobs[0].meta["fallback_job_id"] in finished_jobs

    # The deferred jobs ran successfully
    assert deferred_jobs[0].id in finished_jobs
    assert deferred_jobs[1].id in finished_jobs