test_sensors_api.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. from __future__ import annotations
  2. import pytest
  3. import math
  4. from flask import url_for
  5. from sqlalchemy import select, func
  6. from flexmeasures.data.models.time_series import TimedBelief
  7. from flexmeasures import Sensor
  8. from flexmeasures.api.v3_0.tests.utils import get_sensor_post_data
  9. from flexmeasures.data.models.audit_log import AssetAuditLog
  10. from flexmeasures.data.schemas.sensors import SensorSchema
  11. from flexmeasures.data.models.generic_assets import GenericAsset
  12. from flexmeasures.tests.utils import QueryCounter
  13. from flexmeasures.utils.unit_utils import is_valid_unit
  14. sensor_schema = SensorSchema()
  15. @pytest.mark.parametrize(
  16. "requesting_user, search_by, search_value, exp_sensor_name, exp_num_results, include_consultancy_clients, use_pagination, expected_status_code, filter_account_id, filter_asset_id, asset_id_of_of_first_sensor_result",
  17. [
  18. (
  19. "test_supplier_user_4@seita.nl",
  20. "unit",
  21. "°C",
  22. "some temperature sensor",
  23. 2,
  24. True,
  25. False,
  26. 200,
  27. None,
  28. 5,
  29. None,
  30. ),
  31. (
  32. "test_prosumer_user@seita.nl",
  33. None,
  34. None,
  35. "power",
  36. 2,
  37. False,
  38. False,
  39. 200,
  40. None,
  41. 7,
  42. 8, # We test that the endpoint returns the sensor on a battery asset (ID: 8) while we filter for the building asset (ID: 7) that includes it
  43. ),
  44. (
  45. "test_supplier_user_4@seita.nl",
  46. "unit",
  47. "m³/h",
  48. "some gas sensor",
  49. 1,
  50. True,
  51. False,
  52. 200,
  53. None,
  54. 5,
  55. None,
  56. ),
  57. (
  58. "test_supplier_user_4@seita.nl",
  59. None,
  60. None,
  61. None,
  62. None,
  63. None,
  64. None,
  65. 422, # Error expected due to both asset_id and account_id being provided
  66. 1,
  67. 5,
  68. None,
  69. ),
  70. (
  71. "test_dummy_account_admin@seita.nl",
  72. None,
  73. None,
  74. None,
  75. None,
  76. None,
  77. None,
  78. 403, # Error expected as the user lacks access to the specified asset
  79. None,
  80. 5,
  81. None,
  82. ),
  83. (
  84. "test_supplier_user_4@seita.nl",
  85. None,
  86. None,
  87. None,
  88. None,
  89. None,
  90. None,
  91. 403, # Error expected as the user lacks access to the specified account
  92. 1,
  93. None,
  94. None,
  95. ),
  96. (
  97. "test_supplier_user_4@seita.nl",
  98. None,
  99. None,
  100. "some temperature sensor",
  101. 3,
  102. True,
  103. True,
  104. 200,
  105. None,
  106. 5,
  107. None,
  108. ),
  109. (
  110. "test_supplier_user_4@seita.nl",
  111. "filter",
  112. "'some temperature sensor'",
  113. "some temperature sensor",
  114. 1,
  115. False,
  116. False,
  117. 200,
  118. None,
  119. 5,
  120. None,
  121. ),
  122. ],
  123. indirect=["requesting_user"],
  124. )
  125. def test_fetch_sensors(
  126. client,
  127. setup_api_test_data,
  128. add_battery_assets,
  129. requesting_user,
  130. search_by,
  131. search_value,
  132. exp_sensor_name,
  133. exp_num_results,
  134. include_consultancy_clients,
  135. use_pagination,
  136. expected_status_code,
  137. filter_account_id,
  138. filter_asset_id,
  139. asset_id_of_of_first_sensor_result,
  140. ):
  141. """
  142. Retrieve all sensors.
  143. Our user here is admin, so is allowed to see all sensors.
  144. Pagination is tested only in passing, we should test filtering and page > 1
  145. The `filter_asset_id` specifies the asset_id to filter for.
  146. The `asset_id_of_of_first_sensor_result` specifies the asset_id of the first sensor
  147. in the result list. This sensors is expected to be from a child asset of the asset
  148. specified in `filter_asset_id`.
  149. The `filter_account_id` specifies the account_id to filter for.
  150. `check_errors` is used to test the error handling of the endpoint.
  151. """
  152. query = {search_by: search_value}
  153. if use_pagination:
  154. query["page"] = 1
  155. if search_by == "unit":
  156. query["unit"] = search_value
  157. elif search_by == "filter":
  158. query["filter"] = search_value
  159. if include_consultancy_clients:
  160. query["include_consultancy_clients"] = True
  161. if filter_account_id:
  162. query["account_id"] = filter_account_id
  163. if filter_asset_id:
  164. query["asset_id"] = filter_asset_id
  165. response = client.get(
  166. url_for("SensorAPI:index"),
  167. query_string=query,
  168. )
  169. print("Server responded with:\n%s" % response.json)
  170. assert response.status_code == expected_status_code
  171. if expected_status_code == 200:
  172. if use_pagination:
  173. assert isinstance(response.json["data"][0], dict)
  174. assert is_valid_unit(response.json["data"][0]["unit"])
  175. assert response.json["num-records"] == exp_num_results
  176. assert response.json["filtered-records"] == exp_num_results
  177. else:
  178. assert isinstance(response.json, list)
  179. assert is_valid_unit(response.json[0]["unit"])
  180. assert response.json[0]["name"] == exp_sensor_name
  181. assert len(response.json) == exp_num_results
  182. if asset_id_of_of_first_sensor_result is not None:
  183. assert (
  184. response.json[0]["generic_asset_id"]
  185. == asset_id_of_of_first_sensor_result
  186. )
  187. elif filter_asset_id:
  188. assert response.json[0]["generic_asset_id"] == filter_asset_id
  189. if search_by == "unit":
  190. assert response.json[0]["unit"] == search_value
  191. @pytest.mark.parametrize(
  192. "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
  193. )
  194. def test_fetch_one_sensor(
  195. client, setup_api_test_data: dict[str, Sensor], requesting_user, db
  196. ):
  197. sensor_id = 1
  198. response = client.get(
  199. url_for("SensorAPI:fetch_one", id=sensor_id),
  200. )
  201. print("Server responded with:\n%s" % response.json)
  202. assert response.status_code == 200
  203. assert response.json["name"] == "some gas sensor"
  204. assert response.json["unit"] == "m³/h"
  205. assert response.json["timezone"] == "UTC"
  206. assert response.json["event_resolution"] == "PT10M"
  207. asset = db.session.get(GenericAsset, response.json["generic_asset_id"])
  208. assert asset.name == "incineration line"
  209. @pytest.mark.parametrize(
  210. "requesting_user, status_code",
  211. [(None, 401), ("test_prosumer_user_2@seita.nl", 403)],
  212. indirect=["requesting_user"],
  213. )
  214. def test_fetch_one_sensor_no_auth(
  215. client, setup_api_test_data: dict[str, Sensor], requesting_user, status_code
  216. ):
  217. """Test 1: Sensor with id 1 is not in the test_prosumer_user_2@seita.nl's account.
  218. The Supplier Account as can be seen in flexmeasures/api/v3_0/tests/conftest.py
  219. Test 2: There is no authentication int the headers"""
  220. sensor_id = 1
  221. response = client.get(url_for("SensorAPI:fetch_one", id=sensor_id))
  222. assert response.status_code == status_code
  223. if status_code == 403:
  224. assert (
  225. response.json["message"]
  226. == "You cannot be authorized for this content or functionality."
  227. )
  228. assert response.json["status"] == "INVALID_SENDER"
  229. elif status_code == 401:
  230. assert (
  231. response.json["message"]
  232. == "You could not be properly authenticated for this content or functionality."
  233. )
  234. assert response.json["status"] == "UNAUTHORIZED"
  235. else:
  236. raise NotImplementedError(f"Test did not expect status code {status_code}")
  237. @pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
  238. def test_post_a_sensor(client, setup_api_test_data, requesting_user, db):
  239. post_data = get_sensor_post_data()
  240. response = client.post(
  241. url_for("SensorAPI:post"),
  242. json=post_data,
  243. )
  244. print("Server responded with:\n%s" % response.json)
  245. assert response.status_code == 201
  246. assert response.json["name"] == "power"
  247. assert response.json["event_resolution"] == "PT1H"
  248. assert response.json["generic_asset_id"] == post_data["generic_asset_id"]
  249. sensor: Sensor = db.session.execute(
  250. select(Sensor).filter_by(name="power", unit="kWh")
  251. ).scalar_one_or_none()
  252. assert sensor is not None
  253. assert sensor.unit == "kWh"
  254. assert sensor.attributes["capacity_in_mw"] == 0.0074
  255. assert db.session.execute(
  256. select(AssetAuditLog).filter_by(
  257. affected_asset_id=post_data["generic_asset_id"],
  258. event=f"Created sensor '{sensor.name}': {sensor.id}",
  259. active_user_id=requesting_user.id,
  260. active_user_name=requesting_user.username,
  261. )
  262. ).scalar_one_or_none()
  263. @pytest.mark.parametrize(
  264. "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
  265. )
  266. def test_post_sensor_to_asset_from_unrelated_account(
  267. client, setup_api_test_data, requesting_user
  268. ):
  269. """Tries to add sensor to account the user doesn't have access to"""
  270. post_data = get_sensor_post_data()
  271. response = client.post(
  272. url_for("SensorAPI:post"),
  273. json=post_data,
  274. )
  275. print("Server responded with:\n%s" % response.json)
  276. assert response.status_code == 403
  277. assert (
  278. response.json["message"]
  279. == "You cannot be authorized for this content or functionality."
  280. )
  281. assert response.json["status"] == "INVALID_SENDER"
  282. @pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
  283. def test_patch_sensor(client, setup_api_test_data, requesting_user, db):
  284. sensor = db.session.execute(
  285. select(Sensor).filter_by(name="some gas sensor")
  286. ).scalar_one_or_none()
  287. response = client.patch(
  288. url_for("SensorAPI:patch", id=sensor.id),
  289. json={
  290. "name": "Changed name",
  291. "attributes": '{"test_attribute": "test_attribute_value"}',
  292. },
  293. )
  294. assert response.json["name"] == "Changed name"
  295. new_sensor = db.session.execute(
  296. select(Sensor).filter_by(name="Changed name")
  297. ).scalar_one_or_none()
  298. assert new_sensor.name == "Changed name"
  299. assert (
  300. db.session.execute(
  301. select(Sensor).filter_by(name="some gas sensor")
  302. ).scalar_one_or_none()
  303. is None
  304. )
  305. assert new_sensor.attributes["test_attribute"] == "test_attribute_value"
  306. audit_log_event = (
  307. f"Updated sensor 'some gas sensor': {sensor.id}. Updated fields: Field name: name, Old value: some gas sensor, New value: Changed name; Field name: attributes, "
  308. + "Old value: {}, New value: {'test_attribute': 'test_attribute_value'}"
  309. )
  310. assert db.session.execute(
  311. select(AssetAuditLog).filter_by(
  312. event=audit_log_event,
  313. active_user_id=requesting_user.id,
  314. active_user_name=requesting_user.username,
  315. affected_asset_id=sensor.generic_asset_id,
  316. )
  317. ).scalar_one_or_none()
  318. @pytest.mark.parametrize(
  319. "attribute, value",
  320. [
  321. ("generic_asset_id", 8),
  322. ("entity_address", "ea1.2025-01.io.flexmeasures:fm1.1"),
  323. ("id", 7),
  324. ],
  325. )
  326. @pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
  327. def test_patch_sensor_for_excluded_attribute(
  328. client, setup_api_test_data, attribute, value, requesting_user, db
  329. ):
  330. """Test to change the generic_asset_id that should not be allowed.
  331. The generic_asset_id is excluded in the partial_sensor_schema"""
  332. sensor = db.session.execute(
  333. select(Sensor).filter_by(name="some temperature sensor")
  334. ).scalar_one_or_none()
  335. response = client.patch(
  336. url_for("SensorAPI:patch", id=sensor.id),
  337. json={
  338. attribute: value,
  339. },
  340. )
  341. print(response.json)
  342. assert response.status_code == 422
  343. assert response.json["status"] == "UNPROCESSABLE_ENTITY"
  344. assert response.json["message"]["json"][attribute] == ["Unknown field."]
  345. @pytest.mark.parametrize(
  346. "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
  347. )
  348. def test_patch_sensor_non_admin(client, setup_api_test_data, requesting_user, db):
  349. """Try to change the name of a sensor with a non admin account"""
  350. sensor = db.session.execute(
  351. select(Sensor).filter_by(name="some temperature sensor")
  352. ).scalar_one_or_none()
  353. response = client.patch(
  354. url_for("SensorAPI:patch", id=sensor.id),
  355. json={
  356. "name": "try to change the name",
  357. },
  358. )
  359. assert response.status_code == 403
  360. assert response.json["status"] == "INVALID_SENDER"
  361. @pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
  362. def test_delete_a_sensor(client, setup_api_test_data, requesting_user, db):
  363. existing_sensor = setup_api_test_data["some temperature sensor"]
  364. existing_sensor_id = existing_sensor.id
  365. sensor_data = db.session.scalars(
  366. select(TimedBelief).filter(TimedBelief.sensor_id == existing_sensor_id)
  367. ).all()
  368. sensor_count = db.session.scalar(select(func.count()).select_from(Sensor))
  369. assert isinstance(sensor_data[0].event_value, float)
  370. delete_sensor_response = client.delete(
  371. url_for("SensorAPI:delete", id=existing_sensor_id),
  372. )
  373. assert delete_sensor_response.status_code == 204
  374. deleted_sensor = db.session.get(Sensor, existing_sensor_id)
  375. assert deleted_sensor is None
  376. assert (
  377. db.session.scalars(
  378. select(TimedBelief).filter(TimedBelief.sensor_id == existing_sensor_id)
  379. ).all()
  380. == []
  381. )
  382. assert (
  383. db.session.scalar(select(func.count()).select_from(Sensor)) == sensor_count - 1
  384. )
  385. assert db.session.execute(
  386. select(AssetAuditLog).filter_by(
  387. affected_asset_id=existing_sensor.generic_asset_id,
  388. event=f"Deleted sensor '{existing_sensor.name}': {existing_sensor.id}",
  389. active_user_id=requesting_user.id,
  390. active_user_name=requesting_user.username,
  391. )
  392. ).scalar_one_or_none()
  393. @pytest.mark.parametrize(
  394. "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
  395. )
  396. def test_fetch_sensor_stats(
  397. client, setup_api_test_data: dict[str, Sensor], requesting_user, db
  398. ):
  399. # gas sensor is set up in add_gas_measurements
  400. sensor_id = 1
  401. with QueryCounter(db.session.connection()) as counter1:
  402. response = client.get(
  403. url_for("SensorAPI:get_stats", id=sensor_id),
  404. )
  405. print("Server responded with:\n%s" % response.json)
  406. assert response.status_code == 200
  407. response_content = response.json
  408. del response_content["status"]
  409. assert sorted(list(response_content.keys())) == [
  410. "Other source",
  411. "Test Supplier User",
  412. ]
  413. for source, record in response_content.items():
  414. assert record["First event start"] == "2021-05-01T22:00:00+00:00"
  415. assert record["Last event end"] == "2021-05-01T22:30:00+00:00"
  416. assert record["Min value"] == 91.3
  417. assert record["Max value"] == 92.1
  418. if source == "Test Supplier User":
  419. # values are: 91.3, 91.7, 92.1
  420. sum_values = 275.1
  421. count_values = 3
  422. else:
  423. # values are: 91.3, NaN, 92.1
  424. sum_values = 183.4
  425. count_values = 3
  426. mean_value = 91.7
  427. assert math.isclose(
  428. record["Mean value"], mean_value, rel_tol=1e-5
  429. ), f"mean_value is close to {mean_value}"
  430. assert math.isclose(
  431. record["Sum over values"], sum_values, rel_tol=1e-5
  432. ), f"sum_values is close to {sum_values}"
  433. assert record["Number of values"] == count_values
  434. with QueryCounter(db.session.connection()) as counter2:
  435. response = client.get(
  436. url_for("SensorAPI:get_stats", id=sensor_id),
  437. )
  438. print("Server responded with:\n%s" % response.json)
  439. assert response.status_code == 200
  440. # Check stats cache works and stats query is executed only once
  441. assert counter1.count == counter2.count + 1