test_sensor_data_schema.py

from datetime import timedelta, datetime
import json

import pytest
import pytz
from marshmallow import ValidationError
import pandas as pd
from unittest import mock

from flexmeasures.api.common.schemas.sensor_data import (
    SingleValueField,
    PostSensorDataSchema,
    GetSensorDataSchema,
)
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.services.sensors import (
    get_stalenesses,
    get_statuses,
    build_asset_jobs_data,
    get_asset_sensors_metadata,
)
from flexmeasures.data.schemas.reporting import StatusSchema
from flexmeasures.utils.time_utils import as_server_time
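
# Tests for the sensor data API schema fields (SingleValueField, PostSensorDataSchema,
# GetSensorDataSchema) and for the sensor status/staleness and job services.
# The fixtures used below (add_market_prices, capacity_sensors, add_weather_sensors,
# add_battery_assets, mock_get_statuses) are assumed to be provided by the surrounding
# test setup (e.g. a conftest.py); they are not defined in this file.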


@pytest.mark.parametrize(
    "deserialization_input, exp_deserialization_output",
    [
        (
            "PT1H",
            timedelta(hours=1),
        ),
        (
            "PT15M",
            timedelta(minutes=15),
        ),
    ],
)
def test_resolution_field_deserialization(
    deserialization_input,
    exp_deserialization_output,
):
    """Check parsing the resolution field of the GetSensorDataSchema schema.

    These particular ISO durations are expected to be parsed as python timedeltas.
    """
    # todo: extend test cases with some nominal durations when timely-beliefs supports these
    # see https://github.com/SeitaBV/timely-beliefs/issues/13
    vf = GetSensorDataSchema._declared_fields["resolution"]
    deser = vf.deserialize(deserialization_input)
    assert deser == exp_deserialization_output
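
# Note: the resolution field parses ISO 8601 duration strings (e.g. "PT1H") into
# timedeltas; nominal durations such as "P1M" are not covered here yet (see the
# timely-beliefs issue referenced above). Malformed input would presumably raise a
# marshmallow ValidationError, as the value fields below do.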


@pytest.mark.parametrize(
    "deserialization_input, exp_deserialization_output",
    [
        (
            1,
            [1],
        ),
        (
            2.7,
            [2.7],
        ),
        (
            [1],
            [1],
        ),
        (
            [2.7],
            [2.7],
        ),
        (
            [1, None, 3],  # sending a None/null value as part of a list is allowed
            [1, None, 3],
        ),
        (
            [None],  # sending a None/null value as part of a list is allowed
            [None],
        ),
    ],
)
def test_value_field_deserialization(
    deserialization_input,
    exp_deserialization_output,
):
    """Testing straightforward cases"""
    vf = PostSensorDataSchema._declared_fields["values"]
    deser = vf.deserialize(deserialization_input)
    assert deser == exp_deserialization_output
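
# The values field accepts either a single number or a list of numbers and always
# deserializes to a list; a None/null value is only acceptable inside a list
# (see the invalid cases tested further below).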


@pytest.mark.parametrize(
    "serialization_input, exp_serialization_output",
    [
        (
            1,
            [1],
        ),
        (
            2.7,
            [2.7],
        ),
    ],
)
def test_value_field_serialization(
    serialization_input,
    exp_serialization_output,
):
    """Testing straightforward cases"""
    vf = SingleValueField()
    ser = vf.serialize("values", {"values": serialization_input})
    assert ser == exp_serialization_output


@pytest.mark.parametrize(
    "deserialization_input, error_msg",
    [
        (
            ["three", 4],
            "Not a valid number",
        ),
        (
            "3, 4",
            "Not a valid number",
        ),
        (
            None,
            "may not be null",  # sending a single None/null value is not allowed
        ),
    ],
)
def test_value_field_invalid(deserialization_input, error_msg):
    sf = SingleValueField()
    with pytest.raises(ValidationError) as ve:
        sf.deserialize(deserialization_input)
    assert error_msg in str(ve)
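
# The staleness tests below use market price data from the add_market_prices fixture.
# As the inline comments indicate, the last price belief becomes known at
# 2016-01-01T12:00+01 (the knowledge time), so the staleness of past data is measured
# against that moment, while "future staleness" measures how far ahead of now the most
# recent event start still lies.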


# knowledge time 2016-01-01T12:00+01
@pytest.mark.parametrize(
    "now, expected_staleness, expected_stale",
    [
        (
            # Knowledge time 12 hours from now
            "2016-01-01T00:00+01",
            None,  # Not known yet
            True,
        ),
        (
            # Knowledge time 12 hours and 18 minutes ago
            "2016-01-02T00:18+01",
            timedelta(hours=12, minutes=18),
            True,
        ),
        (
            # Knowledge time 1 day and 12 hours ago
            "2016-01-03T00:00+01",
            timedelta(days=1, hours=12),
            True,
        ),
        (
            # Knowledge time 1 min ago
            "2016-01-01T12:01+01",
            timedelta(minutes=1),
            False,
        ),
    ],
)
def test_get_status_single_source(
    add_market_prices,
    now,
    expected_staleness,
    expected_stale,
):
    sensor = add_market_prices["epex_da"]
    staleness_search = dict()
    now = pd.Timestamp(now)
    stalenesses = get_stalenesses(
        sensor=sensor, staleness_search=staleness_search, now=now
    )
    if stalenesses is not None:
        stalenesses.pop("forecaster", None)
    source_type_of_interest = "reporter"
    if expected_staleness is None:
        assert stalenesses is None
    else:
        assert stalenesses[source_type_of_interest] == (mock.ANY, expected_staleness)

    status_specs = {
        "staleness_search": staleness_search,
        "max_staleness": "PT1H",
        "max_future_staleness": "-PT12H",
    }
    assert StatusSchema().load(status_specs)
    sensor_statuses = get_statuses(
        sensor=sensor,
        status_specs=status_specs,
        now=now,
    )

    if not expected_staleness:
        return  # the remaining checks only make sense when some staleness is expected

    sensor_statuses = [
        status
        for status in sensor_statuses
        if status["source_type"] == source_type_of_interest
    ]
    sensor_status = sensor_statuses[0]
    assert sensor_status["staleness"] == expected_staleness
    assert sensor_status["stale"] == expected_stale
    if stalenesses is None:
        assert sensor_status["source_type"] is None
    else:
        assert sensor_status["source_type"] == source_type_of_interest
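
# As exercised above: get_stalenesses returns a dict keyed by source type, mapping to
# (source, staleness) tuples (or None when no data is known yet), while get_statuses
# returns a list of per-source dicts with "staleness", "stale", "source_type" and a
# human-readable "reason".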


# both sources have the same data
# max_staleness for forecaster is 12 hours
# max_staleness for reporter is 1 day
@pytest.mark.parametrize(
    "now, expected_forecaster_staleness, expected_forecaster_stale, expect_forecaster_reason, expected_reporter_staleness, expected_reporter_stale, expect_reporter_reason",
    [
        (
            # Both stale
            # Last event start at 2016-01-02T23:00+01, 10 hours from now,
            # with knowledge time 2016-01-01T12:00+01, 1 day 1 hour ago
            "2016-01-02T13:00+01",
            timedelta(hours=10),
            True,
            "most recent data is 10 hours in the future, but should be more than 12 hours in the future",
            timedelta(days=1, hours=1),
            True,
            "most recent data is 1 day and 1 hour old, but should not be more than 1 day old",
        ),
        (
            # Both not stale
            # Last event start at 2016-01-02T23:00+01, 13 hours from now,
            # with knowledge time 2016-01-01T12:00+01, 22 hours ago
            "2016-01-02T10:00+01",
            timedelta(hours=13),
            False,
            "most recent data is 13 hours in the future, which is not less than 12 hours in the future",
            timedelta(hours=22),
            False,
            "most recent data is 22 hours old, which is not more than 1 day old",
        ),
        (
            # Reporter not stale, forecaster stale
            # Last event start at 2016-01-02T23:00+01,
            # with knowledge time 2016-01-01T12:00+01, 1 day ago
            "2016-01-02T12:00+01",
            timedelta(hours=11),
            True,
            "most recent data is 11 hours in the future, but should be more than 12 hours in the future",
            timedelta(days=1),
            False,
            "most recent data is 1 day old, which is not more than 1 day old",
        ),
        (
            # Both stale, no data in the future
            # Last event start at 2016-01-02T23:00+01,
            # with knowledge time 2016-01-01T12:00+01, 2 days ago
            "2016-01-03T12:00+01",
            None,
            True,
            "Found no future data which this source should have",
            timedelta(days=2),
            True,
            "most recent data is 2 days old, but should not be more than 1 day old",
        ),
    ],
)
def test_get_status_multi_source(
    add_market_prices,
    now,
    expected_forecaster_staleness,
    expected_forecaster_stale,
    expect_forecaster_reason,
    expected_reporter_staleness,
    expected_reporter_stale,
    expect_reporter_reason,
):
    sensor = add_market_prices["epex_da"]
    now = pd.Timestamp(now)
    sensor_statuses = get_statuses(
        sensor=sensor,
        now=now,
    )
    for sensor_status in sensor_statuses:
        if sensor_status["source_type"] == "reporter":
            assert sensor_status["staleness"] == expected_reporter_staleness
            assert sensor_status["stale"] == expected_reporter_stale
            assert sensor_status["reason"] == expect_reporter_reason
        if sensor_status["source_type"] == "forecaster":
            assert sensor_status["staleness"] == expected_forecaster_staleness
            assert sensor_status["stale"] == expected_forecaster_stale
            assert sensor_status["reason"] == expect_forecaster_reason


@pytest.mark.parametrize(
    "source_type, now, expected_staleness, expected_stale, expected_stale_reason",
    [
        # sensor resolution is 15 min
        (
            "demo script",
            # Last event start (in the past) at 2015-01-02T07:45+01,
            # with knowledge time 2015-01-02T08:00+01, 29 minutes ago
            "2015-01-02T08:29+01",
            timedelta(minutes=29),
            False,
            "not more than 30 minutes old",
        ),
        (
            "demo script",
            # Last event start (in the past) at 2015-01-02T07:45+01,
            # with knowledge time 2015-01-02T08:00+01, 31 minutes ago
            "2015-01-02T08:31+01",
            timedelta(minutes=31),
            True,
            "more than 30 minutes old",
        ),
        (
            "scheduler",
            # Last event start (in the future) at 2016-01-02T07:45+01, in 24 hours 45 minutes
            "2016-01-01T07:00+01",
            timedelta(minutes=24 * 60 + 45),
            False,
            "not less than 12 hours in the future",
        ),
    ],
)
def test_get_status_no_status_specs(
    capacity_sensors,
    source_type,
    now,
    expected_staleness,
    expected_stale,
    expected_stale_reason,
):
    sensor = capacity_sensors["production"]
    now = pd.Timestamp(now)
    sensor_statuses = get_statuses(
        sensor=sensor,
        status_specs=None,
        now=now,
    )
    assert source_type in [ss["source_type"] for ss in sensor_statuses]
    for sensor_status in sensor_statuses:
        if sensor_status["source_type"] == source_type:
            assert sensor_status["staleness"] == expected_staleness
            assert sensor_status["stale"] == expected_stale
            assert expected_stale_reason in sensor_status["reason"]
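
# Note (inferred from the expected reasons above, not asserted here): without explicit
# status_specs, past data for this 15-minute sensor counts as stale after roughly 30
# minutes (about twice the sensor resolution), and scheduled (future) data counts as
# stale once it no longer reaches at least 12 hours ahead.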


def test_asset_sensors_metadata(
    db, mock_get_statuses, add_weather_sensors, add_battery_assets
):
    """
    Test the function to build the status metadata structure, using a weather station asset.

    We include the sensor of a different asset (a battery) via the flex context
    (as production price, which does not make too much sense actually).
    One sensor which the asset already includes is also set in the context as inflexible device,
    so we can test if the relationship tagging works for that as well.
    """
    asset = add_weather_sensors["asset"]
    battery_asset = add_battery_assets["Test battery"]
    wind_sensor, temperature_sensor = (
        add_weather_sensors["wind"],
        add_weather_sensors["temperature"],
    )
    production_price_sensor = Sensor(
        name="production price",
        generic_asset=battery_asset,
        event_resolution=timedelta(minutes=5),
        unit="EUR/MWh",
    )
    db.session.add(production_price_sensor)
    db.session.flush()
    asset.flex_context["production-price"] = {"sensor": production_price_sensor.id}
    asset.flex_context["inflexible-device-sensors"] = [temperature_sensor.id]
    db.session.add(asset)

    wind_speed_res, temperature_res = {"staleness": True}, {"staleness": False}
    production_price_res = {"staleness": True}
    mock_get_statuses.side_effect = (
        [wind_speed_res],
        [temperature_res],
        [production_price_res],
    )
    status_data = get_asset_sensors_metadata(asset=asset)

    # The result should not simply list all three sensors (wind speed included) ...
    assert status_data != [
        {
            "name": "wind speed",
            "id": wind_sensor.id,
            "asset_name": asset.name,
        },
        {
            "name": "temperature",
            "id": temperature_sensor.id,
            "asset_name": asset.name,
        },
        {
            "name": "production price",
            "id": production_price_sensor.id,
            "asset_name": battery_asset.name,
        },
    ]
    # ... make sure wind speed is not in the sensor data, as it is neither in sensors_to_show nor in the flex-context
    assert status_data == [
        {
            "name": "temperature",
            "id": temperature_sensor.id,
            "asset_name": asset.name,
        },
        {
            "name": "production price",
            "id": production_price_sensor.id,
            "asset_name": battery_asset.name,
        },
    ]


def custom_model_params():
    """Use little training as we have little data; turn off transformations until they let this test run (TODO)."""
    return dict(
        training_and_testing_period=timedelta(hours=2),
        outcome_var_transformation=None,
        regressor_transformation={},
    )


def test_build_asset_jobs_data(db, app, add_battery_assets):
    """Check that we get both types of jobs for a battery asset."""
    battery_asset = add_battery_assets["Test battery"]
    battery = battery_asset.sensors[0]
    tz = pytz.timezone("Europe/Amsterdam")
    start, end = tz.localize(datetime(2015, 1, 2)), tz.localize(datetime(2015, 1, 3))

    scheduling_job = create_scheduling_job(
        asset_or_sensor=battery,
        start=start,
        end=end,
        belief_time=start,
        resolution=timedelta(minutes=15),
    )
    forecasting_jobs = create_forecasting_jobs(
        start_of_roll=as_server_time(datetime(2015, 1, 1, 6)),
        end_of_roll=as_server_time(datetime(2015, 1, 1, 7)),
        horizons=[timedelta(hours=1)],
        sensor_id=battery.id,
        custom_model_params=custom_model_params(),
    )

    jobs_data = build_asset_jobs_data(battery_asset)
    assert sorted([j["queue"] for j in jobs_data]) == ["forecasting", "scheduling"]
    for job_data in jobs_data:
        metadata = json.loads(job_data["metadata"])
        if job_data["queue"] == "forecasting":
            assert metadata["job_id"] == forecasting_jobs[0].id
        else:
            assert metadata["job_id"] == scheduling_job.id
        assert job_data["status"] == "queued"
        assert job_data["entity"] == f"sensor: {battery.name} (Id: {battery.id})"

    # Clean up queues
    app.queues["scheduling"].empty()
    app.queues["forecasting"].empty()
    assert app.queues["scheduling"].count == 0
    assert app.queues["forecasting"].count == 0