# test_data_add.py

import pytest

import json
import yaml
import os

from datetime import datetime
import pytz

from sqlalchemy import select, func

from flexmeasures import Asset
from flexmeasures.cli.tests.utils import to_flags
from flexmeasures.data.models.annotations import (
    Annotation,
    AccountAnnotationRelationship,
)
from flexmeasures.data.models.user import Account
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.cli.tests.utils import (
    check_command_ran_without_error,
    get_click_commands,
)
from flexmeasures.utils.time_utils import server_now
from flexmeasures.tests.utils import get_test_sensor
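
# These tests exercise the `flexmeasures add ...` CLI commands defined in
# flexmeasures.cli.data_add, invoking them through Click's test runner and
# checking both the command output and the resulting database state.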


@pytest.mark.skip_github
def test_add_annotation(app, db, setup_roles_users):
    from flexmeasures.cli.data_add import add_annotation

    cli_input = {
        "content": "Company founding day",
        "at": "2016-05-11T00:00+02:00",
        "account": 1,
        "user": 1,
    }
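
    # to_flags (a CLI test helper) converts this dict into the corresponding
    # command-line options for the invocation below.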
    runner = app.test_cli_runner()
    result = runner.invoke(add_annotation, to_flags(cli_input))

    # Check result for success
    assert "Successfully added annotation" in result.output

    # Check database for annotation entry
    assert db.session.execute(
        select(Annotation)
        .filter_by(
            content=cli_input["content"],
            start=cli_input["at"],
        )
        .join(AccountAnnotationRelationship)
        .filter_by(
            account_id=cli_input["account"],
            annotation_id=Annotation.id,
        )
        .join(DataSource)
        .filter_by(
            id=Annotation.source_id,
            user_id=cli_input["user"],
        )
    ).scalar_one_or_none()


@pytest.mark.skip_github
def test_add_holidays(app, db, setup_roles_users):
    from flexmeasures.cli.data_add import add_holidays

    cli_input = {
        "year": 2020,
        "country": "NL",
        "account": 1,
    }
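
    # Holidays are looked up via the workalendar package and stored as
    # account annotations (see the DataSource and relationship checks below).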
    runner = app.test_cli_runner()
    result = runner.invoke(add_holidays, to_flags(cli_input))

    # Check result for 11 public holidays
    assert "'NL': 11" in result.output

    # Check database for 11 annotation entries
    assert (
        db.session.scalar(
            select(func.count())
            .select_from(Annotation)
            .join(AccountAnnotationRelationship)
            .filter(
                AccountAnnotationRelationship.account_id == cli_input["account"],
                AccountAnnotationRelationship.annotation_id == Annotation.id,
            )
            .join(DataSource)
            .filter(
                DataSource.id == Annotation.source_id,
                DataSource.name == "workalendar",
                DataSource.model == cli_input["country"],
            )
        )
        == 11
    )


def test_cli_help(app):
    """Test that showing help does not throw an error."""
    from flexmeasures.cli import data_add

    runner = app.test_cli_runner()
    for cmd in get_click_commands(data_add):
        result = runner.invoke(cmd, ["--help"])
        check_command_ran_without_error(result)
        assert "Usage" in result.output


@pytest.mark.skip_github
def test_add_reporter(app, fresh_db, setup_dummy_data):
    """
    The reporter aggregates input data from two sensors (both have 200 data points)
    to a two-hour resolution.

    The command is run twice:
        - The first run covers ten hours, so we expect five results.
            - start and end are defined in the configuration: 2023-04-10T00:00 -> 2023-04-10T10:00
            - this step uses 10 hours of data -> outputs 5 periods of 2 hours
        - The second run passes no timing params, so it covers the rest of the data.
            - start is the time of the latest report value
            - end is the time of the latest input data value
            - this step uses 190 hours of data -> outputs 95 periods of 2 hours
    """
    from flexmeasures.cli.data_add import add_report

    sensor1_id, sensor2_id, report_sensor_id, _ = setup_dummy_data

    reporter_config = dict(
        required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
        required_output=[{"name": "df_agg"}],
        transformations=[
            dict(
                df_input="sensor_1",
                method="add",
                args=["@sensor_2"],
                df_output="df_agg",
            ),
            dict(method="resample_events", args=["2h"]),
        ],
    )
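
    # The reporter config adds sensor_2 to sensor_1 and resamples the result
    # to 2-hour events, stored under the output name "df_agg".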

    # Running the command with start and end values.
    runner = app.test_cli_runner()

    cli_input_params = {
        "config": "reporter_config.yaml",
        "parameters": "parameters.json",
        "reporter": "PandasReporter",
        "start": "2023-04-10T00:00:00 00:00",
        "end": "2023-04-10T10:00:00 00:00",
        "output-file": "test.csv",
    }

    parameters = dict(
        input=[
            dict(name="sensor_1", sensor=sensor1_id),
            dict(name="sensor_2", sensor=sensor2_id),
        ],
        output=[dict(name="df_agg", sensor=report_sensor_id)],
    )
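
    # The parameters file maps the input/output names used in the reporter config
    # to concrete sensor ids from the setup_dummy_data fixture.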
    cli_input = to_flags(cli_input_params)

    # store the reporter config on the data source
    cli_input.append("--save-config")

    # run the test in an isolated file system
    with runner.isolated_filesystem():

        # save reporter_config to a yaml file and parameters to a json file
        with open("reporter_config.yaml", "w") as f:
            yaml.dump(reporter_config, f)

        with open("parameters.json", "w") as f:
            json.dump(parameters, f)

        # call command
        result = runner.invoke(add_report, cli_input)

        check_command_ran_without_error(result)

        report_sensor = fresh_db.session.get(
            Sensor, report_sensor_id
        )  # get fresh report sensor instance

        assert "Reporter PandasReporter found" in result.output
        assert f"Report computation done for sensor `{report_sensor}`." in result.output

        # Check that the report is saved to the database
        stored_report = report_sensor.search_beliefs(
            event_starts_after=cli_input_params.get("start").replace(" ", "+"),
            event_ends_before=cli_input_params.get("end").replace(" ", "+"),
        )

        assert (
            stored_report.values.T == [1, 2 + 3, 4 + 5, 6 + 7, 8 + 9]
        ).all()  # check values

        assert os.path.exists("test.csv")  # check that the file has been created
        assert (
            os.path.getsize("test.csv") > 100
        )  # bytes. Check that the file is not empty

    # Running the command without timing params (neither start-offset/end-offset nor start/end).
    # This makes the command default the start time to the date of the last
    # value of the reporter sensor and the end time to the current time.
    previous_command_end = cli_input_params.get("end").replace(" ", "+")

    cli_input_params = {
        "source": stored_report.sources[0].id,
        "parameters": "parameters.json",
        "output-file": "test.csv",
        "timezone": "UTC",
    }
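
    # No reporter or config file is passed this time: thanks to --save-config in
    # the first run, the reporter and its config can be fetched back from the data
    # source (see the "fetched successfully from the database" assertion below).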
    cli_input = to_flags(cli_input_params)

    with runner.isolated_filesystem():

        # save reporter_config to a json file
        with open("reporter_config.json", "w") as f:
            json.dump(reporter_config, f)

        with open("parameters.json", "w") as f:
            json.dump(parameters, f)

        # call command
        result = runner.invoke(add_report, cli_input)

        check_command_ran_without_error(result)

        # Check if the report is saved to the database
        report_sensor = fresh_db.session.get(
            Sensor, report_sensor_id
        )  # get fresh report sensor instance

        assert (
            "Reporter `PandasReporter` fetched successfully from the database."
            in result.output
        )
        assert f"Report computation done for sensor `{report_sensor}`." in result.output

        stored_report = report_sensor.search_beliefs(
            event_starts_after=previous_command_end,
            event_ends_before=server_now(),
        )

        assert len(stored_report) == 95


@pytest.mark.skip_github
def test_add_multiple_output(app, fresh_db, setup_dummy_data):
    """Run `add_report` with a reporter config that defines two outputs,
    and check that each output is saved to its own sensor and CSV file."""
    from flexmeasures.cli.data_add import add_report

    sensor_1_id, sensor_2_id, report_sensor_id, report_sensor_2_id = setup_dummy_data

    reporter_config = dict(
        required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
        required_output=[{"name": "df_agg"}, {"name": "df_sub"}],
        transformations=[
            dict(
                df_input="sensor_1",
                method="add",
                args=["@sensor_2"],
                df_output="df_agg",
            ),
            dict(method="resample_events", args=["2h"]),
            dict(
                df_input="sensor_1",
                method="subtract",
                args=["@sensor_2"],
                df_output="df_sub",
            ),
            dict(method="resample_events", args=["2h"]),
        ],
    )
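
    # Two outputs are computed: df_agg (sensor_1 + sensor_2, resampled to 2h)
    # and df_sub (sensor_1 - sensor_2, resampled to 2h).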

    # Running the command with start and end values.
    runner = app.test_cli_runner()

    cli_input_params = {
        "config": "reporter_config.yaml",
        "parameters": "parameters.json",
        "reporter": "PandasReporter",
        "start": "2023-04-10T00:00:00+00:00",
        "end": "2023-04-10T10:00:00+00:00",
        "output-file": "test-$name.csv",
    }
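
    # The $name placeholder in the output file name is filled in per output,
    # so each output ends up in its own CSV file (checked below).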

    parameters = dict(
        input=[
            dict(name="sensor_1", sensor=sensor_1_id),
            dict(name="sensor_2", sensor=sensor_2_id),
        ],
        output=[
            dict(name="df_agg", sensor=report_sensor_id),
            dict(name="df_sub", sensor=report_sensor_2_id),
        ],
    )

    cli_input = to_flags(cli_input_params)

    # run the test in an isolated file system
    with runner.isolated_filesystem():

        # save reporter_config to a yaml file
        with open("reporter_config.yaml", "w") as f:
            yaml.dump(reporter_config, f)

        with open("parameters.json", "w") as f:
            json.dump(parameters, f)

        # call command
        result = runner.invoke(add_report, cli_input)

        check_command_ran_without_error(result)

        assert os.path.exists("test-df_agg.csv")
        assert os.path.exists("test-df_sub.csv")

        report_sensor = fresh_db.session.get(Sensor, report_sensor_id)
        report_sensor_2 = fresh_db.session.get(Sensor, report_sensor_2_id)

        assert "Reporter PandasReporter found" in result.output
        assert f"Report computation done for sensor `{report_sensor}`." in result.output
        assert (
            f"Report computation done for sensor `{report_sensor_2}`." in result.output
        )

        # check that the reports are saved
        assert all(
            report_sensor.search_beliefs(
                event_ends_before=datetime(2023, 4, 10, 10, tzinfo=pytz.UTC)
            ).values.flatten()
            == [1, 5, 9, 13, 17]
        )
        assert all(report_sensor_2.search_beliefs() == 0)


@pytest.mark.skip_github
@pytest.mark.parametrize("process_type", [("INFLEXIBLE"), ("SHIFTABLE"), ("BREAKABLE")])
def test_add_process(
    app, process_power_sensor, process_type, add_market_prices_fresh_db, db
):
    """
    Schedule a 4h consumption block at a constant power of 400kW within a day, using
    the three process policies: INFLEXIBLE, SHIFTABLE and BREAKABLE.
    """

    from flexmeasures.cli.data_add import add_schedule_process

    epex_da = get_test_sensor(db)
    process_power_sensor_id = process_power_sensor

    cli_input_params = {
        "sensor": process_power_sensor_id,
        "start": "2015-01-02T00:00:00+01:00",
        "duration": "PT24H",
        "process-duration": "PT4H",
        "process-power": "0.4MW",
        "process-type": process_type,
        "consumption-price-sensor": epex_da.id,
        "forbid": '{"start" : "2015-01-02T00:00:00+01:00", "duration" : "PT2H"}',
    }
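
    # The forbid flag blocks scheduling during the first two hours of the 24h window.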

    cli_input = to_flags(cli_input_params)
    runner = app.test_cli_runner()

    # call command
    result = runner.invoke(add_schedule_process, cli_input)

    check_command_ran_without_error(result)

    process_power_sensor = db.session.get(Sensor, process_power_sensor_id)
    schedule = process_power_sensor.search_beliefs()
    # Check that the schedule is not empty; more detailed testing can be found
    # in data/models/planning/tests/test_process.py.
    assert (schedule == -0.4).event_value.sum() == 4


@pytest.mark.skip_github
@pytest.mark.parametrize(
    "event_resolution, name, success",
    [("PT20M", "ONE", True), (15, "TWO", True), ("some_string", "THREE", False)],
)
def test_add_sensor(app, db, setup_dummy_asset, event_resolution, name, success):
    from flexmeasures.cli.data_add import add_sensor

    asset = setup_dummy_asset
    runner = app.test_cli_runner()

    cli_input = {
        "name": name,
        "event-resolution": event_resolution,
        "unit": "kWh",
        "asset": asset,
        "timezone": "UTC",
    }
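
    # Parsable resolutions such as "PT20M" or 15 should succeed; an unparsable
    # string should make the command exit with an error (success=False).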
    result = runner.invoke(add_sensor, to_flags(cli_input))
    sensor: Sensor = db.session.execute(
        select(Sensor).filter_by(name=name)
    ).scalar_one_or_none()
    if success:
        check_command_ran_without_error(result)
        assert sensor.unit == "kWh"
    else:
        assert result.exit_code == 1
        assert sensor is None


@pytest.mark.skip_github
@pytest.mark.parametrize(
    "name, consultancy_account_id, success",
    [
        ("Test ConsultancyClient Account", 1, False),
        ("Test CLIConsultancyClient Account", 2, True),
        ("Test Account", None, True),
    ],
)
def test_add_account(
    app, fresh_db, setup_accounts_fresh_db, name, consultancy_account_id, success
):
    """Test adding a new account."""
    from flexmeasures.cli.data_add import new_account

    cli_input = {
        "name": name,
        "roles": "TestRole",
        "consultancy": consultancy_account_id,
    }
    runner = app.test_cli_runner()
    result = runner.invoke(new_account, to_flags(cli_input))
    if success:
        assert "successfully created." in result.output
        account = fresh_db.session.execute(
            select(Account).filter_by(name=cli_input["name"])
        ).scalar_one_or_none()
        assert account.consultancy_account_id == consultancy_account_id
    else:
        # fails because "Test ConsultancyClient Account" already exists
        assert result.exit_code == 1


@pytest.mark.skip_github
@pytest.mark.parametrize("storage_power_capacity", ["sensor", "quantity", None])
@pytest.mark.parametrize("storage_efficiency", ["sensor", "quantity", None])
def test_add_storage_schedule(
    app,
    add_market_prices_fresh_db,
    storage_schedule_sensors,
    storage_power_capacity,
    storage_efficiency,
    db,
):
    """
    Test the 'flexmeasures add schedule for-storage' CLI command for adding storage schedules.

    This test evaluates the command's functionality in creating storage schedules for different
    configurations of power capacity and storage efficiency. It uses a combination of sensor-based
    and manually specified values for these parameters.

    The test performs the following steps:
    1. Simulates running the `flexmeasures add toy-account` command to set up a test account.
    2. Configures CLI input parameters for scheduling, including the start time, duration and sensor IDs.
       The test also sets up parameters for the state of charge at start and the roundtrip efficiency.
    3. Depending on the test parameters, adjusts power capacity and efficiency settings. These settings
       can be sensor-based (retrieved from the storage_schedule_sensors fixture), manually specified
       quantities, or left undefined.
    4. Executes the 'add_schedule_for_storage' command with the configured parameters.
    5. Verifies that the command executes successfully (exit code 0) and that the correct number of
       scheduled values (48 for a 12-hour period with 15-minute resolution) are created for the
       power sensor.
    """

    power_capacity_sensor, storage_efficiency_sensor = storage_schedule_sensors

    from flexmeasures.cli.data_add import add_schedule_for_storage, add_toy_account

    runner = app.test_cli_runner()

    runner.invoke(add_toy_account)

    toy_account = db.session.execute(
        select(Account).filter_by(name="Toy Account")
    ).scalar_one_or_none()
    battery = db.session.execute(
        select(Asset).filter_by(name="toy-battery", owner=toy_account)
    ).scalar_one_or_none()
    power_sensor = battery.sensors[0]
    prices = add_market_prices_fresh_db["epex_da"]

    cli_input_params = {
        "start": "2014-12-31T23:00:00+00",
        "duration": "PT12H",
        "sensor": battery.sensors[0].id,
        "consumption-price-sensor": prices.id,
        "soc-at-start": "50%",
        "roundtrip-efficiency": "90%",
    }
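
    # Depending on the parametrization, capacity and efficiency are passed either as
    # fixed quantities, as sensor references (the "sensor:<id>" syntax), or not at all.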
    if storage_power_capacity is not None:
        if storage_power_capacity == "sensor":
            cli_input_params["storage-consumption-capacity"] = (
                f"sensor:{power_capacity_sensor}"
            )
            cli_input_params["storage-production-capacity"] = (
                f"sensor:{power_capacity_sensor}"
            )
        else:
            cli_input_params["storage-consumption-capacity"] = "700kW"
            cli_input_params["storage-production-capacity"] = "700kW"

    if storage_efficiency is not None:
        if storage_efficiency == "sensor":
            cli_input_params["storage-efficiency"] = (
                f"sensor:{storage_efficiency_sensor}"
            )
        else:
            cli_input_params["storage-efficiency"] = "90%"

    cli_input = to_flags(cli_input_params)
    result = runner.invoke(add_schedule_for_storage, cli_input)

    check_command_ran_without_error(result)

    assert len(power_sensor.search_beliefs()) == 48