test_sensor_schedules.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. from flask import url_for
  2. import pytest
  3. from isodate import parse_datetime
  4. from rq.job import Job
  5. from sqlalchemy import select
  6. from flexmeasures.api.common.responses import unknown_schedule, unrecognized_event
  7. from flexmeasures.api.tests.utils import check_deprecation
  8. from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule
  9. from flexmeasures.data.models.data_sources import DataSource
  10. from flexmeasures.data.tests.utils import work_on_rq
  11. from flexmeasures.data.services.scheduling import handle_scheduling_exception
  12. from flexmeasures.tests.utils import get_test_sensor
  13. from flexmeasures.utils.unit_utils import ur
  14. @pytest.mark.parametrize(
  15. "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
  16. )
  17. def test_get_schedule_wrong_job_id(
  18. app,
  19. add_market_prices,
  20. add_battery_assets,
  21. battery_soc_sensor,
  22. add_charging_station_assets,
  23. keep_scheduling_queue_empty,
  24. requesting_user,
  25. ):
  26. wrong_job_id = 9999
  27. sensor = add_battery_assets["Test battery"].sensors[0]
  28. with app.test_client() as client:
  29. get_schedule_response = client.get(
  30. url_for("SensorAPI:get_schedule", id=sensor.id, uuid=wrong_job_id),
  31. )
  32. print("Server responded with:\n%s" % get_schedule_response.json)
  33. check_deprecation(get_schedule_response, deprecation=None, sunset=None)
  34. assert get_schedule_response.status_code == 400
  35. assert get_schedule_response.json == unrecognized_event(wrong_job_id, "job")[0]
  36. @pytest.mark.parametrize(
  37. "message, field, sent_value, err_msg",
  38. [
  39. (message_for_trigger_schedule(), "soc-minn", 3, "Unknown field"),
  40. (
  41. message_for_trigger_schedule(),
  42. "soc-min",
  43. "not-a-float",
  44. "Cannot convert value 'not-a-float' to a valid quantity.",
  45. ),
  46. (message_for_trigger_schedule(), "soc-unit", "MWH", "Must be one of"),
  47. # todo: add back test in case we stop grandfathering ignoring too-far-into-the-future targets, or amend otherwise
  48. # (
  49. # message_for_trigger_schedule(
  50. # with_targets=True, too_far_into_the_future_targets=True
  51. # ),
  52. # "soc-targets",
  53. # None,
  54. # "Target datetime exceeds",
  55. # ),
  56. ],
  57. )
  58. @pytest.mark.parametrize(
  59. "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
  60. )
  61. def test_trigger_schedule_with_invalid_flexmodel(
  62. app,
  63. add_battery_assets,
  64. keep_scheduling_queue_empty,
  65. message,
  66. field,
  67. sent_value,
  68. err_msg,
  69. requesting_user,
  70. ):
  71. sensor = add_battery_assets["Test battery"].sensors[0]
  72. with app.test_client() as client:
  73. if sent_value: # if None, field is a term we expect in the response, not more
  74. message["flex-model"][field] = sent_value
  75. trigger_schedule_response = client.post(
  76. url_for("SensorAPI:trigger_schedule", id=sensor.id),
  77. json=message,
  78. )
  79. print("Server responded with:\n%s" % trigger_schedule_response.json)
  80. check_deprecation(trigger_schedule_response, deprecation=None, sunset=None)
  81. assert trigger_schedule_response.status_code == 422
  82. assert field in trigger_schedule_response.json["message"]["json"]
  83. if isinstance(trigger_schedule_response.json["message"]["json"], str):
  84. # ValueError
  85. assert err_msg in trigger_schedule_response.json["message"]["json"]
  86. else:
  87. # ValidationError (marshmallow)
  88. assert (
  89. err_msg in trigger_schedule_response.json["message"]["json"][field][0]
  90. )
  91. @pytest.mark.parametrize("message", [message_for_trigger_schedule(unknown_prices=True)])
  92. @pytest.mark.parametrize(
  93. "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
  94. )
  95. def test_trigger_and_get_schedule_with_unknown_prices(
  96. app,
  97. client,
  98. add_market_prices,
  99. add_battery_assets,
  100. battery_soc_sensor,
  101. add_charging_station_assets,
  102. keep_scheduling_queue_empty,
  103. message,
  104. requesting_user,
  105. db,
  106. ):
  107. sensor = add_battery_assets["Test battery"].sensors[0]
  108. # trigger a schedule through the /sensors/<id>/schedules/trigger [POST] api endpoint
  109. trigger_schedule_response = client.post(
  110. url_for("SensorAPI:trigger_schedule", id=sensor.id),
  111. json=message,
  112. )
  113. print("Server responded with:\n%s" % trigger_schedule_response.json)
  114. check_deprecation(trigger_schedule_response, deprecation=None, sunset=None)
  115. assert trigger_schedule_response.status_code == 200
  116. job_id = trigger_schedule_response.json["schedule"]
  117. # look for scheduling jobs in queue
  118. assert (
  119. len(app.queues["scheduling"]) == 1
  120. ) # only 1 schedule should be made for 1 asset
  121. job = app.queues["scheduling"].jobs[0]
  122. assert job.kwargs["asset_or_sensor"]["id"] == sensor.id
  123. assert job.kwargs["start"] == parse_datetime(message["start"])
  124. assert job.id == job_id
  125. # process the scheduling queue
  126. work_on_rq(app.queues["scheduling"], exc_handler=handle_scheduling_exception)
  127. assert (
  128. Job.fetch(job_id, connection=app.queues["scheduling"].connection).is_failed
  129. is True
  130. )
  131. # check results are not in the database
  132. scheduler_source = db.session.execute(
  133. select(DataSource).filter_by(name="Seita", type="scheduler")
  134. ).scalar_one_or_none()
  135. assert (
  136. scheduler_source is None
  137. ) # Make sure the scheduler data source is still not there
  138. # try to retrieve the schedule through the /sensors/<id>/schedules/<job_id> [GET] api endpoint
  139. get_schedule_response = client.get(
  140. url_for("SensorAPI:get_schedule", id=sensor.id, uuid=job_id),
  141. )
  142. print("Server responded with:\n%s" % get_schedule_response.json)
  143. check_deprecation(get_schedule_response, deprecation=None, sunset=None)
  144. assert get_schedule_response.status_code == 400
  145. assert get_schedule_response.json["status"] == unknown_schedule()[0]["status"]
  146. assert "prices unknown" in get_schedule_response.json["message"].lower()
  147. @pytest.mark.parametrize(
  148. "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
  149. )
  150. def test_get_schedule_fallback(
  151. app,
  152. add_battery_assets,
  153. add_market_prices,
  154. battery_soc_sensor,
  155. add_charging_station_assets,
  156. keep_scheduling_queue_empty,
  157. requesting_user,
  158. db,
  159. ):
  160. """
  161. Test if the fallback job is created after a failing StorageScheduler call. This test
  162. is based on flexmeasures/data/models/planning/tests/test_solver.py
  163. """
  164. assert app.config["FLEXMEASURES_FALLBACK_REDIRECT"] is False
  165. app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = True
  166. target_soc = 9
  167. charging_station_name = "Test charging station"
  168. start = "2015-01-02T00:00:00+01:00"
  169. epex_da = get_test_sensor(db)
  170. charging_station = add_charging_station_assets[charging_station_name].sensors[0]
  171. capacity = charging_station.get_attribute(
  172. "capacity_in_mw",
  173. ur.Quantity(charging_station.get_attribute("site-power-capacity"))
  174. .to("MW")
  175. .magnitude,
  176. )
  177. assert capacity == 2
  178. assert charging_station.get_attribute("consumption-price") == {"sensor": epex_da.id}
  179. # check that no Fallback schedule has been saved before
  180. models = [
  181. source.model for source in charging_station.search_beliefs().sources.unique()
  182. ]
  183. assert "StorageFallbackScheduler" not in models
  184. # create a scenario that yields an infeasible problem (unreachable target SOC at 2am)
  185. message = {
  186. "start": start,
  187. "duration": "PT24H",
  188. "flex-model": {
  189. "soc-at-start": 10,
  190. "soc-min": charging_station.get_attribute("min_soc_in_mwh", 0),
  191. "soc-max": charging_station.get_attribute("max-soc-in-mwh", target_soc),
  192. "roundtrip-efficiency": charging_station.get_attribute(
  193. "roundtrip-efficiency", 1
  194. ),
  195. "storage-efficiency": charging_station.get_attribute(
  196. "storage-efficiency", 1
  197. ),
  198. "soc-targets": [
  199. {"value": target_soc, "datetime": "2015-01-02T02:00:00+01:00"}
  200. ],
  201. },
  202. }
  203. with app.test_client() as client:
  204. # trigger storage scheduler
  205. trigger_schedule_response = client.post(
  206. url_for("SensorAPI:trigger_schedule", id=charging_station.id),
  207. json=message,
  208. )
  209. # check that the call is successful
  210. assert trigger_schedule_response.status_code == 200
  211. job_id = trigger_schedule_response.json["schedule"]
  212. # look for scheduling jobs in queue
  213. assert (
  214. len(app.queues["scheduling"]) == 1
  215. ) # only 1 schedule should be made for 1 asset
  216. job = app.queues["scheduling"].jobs[0]
  217. assert job.kwargs["asset_or_sensor"]["id"] == charging_station.id
  218. assert job.kwargs["start"] == parse_datetime(message["start"])
  219. assert job.id == job_id
  220. # process only the job that runs the storage scheduler (max_jobs=1)
  221. work_on_rq(
  222. app.queues["scheduling"],
  223. exc_handler=handle_scheduling_exception,
  224. max_jobs=1,
  225. )
  226. # check that the job is failing
  227. assert Job.fetch(
  228. job_id, connection=app.queues["scheduling"].connection
  229. ).is_failed
  230. # the callback creates the fallback job which is still pending
  231. assert len(app.queues["scheduling"]) == 1
  232. fallback_job_id = Job.fetch(
  233. job_id, connection=app.queues["scheduling"].connection
  234. ).meta.get("fallback_job_id")
  235. # check that the fallback_job_id is stored on the metadata of the original job
  236. assert app.queues["scheduling"].get_job_ids()[0] == fallback_job_id
  237. assert fallback_job_id != job_id
  238. get_schedule_response = client.get(
  239. url_for("SensorAPI:get_schedule", id=charging_station.id, uuid=job_id),
  240. )
  241. # requesting the original job redirects to the fallback job
  242. assert (
  243. get_schedule_response.status_code == 303
  244. ) # Status code for redirect ("See other")
  245. assert (
  246. get_schedule_response.json["message"]
  247. == "Scheduling job failed with InfeasibleProblemException: . StorageScheduler was used."
  248. )
  249. assert get_schedule_response.json["status"] == "UNKNOWN_SCHEDULE"
  250. assert get_schedule_response.json["result"] == "Rejected"
  251. # check that the redirection location points to the fallback job
  252. assert (
  253. get_schedule_response.headers["location"]
  254. == f"http://localhost/api/v3_0/sensors/{charging_station.id}/schedules/{fallback_job_id}"
  255. )
  256. # run the fallback job
  257. work_on_rq(
  258. app.queues["scheduling"],
  259. exc_handler=handle_scheduling_exception,
  260. max_jobs=1,
  261. )
  262. # check that the queue is empty
  263. assert len(app.queues["scheduling"]) == 0
  264. # get the fallback schedule
  265. fallback_schedule = client.get(
  266. get_schedule_response.headers["location"],
  267. json={"duration": "PT24H"},
  268. ).json
  269. # check that the fallback schedule has the right status and start dates
  270. assert fallback_schedule["status"] == "PROCESSED"
  271. assert parse_datetime(fallback_schedule["start"]) == parse_datetime(start)
  272. models = [
  273. source.model
  274. for source in charging_station.search_beliefs().sources.unique()
  275. ]
  276. assert "StorageFallbackScheduler" in models
  277. app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = False
  278. @pytest.mark.parametrize(
  279. "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
  280. )
  281. def test_get_schedule_fallback_not_redirect(
  282. app,
  283. add_battery_assets,
  284. add_market_prices,
  285. battery_soc_sensor,
  286. add_charging_station_assets,
  287. keep_scheduling_queue_empty,
  288. requesting_user,
  289. db,
  290. ):
  291. """
  292. Test if the fallback scheduler is returned directly after a failing StorageScheduler call. This test
  293. is based on flexmeasures/data/models/planning/tests/test_solver.py
  294. """
  295. app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = False
  296. target_soc = 9
  297. charging_station_name = "Test charging station"
  298. start = "2015-01-02T00:00:00+01:00"
  299. epex_da = get_test_sensor(db)
  300. charging_station = add_charging_station_assets[charging_station_name].sensors[0]
  301. capacity = charging_station.get_attribute(
  302. "capacity_in_mw",
  303. ur.Quantity(charging_station.get_attribute("site-power-capacity"))
  304. .to("MW")
  305. .magnitude,
  306. )
  307. assert capacity == 2
  308. assert charging_station.get_attribute("consumption-price") == {"sensor": epex_da.id}
  309. # create a scenario that yields an infeasible problem (unreachable target SOC at 2am)
  310. message = {
  311. "start": start,
  312. "duration": "PT24H",
  313. "flex-model": {
  314. "soc-at-start": 10,
  315. "soc-min": charging_station.get_attribute("min_soc_in_mwh", 0),
  316. "soc-max": charging_station.get_attribute("max-soc-in-mwh", target_soc),
  317. "roundtrip-efficiency": charging_station.get_attribute(
  318. "roundtrip-efficiency", 1
  319. ),
  320. "storage-efficiency": charging_station.get_attribute(
  321. "storage-efficiency", 1
  322. ),
  323. "soc-targets": [
  324. {
  325. "value": target_soc,
  326. "start": "2015-01-02T02:00:00+01:00",
  327. "duration": "PT0H",
  328. }
  329. ],
  330. },
  331. }
  332. with app.test_client() as client:
  333. # trigger storage scheduler
  334. trigger_schedule_response = client.post(
  335. url_for("SensorAPI:trigger_schedule", id=charging_station.id),
  336. json=message,
  337. )
  338. # check that the call is successful
  339. assert trigger_schedule_response.status_code == 200
  340. job_id = trigger_schedule_response.json["schedule"]
  341. # look for scheduling jobs in queue
  342. assert (
  343. len(app.queues["scheduling"]) == 1
  344. ) # only 1 schedule should be made for 1 asset
  345. job = app.queues["scheduling"].jobs[0]
  346. assert job.kwargs["asset_or_sensor"]["id"] == charging_station.id
  347. assert job.kwargs["start"] == parse_datetime(message["start"])
  348. assert job.id == job_id
  349. # process only the job that runs the storage scheduler (max_jobs=1)
  350. work_on_rq(
  351. app.queues["scheduling"],
  352. exc_handler=handle_scheduling_exception,
  353. max_jobs=1,
  354. )
  355. # check that the job is failing
  356. assert Job.fetch(
  357. job_id, connection=app.queues["scheduling"].connection
  358. ).is_failed
  359. # the callback creates the fallback job which is still pending
  360. assert len(app.queues["scheduling"]) == 1
  361. fallback_job_id = Job.fetch(
  362. job_id, connection=app.queues["scheduling"].connection
  363. ).meta.get("fallback_job_id")
  364. # check that the fallback_job_id is stored on the metadata of the original job
  365. assert app.queues["scheduling"].get_job_ids()[0] == fallback_job_id
  366. assert fallback_job_id != job_id
  367. get_schedule_response = client.get(
  368. url_for("SensorAPI:get_schedule", id=charging_station.id, uuid=job_id),
  369. )
  370. work_on_rq(
  371. app.queues["scheduling"],
  372. exc_handler=handle_scheduling_exception,
  373. max_jobs=1,
  374. )
  375. get_schedule_response = client.get(
  376. url_for("SensorAPI:get_schedule", id=charging_station.id, uuid=job_id),
  377. )
  378. assert get_schedule_response.status_code == 200
  379. schedule = get_schedule_response.json
  380. # check that the fallback schedule has the right status and start dates
  381. assert schedule["status"] == "PROCESSED"
  382. assert parse_datetime(schedule["start"]) == parse_datetime(start)
  383. assert schedule["scheduler_info"]["scheduler"] == "StorageFallbackScheduler"
  384. app.config["FLEXMEASURES_FALLBACK_REDIRECT"] = False