test_scheduling_repeated_jobs.py

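"""Tests for repeated scheduling jobs in FlexMeasures.

These tests cover hashing of function arguments (hash_function_arguments) and
the job_cache decorator, which together ensure that triggering the same
schedule twice reuses the existing job instead of enqueueing a duplicate.
"""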
from __future__ import annotations

from datetime import datetime, timedelta
import copy
import logging

import pytz
import pytest
from rq.job import Job, JobStatus
from sqlalchemy import select

from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.services.utils import hash_function_arguments, job_cache
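
# A summary of the machinery under test, as exercised by the tests below:
# hash_function_arguments(args, kwargs) hashes a call's arguments such that
# dict key order (see the nested "key4" cases) does not change the hash, while
# value types (1 vs "1"), positional order and the verbatim content of
# serialized strings do. job_cache uses such hashes to return a previously
# created job instead of creating a duplicate.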


@pytest.mark.parametrize(
    "args_modified, kwargs_modified, equal",
    [
        (
            # identical to the baseline arguments in the test body
            [1, 2, "1"],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            True,
        ),
        (
            # nested dict key order does not affect the hash
            [1, 2, "1"],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key2_nested": 2, "key1_nested": 1},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            True,
        ),
        (
            # int 1 is not the same argument as str "1"
            [1, 2, 1],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        (
            # changed keyword value
            [1, 2, "1"],
            {
                "key1": "different",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        (
            # changed keyword name
            [1, 2, "1"],
            {
                "different": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        (
            # positional argument order matters
            ["1", 1, 2],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        (
            # changed nested value
            [1, 2, "1"],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": "different", "key2_nested": 2},
                "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
            },
            False,
        ),
        (
            # key order inside a serialized string is compared verbatim, so it matters
            [1, 2, "1"],
            {
                "key1": "value1",
                "key2": "value2",
                "key3": 3,
                "key4": {"key1_nested": 1, "key2_nested": 2},
                "key5": '{"serialized_key2_nested": 2, "serialized_key1_nested": 1}',
            },
            False,
        ),
    ],
)
def test_hashing_simple(args_modified: list, kwargs_modified: dict, equal: bool):
    """Check that hash_function_arguments is sensitive to the right differences.

    Dict key order should not affect the hash, while values, value types,
    positional order and the contents of serialized strings should.
    """
    args = [1, 2, "1"]
    kwargs = {
        "key1": "value1",
        "key2": "value2",
        "key3": 3,
        "key4": {"key1_nested": 1, "key2_nested": 2},
        "key5": '{"serialized_key1_nested": 1, "serialized_key2_nested": 2}',
    }

    hash_original = hash_function_arguments(args, kwargs)
    hash_modified = hash_function_arguments(args_modified, kwargs_modified)

    if equal:
        assert hash_original == hash_modified
    else:
        assert hash_original != hash_modified


def test_hashing(db, app, add_charging_station_assets, setup_test_data):
    soc_at_start = 1
    target_soc = 5
    duration_until_target = timedelta(hours=2)

    # We need to obtain the sensor through a db query (rather than via the
    # add_charging_station_assets fixture), otherwise we run into session
    # issues with deepcopy later on.
    charging_station = db.session.execute(
        select(Sensor)
        .filter(Sensor.name == "power")
        .join(GenericAsset, Sensor.generic_asset_id == GenericAsset.id)
        .filter(GenericAsset.name == "Test charging station")
    ).scalar_one_or_none()

    tz = pytz.timezone("Europe/Amsterdam")
    start = tz.localize(datetime(2015, 1, 2))
    end = tz.localize(datetime(2015, 1, 3))
    target_datetime = start + duration_until_target
    resolution = timedelta(minutes=15)
    soc_targets = [dict(datetime=target_datetime.isoformat(), value=target_soc)]

    kwargs = dict(
        sensor=charging_station,
        start=start,
        end=end,
        belief_time=start,
        resolution=resolution,
        flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets},
    )
    args = []

    hash1 = hash_function_arguments(args, kwargs)
    print("RIGHT HASH: ", hash1)

    # check that hashes are consistent between different runtime calls
    # (this assertion needs to be updated in case of a version upgrade)
    assert hash1 == "oAZ8tzzq50zl3I+7oFeabrj1QeH709mZdXWbpkn0krA="

    kwargs2 = copy.deepcopy(kwargs)
    args2 = copy.deepcopy(args)

    # check that hashes are consistent within the same runtime call
    hash2 = hash_function_arguments(args2, kwargs2)
    assert hash2 == hash1

    # check that different arguments yield different hashes
    kwargs2["resolution"] = timedelta(minutes=12)
    hash3 = hash_function_arguments(args2, kwargs2)
    assert hash1 != hash3


def test_scheduling_multiple_triggers(
    caplog, db, app, add_charging_station_assets, setup_test_data
):
    caplog.set_level(
        logging.INFO
    )  # setting the logging level of the log capture fixture
    duration_until_target = timedelta(hours=2)

    charging_station = add_charging_station_assets["Test charging station"].sensors[0]
    tz = pytz.timezone("Europe/Amsterdam")
    start = tz.localize(datetime(2015, 1, 2))
    end = tz.localize(datetime(2015, 1, 3))
    resolution = timedelta(minutes=15)
    target_datetime = start + duration_until_target
    soc_start = 2.5

    # make sure the scheduler data source isn't there yet
    assert (
        db.session.execute(
            select(DataSource).filter_by(name="FlexMeasures", type="scheduling script")
        ).scalar_one_or_none()
        is None
    )

    # clear logs
    caplog.clear()

    jobs = []

    # create jobs: the first two share their arguments (target SoC of 1),
    # while the third uses a different target SoC
    for target_soc in [1, 1, 4]:
        soc_targets = [dict(datetime=target_datetime.isoformat(), value=target_soc)]
        job = create_scheduling_job(
            asset_or_sensor=charging_station,
            start=start,
            end=end,
            belief_time=start,
            resolution=resolution,
            flex_model={"soc-at-start": soc_start, "soc-targets": soc_targets},
            enqueue=False,
        )

        # enqueue & run job
        app.queues["scheduling"].enqueue_job(job)

        jobs.append(job)

    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

    job1, job2, job3 = jobs
    print(job1.id, job2.id, job3.id)

    # check that jobs 1 & 2 have the same job id
    assert job1.id == job2.id

    # check that job 3 has a different id
    assert job3.id != job1.id


def failing_function(*args, **kwargs):
    """Fail unconditionally, so any job running this function ends up FAILED."""
    raise Exception()


def test_allow_trigger_failed_jobs(
    caplog, db, app, add_charging_station_assets, setup_test_data
):
    @job_cache("scheduling")
    def create_failing_job(
        arg1: int,
        kwarg1: int | None = None,
        kwarg2: int | None = None,
    ) -> Job:
        """
        This function creates and enqueues a failing job.
        """
        job = Job.create(
            failing_function,
            kwargs=dict(kwarg1=kwarg1, kwarg2=kwarg2),
            connection=app.queues["scheduling"].connection,
        )

        app.queues["scheduling"].enqueue_job(job)

        return job

    job1 = create_failing_job(1, 1, 1)  # this job will fail when worked on
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
    assert job1.get_status() == JobStatus.FAILED  # check that the job fails

    # a failed job can be triggered again, keeping its id
    job2 = create_failing_job(1, 1, 1)
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
    assert job1.id == job2.id


def successful_function(*args, **kwargs):
    """Do nothing, so any job running this function succeeds."""
    pass


def test_force_new_job_creation(db, app, add_charging_station_assets, setup_test_data):
    @job_cache("scheduling")
    def create_successful_job(
        arg1: int,
        kwarg1: int | None = None,
        kwarg2: int | None = None,
        force_new_job_creation=False,
    ) -> Job:
        """
        This function creates and enqueues a successful job.
        """
        job = Job.create(
            successful_function,
            kwargs=dict(kwarg1=kwarg1, kwarg2=kwarg2),
            connection=app.queues["scheduling"].connection,
        )

        app.queues["scheduling"].enqueue_job(job)

        return job

    job1 = create_successful_job(1, 1, 1, force_new_job_creation=True)
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
    job2 = create_successful_job(1, 1, 1, force_new_job_creation=False)
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

    # check that `force_new_job_creation` doesn't affect the hash
    assert job1.id == job2.id  # caching job

    job3 = create_successful_job(1, 1, 1, force_new_job_creation=True)
    work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

    # check that `force_new_job_creation=True` actually triggers a new job creation
    assert job2.id != job3.id
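

# Illustrative sketch (not the actual FlexMeasures implementation) of the
# caching pattern the tests above exercise: hash the call arguments, reuse a
# previously created job under that hash, and only create a fresh job when
# nothing is cached or `force_new_job_creation=True`. The plain-dict registry
# and the names `cached_job_factory_sketch`/`make_job` are hypothetical; the
# real job_cache presumably persists job ids alongside the RQ queue instead.
def cached_job_factory_sketch(make_job):
    registry: dict[str, Job] = {}

    def wrapper(*args, force_new_job_creation: bool = False, **kwargs):
        # the flag itself is excluded from the hash,
        # matching the behavior checked in test_force_new_job_creation
        key = hash_function_arguments(list(args), kwargs)
        if not force_new_job_creation and key in registry:
            return registry[key]  # reuse the cached job
        job = make_job(*args, **kwargs)
        registry[key] = job
        return job

    return wrapper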