test_aggregator.py

from datetime import datetime

import pandas as pd
import pytest
from pytz import timezone, utc

from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.reporting.aggregator import AggregatorReporter


@pytest.mark.parametrize(
    "aggregation_method, expected_value",
    [
        ("sum", 0),
        ("mean", 0),
        ("var", 2),
        ("std", 2**0.5),
        ("max", 1),
        ("min", -1),
        ("prod", -1),
        ("median", 0),
    ],
)
def test_aggregator(setup_dummy_data, aggregation_method, expected_value, db):
    """
    Compute aggregations over two sensors, each holding 24 hourly entries:
    sensor 1 has the constant value 1 and sensor 2 the constant value -1.

    Test cases:
        1) sum: 0 = 1 + (-1)
        2) mean: 0 = (1 + (-1)) / 2
        3) var: 2 = ((1 - 0)^2 + (-1 - 0)^2) / (2 - 1), the sample variance (ddof=1)
        4) std: sqrt(2), the square root of the sample variance
        5) max: 1 = max(1, -1)
        6) min: -1 = min(1, -1)
        7) prod: -1 = 1 * (-1)
        8) median: 0 = (1 + (-1)) / 2; for an even number of elements,
           the median is the mean of the two central elements
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    agg_reporter = AggregatorReporter(method=aggregation_method)

    source_1 = db.session.get(DataSource, 1)
    source_2 = db.session.get(DataSource, 2)

    result = agg_reporter.compute(
        input=[dict(sensor=s1, source=source_1), dict(sensor=s2, source=source_2)],
        output=[dict(sensor=report_sensor)],
        start=datetime(2023, 5, 10, tzinfo=utc),
        end=datetime(2023, 5, 11, tzinfo=utc),
    )[0]["data"]

    # check that we got a result for 24 hours
    assert len(result) == 24

    # check that the value is equal to expected_value
    assert (result == expected_value).all().event_value
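

# Note: instead of passing the method directly, the reporter can also be
# configured via a config dict, e.g.
# AggregatorReporter(config=dict(method="sum", weights=...)),
# as the next test demonstrates.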
@pytest.mark.parametrize(
    "weight_1, weight_2, expected_result",
    [(1, 1, 0), (1, -1, 2), (2, 0, 2), (0, 2, -2)],
)
def test_aggregator_reporter_weights(
    setup_dummy_data, weight_1, weight_2, expected_result, db
):
    """Check that input weights are applied: each hourly result should equal
    weight_1 * 1 + weight_2 * (-1).
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    reporter_config = dict(method="sum", weights={"s1": weight_1, "sensor_2": weight_2})

    source_1 = db.session.get(DataSource, 1)
    source_2 = db.session.get(DataSource, 2)

    agg_reporter = AggregatorReporter(config=reporter_config)

    result = agg_reporter.compute(
        input=[
            dict(name="s1", sensor=s1, source=source_1),
            # the second input is unnamed, so the weights refer to it
            # by its default name ("sensor_2")
            dict(sensor=s2, source=source_2),
        ],
        output=[dict(sensor=report_sensor)],
        start=datetime(2023, 5, 10, tzinfo=utc),
        end=datetime(2023, 5, 11, tzinfo=utc),
    )[0]["data"]

    # check that we got a result for 24 hours
    assert len(result) == 24

    # check that the value is equal to expected_result
    assert (result == expected_result).all().event_value
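    # Worked case: with weight_1=1 and weight_2=-1, every hourly entry
    # aggregates to 1 * 1 + (-1) * (-1) = 2.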


def test_dst_transition(setup_dummy_data, db):
    """Check that aggregating across a DST transition yields 23 hourly values
    in spring and 25 in autumn.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    agg_reporter = AggregatorReporter()

    tz = timezone("Europe/Amsterdam")

    # transition from winter (CET) to summer (CEST)
    result = agg_reporter.compute(
        input=[dict(sensor=s3, source=db.session.get(DataSource, 1))],
        output=[dict(sensor=report_sensor)],
        start=tz.localize(datetime(2023, 3, 26)),
        end=tz.localize(datetime(2023, 3, 27)),
        belief_time=tz.localize(datetime(2023, 12, 1)),
    )[0]["data"]

    assert len(result) == 23

    # transition from summer (CEST) to winter (CET)
    result = agg_reporter.compute(
        input=[dict(sensor=s3, source=db.session.get(DataSource, 1))],
        output=[dict(sensor=report_sensor)],
        start=tz.localize(datetime(2023, 10, 29)),
        end=tz.localize(datetime(2023, 10, 30)),
        belief_time=tz.localize(datetime(2023, 12, 1)),
    )[0]["data"]

    assert len(result) == 25
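    # Background: on 2023-03-26, clocks in Europe/Amsterdam jump from 02:00 CET
    # to 03:00 CEST, so that local day has only 23 hours; on 2023-10-29, they
    # fall back from 03:00 CEST to 02:00 CET, giving a 25-hour day.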


def test_resampling(setup_dummy_data, db):
    """Check that resampling to a daily resolution anchors event starts
    to local midnight, also on days with a DST transition.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    agg_reporter = AggregatorReporter()

    tz = timezone("Europe/Amsterdam")

    # transition from winter (CET) to summer (CEST)
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 3, 27)),
        end=tz.localize(datetime(2023, 3, 28)),
        input=[dict(sensor=s3, source=db.session.get(DataSource, 1))],
        output=[dict(sensor=daily_report_sensor, source=db.session.get(DataSource, 1))],
        belief_time=tz.localize(datetime(2023, 12, 1)),
        resolution=pd.Timedelta("1D"),
    )[0]["data"]

    assert result.event_starts[0] == pd.Timestamp(
        year=2023, month=3, day=27, tz="Europe/Amsterdam"
    )

    # transition from summer (CEST) to winter (CET)
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 10, 29)),
        end=tz.localize(datetime(2023, 10, 30)),
        input=[dict(sensor=s3, source=db.session.get(DataSource, 1))],
        output=[dict(sensor=daily_report_sensor, source=db.session.get(DataSource, 1))],
        belief_time=tz.localize(datetime(2023, 12, 1)),
        resolution=pd.Timedelta("1D"),
    )[0]["data"]

    assert result.event_starts[0] == pd.Timestamp(
        year=2023, month=10, day=29, tz="Europe/Amsterdam"
    )
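    # Note that 2023-10-29 lasts 25 hours locally, yet the single daily event
    # still starts at local midnight (2023-10-29 00:00+02:00, i.e. still CEST).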


def test_source_transition(setup_dummy_data, db):
    """The first 13 hours of the time window "belong" to Source 1 and are filled with 1.0.
    From 12:00 to 24:00, there are events belonging to Source 2 with value -1.

    We expect the reporter to use only the sources listed in the `sources` array
    of the `input` field. If more than one of the listed sources recorded a belief
    about the same event, the source listed first takes precedence.
    """
    s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data

    agg_reporter = AggregatorReporter()

    tz = timezone("UTC")

    ds1 = db.session.get(DataSource, 1)
    ds2 = db.session.get(DataSource, 2)

    # considering DataSource 1 and 2
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 4, 24)),
        end=tz.localize(datetime(2023, 4, 25)),
        input=[dict(sensor=s3, sources=[ds1, ds2])],
        output=[dict(sensor=report_sensor)],
        belief_time=tz.localize(datetime(2023, 12, 1)),
    )[0]["data"]

    assert len(result) == 24
    assert (
        (result[:13] == 1).all().event_value
    )  # the data from the first source is used
    assert (result[13:] == -1).all().event_value
    # only considering DataSource 1
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 4, 24)),
        end=tz.localize(datetime(2023, 4, 25)),
        input=[dict(sensor=s3, sources=[ds1])],
        output=[dict(sensor=report_sensor)],
        belief_time=tz.localize(datetime(2023, 12, 1)),
    )[0]["data"]

    assert len(result) == 13
    assert (result == 1).all().event_value

    # only considering DataSource 2
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 4, 24)),
        end=tz.localize(datetime(2023, 4, 25)),
        input=[dict(sensor=s3, sources=[ds2])],
        output=[dict(sensor=report_sensor)],
        belief_time=tz.localize(datetime(2023, 12, 1)),
    )[0]["data"]

    assert len(result) == 12
    assert (result == -1).all().event_value
    # If no source is passed, the reporter should raise a ValueError,
    # as there are events from different data sources in the report time window.
    # This matters, for instance, for sensors containing both measured power
    # and scheduled values, where we could get beliefs from both sources.
    with pytest.raises(ValueError):
        agg_reporter.compute(
            start=tz.localize(datetime(2023, 4, 24)),
            end=tz.localize(datetime(2023, 4, 25)),
            input=[dict(sensor=s3)],
            output=[dict(sensor=report_sensor)],
            belief_time=tz.localize(datetime(2023, 12, 1)),
        )[0]["data"]
    # The exception to the above is when a newer version of the same source
    # recorded a value, in which case the latest version takes precedence.
    # Here, this happened in the last hour of the day.
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 4, 24, 18, 0)),
        end=tz.localize(datetime(2023, 4, 25)),
        input=[dict(sensor=s3)],
        output=[dict(sensor=report_sensor)],
        belief_time=tz.localize(datetime(2023, 12, 1)),
    )[0]["data"]

    assert (result[:5] == -1).all().event_value  # beliefs from the older version
    assert (result[5:] == 3).all().event_value  # belief from the latest version
    # If we exclude source type "A" (source 1 is of that type), we should get
    # the same result.
    same_result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 4, 24, 18, 0)),
        end=tz.localize(datetime(2023, 4, 25)),
        input=[dict(sensor=s3, exclude_source_types=["A"])],
        output=[dict(sensor=report_sensor)],
        belief_time=tz.localize(datetime(2023, 12, 1)),
    )[0]["data"]

    assert (same_result == result).all().event_value
    # If we exclude source type "B" (both versions of source 2 are of that type),
    # we should get an empty result.
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 4, 24, 18, 0)),
        end=tz.localize(datetime(2023, 4, 25)),
        input=[dict(sensor=s3, exclude_source_types=["B"])],
        output=[dict(sensor=report_sensor)],
        belief_time=tz.localize(datetime(2023, 12, 1)),
    )[0]["data"]

    assert result.empty
    # If we set use_latest_version_per_event=False, we get beliefs from both
    # versions of source 2, and one_deterministic_belief_per_event=True kicks in
    # to return the most recent belief per event.
    result = agg_reporter.compute(
        start=tz.localize(datetime(2023, 4, 24, 18, 0)),
        end=tz.localize(datetime(2023, 4, 25)),
        input=[dict(sensor=s3, use_latest_version_per_event=False)],
        output=[dict(sensor=report_sensor)],
        belief_time=tz.localize(datetime(2023, 12, 1)),
    )[0]["data"]

    assert len(result) == 6
    assert (result[:5] == -1).all().event_value  # beliefs from the older version
    assert (result[5:] == 3).all().event_value  # belief from the latest version