123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 |
- from __future__ import annotations
- import pytest
- import math
- from flask import url_for
- from sqlalchemy import select, func
- from flexmeasures.data.models.time_series import TimedBelief
- from flexmeasures import Sensor
- from flexmeasures.api.v3_0.tests.utils import get_sensor_post_data
- from flexmeasures.data.models.audit_log import AssetAuditLog
- from flexmeasures.data.schemas.sensors import SensorSchema
- from flexmeasures.data.models.generic_assets import GenericAsset
- from flexmeasures.tests.utils import QueryCounter
- from flexmeasures.utils.unit_utils import is_valid_unit
- sensor_schema = SensorSchema()
@pytest.mark.parametrize(
    "requesting_user, search_by, search_value, exp_sensor_name, exp_num_results, include_consultancy_clients, use_pagination, expected_status_code, filter_account_id, filter_asset_id, asset_id_of_of_first_sensor_result",
    [
        (
            "test_supplier_user_4@seita.nl",
            "unit",
            "°C",
            "some temperature sensor",
            2,
            True,
            False,
            200,
            None,
            5,
            None,
        ),
        (
            "test_prosumer_user@seita.nl",
            None,
            None,
            "power",
            2,
            False,
            False,
            200,
            None,
            7,
            8,  # We test that the endpoint returns the sensor on a battery asset (ID: 8) while we filter for the building asset (ID: 7) that includes it
        ),
        (
            "test_supplier_user_4@seita.nl",
            "unit",
            "m³/h",
            "some gas sensor",
            1,
            True,
            False,
            200,
            None,
            5,
            None,
        ),
        (
            "test_supplier_user_4@seita.nl",
            None,
            None,
            None,
            None,
            None,
            None,
            422,  # Error expected due to both asset_id and account_id being provided
            1,
            5,
            None,
        ),
        (
            "test_dummy_account_admin@seita.nl",
            None,
            None,
            None,
            None,
            None,
            None,
            403,  # Error expected as the user lacks access to the specified asset
            None,
            5,
            None,
        ),
        (
            "test_supplier_user_4@seita.nl",
            None,
            None,
            None,
            None,
            None,
            None,
            403,  # Error expected as the user lacks access to the specified account
            1,
            None,
            None,
        ),
        (
            "test_supplier_user_4@seita.nl",
            None,
            None,
            "some temperature sensor",
            3,
            True,
            True,
            200,
            None,
            5,
            None,
        ),
        (
            "test_supplier_user_4@seita.nl",
            "filter",
            "'some temperature sensor'",
            "some temperature sensor",
            1,
            False,
            False,
            200,
            None,
            5,
            None,
        ),
    ],
    indirect=["requesting_user"],
)
def test_fetch_sensors(
    client,
    setup_api_test_data,
    add_battery_assets,
    requesting_user,
    search_by,
    search_value,
    exp_sensor_name,
    exp_num_results,
    include_consultancy_clients,
    use_pagination,
    expected_status_code,
    filter_account_id,
    filter_asset_id,
    asset_id_of_of_first_sensor_result,
):
    """Fetch the sensor list with various filters and check the outcome.

    Each case calls the sensor index endpoint as ``requesting_user`` with:

    - ``search_by`` / ``search_value``: an optional extra query parameter
      (the cases use ``unit`` and ``filter``); nothing is added when
      ``search_by`` is None.
    - ``include_consultancy_clients``: sent as a query flag when truthy.
    - ``use_pagination``: requests ``page=1``. The response is then expected to
      be a dict with ``data``, ``num-records`` and ``filtered-records`` instead
      of a plain list.
    - ``filter_account_id`` / ``filter_asset_id``: the account or asset to
      filter for. The cases expect a 422 when both are given and a 403 when
      the user lacks access to the requested account or asset.
    - ``asset_id_of_of_first_sensor_result``: the asset ID of the first sensor
      in the result list, for cases where that sensor is expected to sit on a
      child asset of the asset given in ``filter_asset_id``. When None, the
      first sensor is expected to sit on ``filter_asset_id`` itself.

    For error cases only ``expected_status_code`` is checked.

    Pagination is tested only in passing; filtering combined with pagination
    and pages > 1 are not covered yet.
    """
    # Only add the search parameter when one is requested, so that no
    # ``None: None`` entry ends up in the query.
    query = {} if search_by is None else {search_by: search_value}
    if use_pagination:
        query["page"] = 1
    if include_consultancy_clients:
        query["include_consultancy_clients"] = True
    if filter_account_id:
        query["account_id"] = filter_account_id
    if filter_asset_id:
        query["asset_id"] = filter_asset_id

    response = client.get(
        url_for("SensorAPI:index"),
        query_string=query,
    )
    payload = response.json
    print(f"Server responded with:\n{payload}")

    assert response.status_code == expected_status_code
    if expected_status_code != 200:
        # Error cases: the status code is all we check.
        return

    if use_pagination:
        # Paginated responses wrap the sensors in a dict with record counts.
        first_sensor = payload["data"][0]
        assert isinstance(first_sensor, dict)
        assert is_valid_unit(first_sensor["unit"])
        assert payload["num-records"] == exp_num_results
        assert payload["filtered-records"] == exp_num_results
        return

    # Unpaginated responses are a plain list of sensors.
    assert isinstance(payload, list)
    first_sensor = payload[0]
    assert is_valid_unit(first_sensor["unit"])
    assert first_sensor["name"] == exp_sensor_name
    assert len(payload) == exp_num_results

    if asset_id_of_of_first_sensor_result is not None:
        # The first sensor is expected on a child asset of the filtered asset.
        assert first_sensor["generic_asset_id"] == asset_id_of_of_first_sensor_result
    elif filter_asset_id:
        assert first_sensor["generic_asset_id"] == filter_asset_id

    if search_by == "unit":
        assert first_sensor["unit"] == search_value
