test_sensor_data.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. from __future__ import annotations
  2. from datetime import timedelta
  3. from flask import url_for
  4. import pytest
  5. from sqlalchemy import event
  6. from sqlalchemy.engine import Engine
  7. from flexmeasures import Sensor, Source, User
  8. from flexmeasures.api.v3_0.tests.utils import make_sensor_data_request_for_gas_sensor
  9. @pytest.mark.parametrize(
  10. "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
  11. )
  12. def test_get_no_sensor_data(
  13. client,
  14. setup_api_test_data: dict[str, Sensor],
  15. requesting_user,
  16. ):
  17. """Check the /sensors/data endpoint for fetching data for a period without any data."""
  18. sensor = setup_api_test_data["some gas sensor"]
  19. message = {
  20. "sensor": f"ea1.2021-01.io.flexmeasures:fm1.{sensor.id}",
  21. "start": "1921-05-02T00:00:00+02:00", # we have loaded no test data for this year
  22. "duration": "PT1H20M",
  23. "horizon": "PT0H",
  24. "unit": "m³/h",
  25. "resolution": "PT20M",
  26. }
  27. response = client.get(
  28. url_for("SensorAPI:get_data"),
  29. query_string=message,
  30. )
  31. print("Server responded with:\n%s" % response.json)
  32. assert response.status_code == 200
  33. values = response.json["values"]
  34. # We expect only null values (which are converted to None by .json)
  35. assert all(a == b for a, b in zip(values, [None, None, None, None]))
  36. @pytest.mark.parametrize(
  37. "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
  38. )
  39. def test_get_sensor_data(
  40. client,
  41. setup_api_test_data: dict[str, Sensor],
  42. setup_roles_users: dict[str, User],
  43. requesting_user,
  44. db,
  45. ):
  46. """Check the /sensors/data endpoint for fetching 1 hour of data of a 10-minute resolution sensor."""
  47. sensor = setup_api_test_data["some gas sensor"]
  48. source: Source = db.session.get(
  49. User, setup_roles_users["Test Supplier User"]
  50. ).data_source[0]
  51. assert sensor.event_resolution == timedelta(minutes=10)
  52. message = {
  53. "sensor": f"ea1.2021-01.io.flexmeasures:fm1.{sensor.id}",
  54. "start": "2021-05-02T00:00:00+02:00",
  55. "duration": "PT1H20M",
  56. "horizon": "PT0H",
  57. "unit": "m³/h",
  58. "source": source.id,
  59. "resolution": "PT20M",
  60. }
  61. response = client.get(
  62. url_for("SensorAPI:get_data"),
  63. query_string=message,
  64. )
  65. print("Server responded with:\n%s" % response.json)
  66. assert response.status_code == 200
  67. values = response.json["values"]
  68. # We expect two data points (from conftest) followed by 2 null values (which are converted to None by .json)
  69. # The first data point averages [91.3, 91.7], and the second data point averages [92.1, None].
  70. assert all(a == b for a, b in zip(values, [91.5, 92.1, None, None]))
  71. @pytest.mark.parametrize(
  72. "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
  73. )
  74. def test_get_instantaneous_sensor_data(
  75. client,
  76. setup_api_test_data: dict[str, Sensor],
  77. setup_roles_users: dict[str, User],
  78. requesting_user,
  79. db,
  80. ):
  81. """Check the /sensors/data endpoint for fetching 1 hour of data of an instantaneous sensor."""
  82. sensor = setup_api_test_data["some temperature sensor"]
  83. source: Source = db.session.get(
  84. User, setup_roles_users["Test Supplier User"]
  85. ).data_source[0]
  86. assert sensor.event_resolution == timedelta(minutes=0)
  87. message = {
  88. "sensor": f"ea1.2021-01.io.flexmeasures:fm1.{sensor.id}",
  89. "start": "2021-05-02T00:00:00+02:00",
  90. "duration": "PT1H20M",
  91. "horizon": "PT0H",
  92. "unit": "°C",
  93. "source": source.id,
  94. "resolution": "PT20M",
  95. }
  96. response = client.get(
  97. url_for("SensorAPI:get_data"),
  98. query_string=message,
  99. )
  100. print("Server responded with:\n%s" % response.json)
  101. assert response.status_code == 200
  102. values = response.json["values"]
  103. # We expect two data point (from conftest) followed by 2 null values (which are converted to None by .json)
  104. # The first data point is the first of [815, 817], and the second data point is the first of [818, None].
  105. assert all(a == b for a, b in zip(values, [815, 818, None, None]))
  106. @pytest.mark.parametrize(
  107. "requesting_user, status_code",
  108. [
  109. (None, 401), # the case without auth: authentication will fail
  110. (
  111. "test_dummy_user_3@seita.nl",
  112. 403,
  113. ), # in this case, we successfully authenticate, but fail authorization (not member of the account in which the sensor lies)
  114. ],
  115. indirect=["requesting_user"],
  116. )
  117. def test_post_sensor_data_bad_auth(
  118. client, setup_api_test_data, requesting_user, status_code
  119. ):
  120. """
  121. Attempt to post sensor data with insufficient or missing auth.
  122. """
  123. post_data = make_sensor_data_request_for_gas_sensor()
  124. post_data_response = client.post(
  125. url_for("SensorAPI:post_data"),
  126. json=post_data,
  127. )
  128. print("Server responded with:\n%s" % post_data_response.data)
  129. assert post_data_response.status_code == status_code
  130. @pytest.mark.parametrize(
  131. "request_field, new_value, error_field, error_text",
  132. [
  133. ("start", "2021-06-07T00:00:00", "start", "Not a valid aware datetime"),
  134. (
  135. "duration",
  136. "PT30M",
  137. "_schema",
  138. "Resolution of 0:05:00 is incompatible",
  139. ), # downsampling not supported
  140. ("sensor", "ea1.2021-01.io.flexmeasures:fm1.666", "sensor", "doesn't exist"),
  141. ("unit", "m", "_schema", "Required unit"),
  142. ("type", "GetSensorDataRequest", "type", "Must be one of"),
  143. ],
  144. )
  145. @pytest.mark.parametrize(
  146. "requesting_user",
  147. [
  148. "test_supplier_user_4@seita.nl", # this guy is allowed to post sensorData
  149. ],
  150. indirect=True,
  151. )
  152. def test_post_invalid_sensor_data(
  153. client,
  154. setup_api_test_data,
  155. request_field,
  156. new_value,
  157. error_field,
  158. error_text,
  159. requesting_user,
  160. ):
  161. post_data = make_sensor_data_request_for_gas_sensor()
  162. post_data[request_field] = new_value
  163. response = client.post(
  164. url_for("SensorAPI:post_data"),
  165. json=post_data,
  166. )
  167. print(response.json)
  168. assert response.status_code == 422
  169. assert error_text in response.json["message"]["json"][error_field][0]
  170. @pytest.mark.parametrize(
  171. "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
  172. )
  173. def test_post_sensor_data_twice(client, setup_api_test_data, requesting_user, db):
  174. post_data = make_sensor_data_request_for_gas_sensor()
  175. @event.listens_for(Engine, "handle_error")
  176. def receive_handle_error(exception_context):
  177. """
  178. Check that the error that we are getting is of type IntegrityError.
  179. """
  180. error_info = exception_context.sqlalchemy_exception
  181. # If the assert failed, we would get a 500 status code
  182. assert error_info.__class__.__name__ == "IntegrityError"
  183. # Check that 1st time posting the data succeeds
  184. response = client.post(
  185. url_for("SensorAPI:post_data"),
  186. json=post_data,
  187. )
  188. print(response.json)
  189. assert response.status_code == 200
  190. # Check that 2nd time posting the same data succeeds informatively
  191. response = client.post(
  192. url_for("SensorAPI:post_data"),
  193. json=post_data,
  194. )
  195. print(response.json)
  196. assert response.status_code == 200
  197. assert "data has already been received" in response.json["message"]
  198. # Check that replacing data fails informatively
  199. post_data["values"][0] = 100
  200. response = client.post(
  201. url_for("SensorAPI:post_data"),
  202. json=post_data,
  203. )
  204. print(response.json)
  205. assert response.status_code == 403
  206. assert "data represents a replacement" in response.json["message"]
  207. # at this point, the transaction has failed and needs to be rolled back.
  208. db.session.rollback()
  209. @pytest.mark.parametrize(
  210. "num_values, status_code, message, saved_rows",
  211. [
  212. (1, 200, "Request has been processed.", 1),
  213. (
  214. 2,
  215. 422,
  216. "Cannot save multiple instantaneous values that overlap. That is, two values spanning the same moment (a zero duration). Try sending a single value or definining a non-zero duration.",
  217. 0,
  218. ),
  219. ],
  220. )
  221. @pytest.mark.parametrize(
  222. "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
  223. )
  224. def test_post_sensor_instantaneous_data(
  225. client,
  226. setup_api_test_data,
  227. num_values,
  228. status_code,
  229. message,
  230. saved_rows,
  231. requesting_user,
  232. ):
  233. post_data = make_sensor_data_request_for_gas_sensor(
  234. sensor_name="empty temperature sensor",
  235. num_values=num_values,
  236. unit="°C",
  237. duration="PT0H",
  238. )
  239. sensor = setup_api_test_data["empty temperature sensor"]
  240. rows = len(sensor.search_beliefs())
  241. # Check that 1st time posting the data succeeds
  242. response = client.post(
  243. url_for("SensorAPI:post_data"),
  244. json=post_data,
  245. )
  246. assert response.status_code == status_code
  247. if status_code == 422:
  248. assert response.json["message"]["json"]["_schema"][0] == message
  249. else:
  250. assert response.json["message"] == message
  251. assert len(sensor.search_beliefs()) - rows == saved_rows