test_pandas_reporter.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. import pytest
  2. from datetime import datetime
  3. from pytz import utc
  4. from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter
  5. def test_reporter(app, setup_dummy_data):
  6. s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
  7. reporter_config = dict(
  8. required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
  9. required_output=[{"name": "df_merge"}],
  10. transformations=[
  11. dict(
  12. df_input="sensor_1",
  13. df_output="sensor_1_source_1",
  14. method="xs",
  15. args=["@source_1"],
  16. kwargs=dict(level=2),
  17. ),
  18. dict(
  19. df_input="sensor_2",
  20. df_output="sensor_2_source_1",
  21. method="xs",
  22. args=["@source_1"],
  23. kwargs=dict(level=2),
  24. ),
  25. dict(
  26. df_output="df_merge",
  27. df_input="sensor_1_source_1",
  28. method="merge",
  29. args=["@sensor_2_source_1"],
  30. kwargs=dict(on="event_start", suffixes=("_sensor1", "_sensor2")),
  31. ),
  32. dict(method="resample", args=["2h"]),
  33. dict(method="mean"),
  34. dict(method="sum", kwargs=dict(axis=1)),
  35. ],
  36. )
  37. reporter = PandasReporter(config=reporter_config)
  38. start = datetime(2023, 4, 10, tzinfo=utc)
  39. end = datetime(2023, 4, 10, 10, tzinfo=utc)
  40. input = [dict(name="sensor_1", sensor=s1), dict(name="sensor_2", sensor=s2)]
  41. output = [dict(name="df_merge", sensor=report_sensor)]
  42. report1 = reporter.compute(start=start, end=end, input=input, output=output)
  43. result = report1[0]["data"]
  44. assert len(result) == 5
  45. assert str(result.event_starts[0]) == "2023-04-10 00:00:00+00:00"
  46. assert (
  47. result.sensor == report_sensor
  48. ) # check that the output sensor is effectively assigned.
  49. data_source_name = app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE")
  50. data_source_type = "reporter"
  51. assert all(
  52. (source.name == data_source_name) and (source.type == data_source_type)
  53. for source in result.sources
  54. ) # check data source is assigned
  55. # check that calling compute with different parameters changes the result
  56. report2 = reporter.compute(
  57. start=datetime(2023, 4, 10, 3, tzinfo=utc), end=end, input=input, output=output
  58. )
  59. result2 = report2[0]["data"]
  60. assert len(result2) == 4
  61. assert str(result2.event_starts[0]) == "2023-04-10 02:00:00+00:00"
  62. def test_reporter_repeated(setup_dummy_data):
  63. """check that calling compute doesn't change the result"""
  64. s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
  65. reporter_config = dict(
  66. required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
  67. required_output=[{"name": "df_merge"}],
  68. transformations=[
  69. dict(
  70. df_input="sensor_1",
  71. df_output="sensor_1_source_1",
  72. method="xs",
  73. args=["@source_1"],
  74. kwargs=dict(level=2),
  75. ),
  76. dict(
  77. df_input="sensor_2",
  78. df_output="sensor_2_source_1",
  79. method="xs",
  80. args=["@source_1"],
  81. kwargs=dict(level=2),
  82. ),
  83. dict(
  84. df_output="df_merge",
  85. df_input="sensor_1_source_1",
  86. method="merge",
  87. args=["@sensor_2_source_1"],
  88. kwargs=dict(on="event_start", suffixes=("_sensor1", "_sensor2")),
  89. ),
  90. dict(method="resample", args=["2h"]),
  91. dict(method="mean"),
  92. dict(method="sum", kwargs=dict(axis=1)),
  93. ],
  94. )
  95. parameters = dict(
  96. start="2023-04-10T00:00:00 00:00",
  97. end="2023-04-10T10:00:00 00:00",
  98. input=[
  99. dict(name="sensor_1", sensor=s1.id),
  100. dict(name="sensor_2", sensor=s2.id),
  101. ],
  102. output=[dict(name="df_merge", sensor=report_sensor.id)],
  103. )
  104. reporter = PandasReporter(config=reporter_config)
  105. report1 = reporter.compute(parameters=parameters)
  106. report2 = reporter.compute(parameters=parameters)
  107. assert all(report2[0]["data"].values == report1[0]["data"].values)
  108. def test_reporter_empty(setup_dummy_data):
  109. """check that calling compute with missing data returns an empty report"""
  110. s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
  111. config = dict(
  112. required_input=[{"name": "sensor_1"}],
  113. required_output=[{"name": "sensor_1"}],
  114. transformations=[],
  115. )
  116. reporter = PandasReporter(config=config)
  117. # compute report on available data
  118. report = reporter.compute(
  119. start=datetime(2023, 4, 10, tzinfo=utc),
  120. end=datetime(2023, 4, 10, 10, tzinfo=utc),
  121. input=[dict(name="sensor_1", sensor=s1)],
  122. output=[dict(name="sensor_1", sensor=report_sensor)],
  123. )
  124. assert not report[0]["data"].empty
  125. # compute report on dates with no data available
  126. report = reporter.compute(
  127. sensor=report_sensor,
  128. start=datetime(2021, 4, 10, tzinfo=utc),
  129. end=datetime(2021, 4, 10, 10, tzinfo=utc),
  130. input=[dict(name="sensor_1", sensor=s1)],
  131. output=[dict(name="sensor_1", sensor=report_sensor)],
  132. )
  133. assert report[0]["data"].empty
  134. def test_pandas_reporter_unit_conversion(app, setup_dummy_data):
  135. """
  136. Check that the unit conversion feature can handle the following cases:
  137. - kW -> kW
  138. - kW -> MW
  139. - kW -> MWh
  140. - kW -> W -> kW
  141. """
  142. s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
  143. reporter_config = dict(
  144. required_input=[
  145. {"name": "sensor_4"},
  146. {"name": "sensor_4_kw"},
  147. {"name": "sensor_4_mw", "unit": "MW"},
  148. {"name": "sensor_4_mwh", "unit": "MWh"},
  149. ],
  150. required_output=[
  151. {"name": "sensor_4_kw"},
  152. {"name": "sensor_4_mw"},
  153. {"name": "sensor_4_mwh"},
  154. # Assume that the internal operations that produce sensor_4_output_w have "W"
  155. {"name": "sensor_4_output_w", "unit": "W"},
  156. ],
  157. transformations=[
  158. {"df_input": "sensor_4", "method": "copy", "df_output": "sensor_4_output_w"}
  159. ],
  160. )
  161. reporter = PandasReporter(config=reporter_config)
  162. start = datetime(2023, 1, 1, tzinfo=utc)
  163. end = datetime(2023, 1, 2, tzinfo=utc)
  164. input = [
  165. dict(name="sensor_4", sensor=s4),
  166. dict(name="sensor_4_kw", sensor=s4),
  167. dict(name="sensor_4_mw", sensor=s4),
  168. dict(name="sensor_4_mwh", sensor=s4),
  169. ]
  170. output = [
  171. dict(name="sensor_4_kw", sensor=s4),
  172. dict(name="sensor_4_mw", sensor=s4),
  173. dict(name="sensor_4_mwh", sensor=s4),
  174. dict(name="sensor_4_output_w", sensor=s4),
  175. ]
  176. report = reporter.compute(start=start, end=end, input=input, output=output)
  177. result_kw = report[0]["data"]
  178. result_mw = report[1]["data"]
  179. result_mwh = report[2]["data"]
  180. result_output_w = report[3]["data"]
  181. # MW = kW / 1000
  182. assert (result_mw.event_value.values == result_kw.event_value.values / 1000).all()
  183. # MWh = MW * 0.25 (resolution = 15 min)
  184. assert (result_mwh.event_value.values == result_mw.event_value.values * 0.25).all()
  185. # Input is in kW; the operations transform the data to produce values in W and it transforms the values to the output sensor unit (kW).
  186. # In summary, Input = 1 kW -(copy the values)-> 1 W -> 0.001 kW
  187. assert (
  188. result_output_w.event_value.values == result_kw.event_value.values * 0.001
  189. ).all()
  190. @pytest.mark.parametrize("shortcut", [True, False])
  191. def test_pandas_reporter_valid_range(app, setup_dummy_data, shortcut):
  192. """
  193. Check that we can select a valid range of values, where values outside the range are dropped.
  194. If shortcut=True, we test a shorter approach (fewer transformations) using pd.eval.
  195. """
  196. s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
  197. range = [3, 7]
  198. if shortcut:
  199. transformations = [
  200. {
  201. "df_input": "any values",
  202. "method": "eval",
  203. "args": [f"event_value > {range[0]} & event_value < {range[1]}"],
  204. "df_output": "mask",
  205. },
  206. {
  207. "df_input": "any values",
  208. "method": "where",
  209. "args": ["@mask"],
  210. "df_output": "ranged values",
  211. },
  212. {
  213. # "df_input": "ranged values", # redundant: defaults to previous df_output
  214. "method": "dropna",
  215. # "df_output": "ranged values", # redundant: defaults to current df_input
  216. },
  217. ]
  218. else:
  219. transformations = [
  220. {
  221. "df_input": "any values",
  222. "method": "gt",
  223. "args": [range[0]],
  224. "df_output": "gt_value",
  225. },
  226. {
  227. "df_input": "any values",
  228. "method": "lt",
  229. "args": [range[1]],
  230. "df_output": "lt_value",
  231. },
  232. {
  233. "df_input": "any values",
  234. "method": "where",
  235. "args": ["@gt_value"],
  236. "df_output": "ranged values",
  237. },
  238. {
  239. # "df_input": "ranged values", # redundant: defaults to previous df_output
  240. "method": "where",
  241. "args": ["@lt_value"],
  242. # "df_output": "ranged values", # redundant: defaults to current df_input
  243. },
  244. {
  245. # "df_input": "ranged values", # redundant: defaults to previous df_output
  246. "method": "dropna",
  247. # "df_output": "ranged values", # redundant: defaults to current df_input
  248. },
  249. ]
  250. reporter_config = dict(
  251. required_input=[
  252. {"name": "any values"},
  253. ],
  254. required_output=[
  255. {"name": "ranged values"},
  256. ],
  257. transformations=transformations,
  258. )
  259. reporter = PandasReporter(config=reporter_config)
  260. start = datetime(2023, 4, 10, tzinfo=utc)
  261. end = datetime(2023, 4, 11, tzinfo=utc)
  262. input = [
  263. dict(name="any values", sensor=s1),
  264. ]
  265. output = [
  266. dict(name="ranged values", sensor=s1),
  267. ]
  268. report = reporter.compute(start=start, end=end, input=input, output=output)
  269. result = report[0]["data"]
  270. # Check that some values were originally outside the range
  271. original_values = s1.search_beliefs(
  272. event_starts_after=start,
  273. event_ends_before=end,
  274. )
  275. assert not (original_values.event_value.values > range[0]).all()
  276. assert not (original_values.event_value.values < range[-1]).all()
  277. # Check that all values are now inside the range
  278. assert (result.event_value.values > range[0]).all()
  279. assert (result.event_value.values < range[1]).all()