@pytest.mark.parametrize(
    "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
)
def test_fetch_one_sensor(
    client, setup_api_test_data: dict[str, Sensor], requesting_user, db
):
    """Fetch the sensor with ID 1 and check its fields and the asset it is on."""
    response = client.get(url_for("SensorAPI:fetch_one", id=1))
    payload = response.json
    print("Server responded with:\n%s" % payload)

    assert response.status_code == 200

    expected_fields = {
        "name": "some gas sensor",
        "unit": "m³/h",
        "timezone": "UTC",
        "event_resolution": "PT10M",
    }
    for field, expected_value in expected_fields.items():
        assert payload[field] == expected_value

    # The sensor should be attached to the expected asset.
    owning_asset = db.session.get(GenericAsset, payload["generic_asset_id"])
    assert owning_asset.name == "incineration line"
@pytest.mark.parametrize(
    "requesting_user, status_code",
    [(None, 401), ("test_prosumer_user_2@seita.nl", 403)],
    indirect=["requesting_user"],
)
def test_fetch_one_sensor_no_auth(
    client, setup_api_test_data: dict[str, Sensor], requesting_user, status_code
):
    """Fetching the sensor with ID 1 is refused without sufficient credentials.

    Case 1: no user is logged in, so a 401 is expected.
    Case 2: test_prosumer_user_2@seita.nl is logged in, but sensor 1 is not in
    that user's account (see flexmeasures/api/v3_0/tests/conftest.py for the
    account setup), so a 403 is expected.
    """
    # Expected (message, status) of the error response, per status code.
    expected_errors = {
        401: (
            "You could not be properly authenticated for this content or functionality.",
            "UNAUTHORIZED",
        ),
        403: (
            "You cannot be authorized for this content or functionality.",
            "INVALID_SENDER",
        ),
    }

    response = client.get(url_for("SensorAPI:fetch_one", id=1))
    assert response.status_code == status_code

    if status_code not in expected_errors:
        raise NotImplementedError(f"Test did not expect status code {status_code}")

    expected_message, expected_status = expected_errors[status_code]
    assert response.json["message"] == expected_message
    assert response.json["status"] == expected_status
@pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
def test_post_a_sensor(client, setup_api_test_data, requesting_user, db):
    """Post a sensor as admin; check the response, the stored sensor and the audit log."""
    post_data = get_sensor_post_data()
    response = client.post(url_for("SensorAPI:post"), json=post_data)
    payload = response.json
    print("Server responded with:\n%s" % payload)

    # The response echoes the created sensor.
    assert response.status_code == 201
    assert payload["name"] == "power"
    assert payload["event_resolution"] == "PT1H"
    assert payload["generic_asset_id"] == post_data["generic_asset_id"]

    # The sensor can be found in the database.
    sensor_query = select(Sensor).filter_by(name="power", unit="kWh")
    sensor: Sensor = db.session.execute(sensor_query).scalar_one_or_none()
    assert sensor is not None
    assert sensor.unit == "kWh"
    assert sensor.attributes["capacity_in_mw"] == 0.0074

    # The creation was recorded in the asset's audit log.
    audit_log_query = select(AssetAuditLog).filter_by(
        affected_asset_id=post_data["generic_asset_id"],
        event=f"Created sensor '{sensor.name}': {sensor.id}",
        active_user_id=requesting_user.id,
        active_user_name=requesting_user.username,
    )
    assert db.session.execute(audit_log_query).scalar_one_or_none()
@pytest.mark.parametrize(
    "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
)
def test_post_sensor_to_asset_from_unrelated_account(
    client, setup_api_test_data, requesting_user
):
    """Posting a sensor to an asset of an account the user cannot access yields a 403."""
    response = client.post(url_for("SensorAPI:post"), json=get_sensor_post_data())
    payload = response.json
    print("Server responded with:\n%s" % payload)

    assert response.status_code == 403
    expected_message = "You cannot be authorized for this content or functionality."
    assert payload["message"] == expected_message
    assert payload["status"] == "INVALID_SENDER"
@pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
def test_patch_sensor(client, setup_api_test_data, requesting_user, db):
    """Patch a sensor's name and attributes as admin.

    Checks the response, that the sensor is stored under its new name only,
    that the new attribute is stored, and that the update is recorded in the
    audit log of the sensor's asset.
    """
    sensor = db.session.execute(
        select(Sensor).filter_by(name="some gas sensor")
    ).scalar_one_or_none()
    # Fail with a clear message (rather than an AttributeError) if the fixture data changed.
    assert sensor is not None, "expected the test data to contain 'some gas sensor'"

    response = client.patch(
        url_for("SensorAPI:patch", id=sensor.id),
        json={
            "name": "Changed name",
            "attributes": '{"test_attribute": "test_attribute_value"}',
        },
    )
    print(f"Server responded with:\n{response.json}")
    # Check the status first, so a rejected request fails here
    # instead of on a missing key in the response body.
    assert response.status_code == 200
    assert response.json["name"] == "Changed name"

    # The sensor is now found under its new name ...
    new_sensor = db.session.execute(
        select(Sensor).filter_by(name="Changed name")
    ).scalar_one_or_none()
    assert new_sensor.name == "Changed name"
    # ... and no longer under its old name.
    assert (
        db.session.execute(
            select(Sensor).filter_by(name="some gas sensor")
        ).scalar_one_or_none()
        is None
    )
    assert new_sensor.attributes["test_attribute"] == "test_attribute_value"

    # The update was recorded in the asset's audit log.
    audit_log_event = (
        f"Updated sensor 'some gas sensor': {sensor.id}. Updated fields: Field name: name, Old value: some gas sensor, New value: Changed name; Field name: attributes, "
        + "Old value: {}, New value: {'test_attribute': 'test_attribute_value'}"
    )
    assert db.session.execute(
        select(AssetAuditLog).filter_by(
            event=audit_log_event,
            active_user_id=requesting_user.id,
            active_user_name=requesting_user.username,
            affected_asset_id=sensor.generic_asset_id,
        )
    ).scalar_one_or_none()
@pytest.mark.parametrize(
    "attribute, value",
    [
        ("generic_asset_id", 8),
        ("entity_address", "ea1.2025-01.io.flexmeasures:fm1.1"),
        ("id", 7),
    ],
)
@pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
def test_patch_sensor_for_excluded_attribute(
    client, setup_api_test_data, attribute, value, requesting_user, db
):
    """Patching a field that may not be changed is rejected, even for an admin.

    Each of ``generic_asset_id``, ``entity_address`` and ``id`` is expected to
    yield a 422 with an "Unknown field." validation error for that field.
    """
    target_query = select(Sensor).filter_by(name="some temperature sensor")
    target_sensor = db.session.execute(target_query).scalar_one_or_none()

    patch_url = url_for("SensorAPI:patch", id=target_sensor.id)
    response = client.patch(patch_url, json={attribute: value})
    payload = response.json
    print(payload)

    assert response.status_code == 422
    assert payload["status"] == "UNPROCESSABLE_ENTITY"
    assert payload["message"]["json"][attribute] == ["Unknown field."]
@pytest.mark.parametrize(
    "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
)
def test_patch_sensor_non_admin(client, setup_api_test_data, requesting_user, db):
    """Renaming a sensor with a non-admin account is refused with a 403."""
    target_query = select(Sensor).filter_by(name="some temperature sensor")
    target_sensor = db.session.execute(target_query).scalar_one_or_none()

    patch_url = url_for("SensorAPI:patch", id=target_sensor.id)
    response = client.patch(patch_url, json={"name": "try to change the name"})

    assert response.status_code == 403
    assert response.json["status"] == "INVALID_SENDER"
@pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True)
def test_delete_a_sensor(client, setup_api_test_data, requesting_user, db):
    """Delete a sensor as admin.

    Checks that the sensor and its beliefs are gone afterwards, that exactly
    one sensor was removed, and that the deletion is recorded in the audit log.
    """
    existing_sensor = setup_api_test_data["some temperature sensor"]
    existing_sensor_id = existing_sensor.id

    # Both queries are run before and after the deletion.
    beliefs_of_sensor = select(TimedBelief).filter(
        TimedBelief.sensor_id == existing_sensor_id
    )
    number_of_sensors = select(func.count()).select_from(Sensor)

    beliefs_before = db.session.scalars(beliefs_of_sensor).all()
    sensor_count_before = db.session.scalar(number_of_sensors)
    # Make sure the sensor actually has data that could be deleted.
    assert isinstance(beliefs_before[0].event_value, float)

    delete_sensor_response = client.delete(
        url_for("SensorAPI:delete", id=existing_sensor_id),
    )
    assert delete_sensor_response.status_code == 204

    assert db.session.get(Sensor, existing_sensor_id) is None
    assert db.session.scalars(beliefs_of_sensor).all() == []
    assert db.session.scalar(number_of_sensors) == sensor_count_before - 1

    # The deletion was recorded in the asset's audit log.
    audit_log_query = select(AssetAuditLog).filter_by(
        affected_asset_id=existing_sensor.generic_asset_id,
        event=f"Deleted sensor '{existing_sensor.name}': {existing_sensor.id}",
        active_user_id=requesting_user.id,
        active_user_name=requesting_user.username,
    )
    assert db.session.execute(audit_log_query).scalar_one_or_none()
@pytest.mark.parametrize(
    "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
)
def test_fetch_sensor_stats(
    client, setup_api_test_data: dict[str, Sensor], requesting_user, db
):
    """Fetch the stats of sensor 1 twice and check both the content and the caching.

    The first response is checked per data source (event range, min, max, mean,
    sum and number of values). The second, identical request is expected to
    need exactly one database query less than the first one.
    """
    # gas sensor is set up in add_gas_measurements
    sensor_id = 1
    with QueryCounter(db.session.connection()) as counter1:
        response = client.get(
            url_for("SensorAPI:get_stats", id=sensor_id),
        )
    print(f"Server responded with:\n{response.json}")
    assert response.status_code == 200
    response_content = response.json

    # Apart from the status, the response holds one record per data source.
    del response_content["status"]
    assert sorted(response_content) == [
        "Other source",
        "Test Supplier User",
    ]

    # The expected mean is the same for both sources.
    mean_value = 91.7
    for source, record in response_content.items():
        assert record["First event start"] == "2021-05-01T22:00:00+00:00"
        assert record["Last event end"] == "2021-05-01T22:30:00+00:00"
        assert record["Min value"] == 91.3
        assert record["Max value"] == 92.1
        if source == "Test Supplier User":
            # values are: 91.3, 91.7, 92.1
            sum_values = 275.1
            count_values = 3
        else:
            # values are: 91.3, NaN, 92.1
            sum_values = 183.4
            count_values = 3
        assert math.isclose(
            record["Mean value"], mean_value, rel_tol=1e-5
        ), f"mean value {record['Mean value']} of '{source}' is not close to {mean_value}"
        assert math.isclose(
            record["Sum over values"], sum_values, rel_tol=1e-5
        ), f"sum over values {record['Sum over values']} of '{source}' is not close to {sum_values}"
        assert record["Number of values"] == count_values

    with QueryCounter(db.session.connection()) as counter2:
        response = client.get(
            url_for("SensorAPI:get_stats", id=sensor_id),
        )
    print(f"Server responded with:\n{response.json}")
    assert response.status_code == 200

    # Check stats cache works and stats query is executed only once
    assert counter1.count == counter2.count + 1
|