test_queries.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. from __future__ import annotations
  2. from datetime import datetime, timedelta
  3. import numpy as np
  4. import pandas as pd
  5. import pytest
  6. import pytz
  7. import timely_beliefs as tb
  8. from sqlalchemy import select
  9. from flexmeasures.data.models.data_sources import DataSource
  10. from flexmeasures.data.models.planning.utils import initialize_index
  11. from flexmeasures.data.models.time_series import Sensor, TimedBelief
  12. from flexmeasures.data.queries.utils import (
  13. multiply_dataframe_with_deterministic_beliefs,
  14. simplify_index,
  15. )
  16. from flexmeasures.tests.utils import get_test_sensor
  17. @pytest.mark.parametrize(
  18. "query_start, query_end, num_values",
  19. [
  20. (
  21. datetime(2015, 1, 1, tzinfo=pytz.utc),
  22. datetime(2015, 1, 2, tzinfo=pytz.utc),
  23. 96,
  24. ),
  25. (datetime(2015, 1, 1, tzinfo=pytz.utc), None, 96),
  26. (None, datetime(2015, 1, 2, tzinfo=pytz.utc), 96),
  27. (None, None, 96),
  28. (
  29. datetime(2015, 1, 1, tzinfo=pytz.utc),
  30. datetime(2015, 1, 1, 12, tzinfo=pytz.utc),
  31. 48,
  32. ),
  33. (None, datetime(2015, 1, 1, 12, tzinfo=pytz.utc), 48),
  34. # (
  35. # datetime(1957, 1, 1, tzinfo=pytz.utc),
  36. # datetime(1957, 1, 2, tzinfo=pytz.utc),
  37. # 0,
  38. # ), # test empty BeliefsDataFrame # todo: uncomment when this if fixed: https://github.com/pandas-dev/pandas/issues/30517
  39. ],
  40. )
  41. def test_collect_power(db, app, query_start, query_end, num_values, setup_test_data):
  42. # asset has only 1 power sensor
  43. wind_device_1: Sensor = setup_test_data["wind-asset-1"].sensors[0]
  44. data = db.session.scalars(
  45. select(TimedBelief).filter(TimedBelief.sensor_id == wind_device_1.id)
  46. ).all()
  47. print(data)
  48. bdf: tb.BeliefsDataFrame = TimedBelief.search(
  49. wind_device_1,
  50. event_starts_after=query_start,
  51. event_ends_before=query_end,
  52. )
  53. print(bdf)
  54. assert (
  55. bdf.index.names[0] == "event_start"
  56. ) # first index level of collect function should be event_start, so that df.loc[] refers to event_start
  57. assert pd.api.types.is_timedelta64_dtype(
  58. bdf.convert_index_from_belief_time_to_horizon().index.get_level_values(
  59. "belief_horizon"
  60. )
  61. ) # dtype of belief_horizon is timedelta64[ns], so the minimum horizon on an empty BeliefsDataFrame is NaT instead of NaN
  62. assert len(bdf) == num_values
  63. for v1, v2 in zip(bdf["event_value"].tolist(), data):
  64. assert abs(v1 - v2.event_value) < 10**-6
  65. @pytest.mark.parametrize(
  66. "query_start, query_end, resolution, num_values",
  67. [
  68. (
  69. datetime(2015, 1, 1, tzinfo=pytz.utc),
  70. datetime(2015, 1, 2, tzinfo=pytz.utc),
  71. timedelta(minutes=15),
  72. 96,
  73. ),
  74. (
  75. datetime(2015, 1, 1, tzinfo=pytz.utc),
  76. datetime(2015, 1, 2, tzinfo=pytz.utc),
  77. timedelta(minutes=30),
  78. 48,
  79. ),
  80. (
  81. datetime(2015, 1, 1, tzinfo=pytz.utc),
  82. datetime(2015, 1, 2, tzinfo=pytz.utc),
  83. "30min",
  84. 48,
  85. ),
  86. (
  87. datetime(2015, 1, 1, tzinfo=pytz.utc),
  88. datetime(2015, 1, 2, tzinfo=pytz.utc),
  89. "PT45M",
  90. 32,
  91. ),
  92. ],
  93. )
  94. def test_collect_power_resampled(
  95. db, app, query_start, query_end, resolution, num_values, setup_test_data
  96. ):
  97. # asset has only 1 power sensor
  98. wind_device_1: Sensor = setup_test_data["wind-asset-1"].sensors[0]
  99. bdf: tb.BeliefsDataFrame = TimedBelief.search(
  100. wind_device_1.name,
  101. event_starts_after=query_start,
  102. event_ends_before=query_end,
  103. resolution=resolution,
  104. most_recent_beliefs_only=True,
  105. )
  106. print(bdf)
  107. assert len(bdf) == num_values
  108. def test_multiplication():
  109. df1 = pd.DataFrame(
  110. [[30.0, timedelta(hours=3)]],
  111. index=initialize_index("2000-01-01 10:00", "2000-01-01 15:00", resolution="1h"),
  112. columns=["event_value", "belief_horizon"],
  113. )
  114. df2 = pd.DataFrame(
  115. [[10.0, timedelta(hours=1)]],
  116. index=initialize_index("2000-01-01 13:00", "2000-01-01 18:00", resolution="1h"),
  117. columns=["event_value", "belief_horizon"],
  118. )
  119. df = multiply_dataframe_with_deterministic_beliefs(df1, df2)
  120. df_compare = pd.concat(
  121. [
  122. pd.DataFrame(
  123. [[np.nan, timedelta(hours=3)]],
  124. index=initialize_index(
  125. "2000-01-01 10:00", "2000-01-01 13:00", resolution="1h"
  126. ),
  127. columns=["event_value", "belief_horizon"],
  128. ),
  129. pd.DataFrame(
  130. [[300.0, timedelta(hours=1)]],
  131. index=initialize_index(
  132. "2000-01-01 13:00", "2000-01-01 15:00", resolution="1h"
  133. ),
  134. columns=["event_value", "belief_horizon"],
  135. ),
  136. pd.DataFrame(
  137. [[np.nan, timedelta(hours=1)]],
  138. index=initialize_index(
  139. "2000-01-01 15:00", "2000-01-01 18:00", resolution="1h"
  140. ),
  141. columns=["event_value", "belief_horizon"],
  142. ),
  143. ],
  144. axis=0,
  145. )
  146. pd.testing.assert_frame_equal(df, df_compare)
  147. def test_multiplication_with_one_empty_dataframe():
  148. df1 = pd.DataFrame(
  149. [],
  150. columns=["event_value", "belief_horizon"],
  151. )
  152. # set correct types
  153. df1["event_value"] = pd.to_numeric(df1["event_value"])
  154. df1["belief_horizon"] = pd.to_timedelta(df1["belief_horizon"])
  155. df2 = pd.DataFrame(
  156. [[10.0, timedelta(hours=1)]],
  157. index=initialize_index("2000-01-01 13:00", "2000-01-01 18:00", resolution="1h"),
  158. columns=["event_value", "belief_horizon"],
  159. )
  160. df_compare = pd.DataFrame(
  161. [[np.nan, timedelta(hours=1)]],
  162. index=initialize_index("2000-01-01 13:00", "2000-01-01 18:00", resolution="1h"),
  163. columns=["event_value", "belief_horizon"],
  164. )
  165. # set correct types
  166. df_compare["event_value"] = pd.to_numeric(df_compare["event_value"])
  167. df_compare["belief_horizon"] = pd.to_timedelta(df_compare["belief_horizon"])
  168. df = multiply_dataframe_with_deterministic_beliefs(df1, df2)
  169. pd.testing.assert_frame_equal(df, df_compare)
  170. def test_multiplication_with_both_empty_dataframe():
  171. df1 = pd.DataFrame(
  172. [],
  173. columns=["event_value", "belief_horizon"],
  174. )
  175. # set correct types
  176. df1["event_value"] = pd.to_numeric(df1["event_value"])
  177. df1["belief_horizon"] = pd.to_timedelta(df1["belief_horizon"])
  178. df2 = pd.DataFrame(
  179. [],
  180. columns=["event_value", "belief_horizon"],
  181. )
  182. # set correct types
  183. df2["event_value"] = pd.to_numeric(df2["event_value"])
  184. df2["belief_horizon"] = pd.to_timedelta(df2["belief_horizon"])
  185. df_compare = pd.DataFrame(
  186. [],
  187. columns=["event_value", "belief_horizon"],
  188. )
  189. # set correct types
  190. df_compare["event_value"] = pd.to_numeric(df_compare["event_value"])
  191. df_compare["belief_horizon"] = pd.to_timedelta(df_compare["belief_horizon"])
  192. df = multiply_dataframe_with_deterministic_beliefs(df1, df2)
  193. pd.testing.assert_frame_equal(df, df_compare)
  194. @pytest.mark.parametrize("check_empty_frame", [True, False])
  195. def test_simplify_index(setup_test_data, check_empty_frame):
  196. """Check whether simplify_index retains the event resolution."""
  197. # asset has only 1 power sensor
  198. wind_device_1: Sensor = setup_test_data["wind-asset-1"].sensors[0]
  199. bdf: tb.BeliefsDataFrame = TimedBelief.search(
  200. wind_device_1.name,
  201. event_starts_after=datetime(2015, 1, 1, tzinfo=pytz.utc),
  202. event_ends_before=datetime(2015, 1, 2, tzinfo=pytz.utc),
  203. resolution=timedelta(minutes=15),
  204. ).convert_index_from_belief_time_to_horizon()
  205. if check_empty_frame:
  206. # We empty the BeliefsDataFrame, which retains the metadata such as sensor and resolution
  207. bdf = bdf.iloc[0:0, :]
  208. df = simplify_index(bdf)
  209. assert df.event_resolution == timedelta(minutes=15)
  210. def test_query_beliefs(setup_beliefs, db):
  211. """Check various ways of querying for beliefs."""
  212. sensor = get_test_sensor(db)
  213. source = db.session.execute(
  214. select(DataSource).filter_by(name="ENTSO-E")
  215. ).scalar_one_or_none()
  216. bdfs = [
  217. TimedBelief.search(sensor, source=source, most_recent_beliefs_only=False),
  218. TimedBelief.search(sensor.id, source=source, most_recent_beliefs_only=False),
  219. TimedBelief.search(sensor.name, source=source, most_recent_beliefs_only=False),
  220. sensor.search_beliefs(source=source, most_recent_beliefs_only=False),
  221. tb.BeliefsDataFrame(sensor.beliefs)[
  222. tb.BeliefsDataFrame(sensor.beliefs).index.get_level_values("source")
  223. == source
  224. ],
  225. ]
  226. for bdf in bdfs:
  227. assert sensor.event_resolution == timedelta(hours=1)
  228. assert bdf.event_resolution == timedelta(hours=1)
  229. assert len(bdf) == setup_beliefs
  230. def test_persist_beliefs(setup_beliefs, setup_test_data, db):
  231. """Check whether persisting beliefs works.
  232. We load the already set up beliefs, and form new beliefs an hour later.
  233. """
  234. sensor = get_test_sensor(db)
  235. source = db.session.execute(
  236. select(DataSource).filter_by(name="ENTSO-E")
  237. ).scalar_one_or_none()
  238. bdf: tb.BeliefsDataFrame = TimedBelief.search(
  239. sensor, source=source, most_recent_beliefs_only=False
  240. )
  241. # Form new beliefs
  242. df = bdf.reset_index()
  243. df["belief_time"] = df["belief_time"] + timedelta(hours=1)
  244. df["event_value"] = df["event_value"] * 10
  245. bdf = df.set_index(
  246. ["event_start", "belief_time", "source", "cumulative_probability"]
  247. )
  248. TimedBelief.add(bdf)
  249. bdf: tb.BeliefsDataFrame = TimedBelief.search(
  250. sensor, source=source, most_recent_beliefs_only=False
  251. )
  252. assert len(bdf) == setup_beliefs * 2
  253. def test_search_sources(db, setup_multiple_sources):
  254. test_sensor, s1, s2, s3 = setup_multiple_sources
  255. def get_sources_names(vec: list[DataSource]) -> list[str]:
  256. return [s.name for s in vec]
  257. # no filter
  258. assert get_sources_names(test_sensor.search_data_sources()) == ["S1", "S2", "S3"]
  259. # exclude results by type
  260. assert get_sources_names(
  261. test_sensor.search_data_sources(exclude_source_types=["type 1"])
  262. ) == ["S2", "S3"]
  263. # filter by type
  264. assert get_sources_names(
  265. test_sensor.search_data_sources(source_types=["type 2"])
  266. ) == ["S2"]
  267. # time window filter
  268. assert get_sources_names(
  269. test_sensor.search_data_sources(
  270. event_starts_after="2024-01-01T00:00:00+01:00",
  271. event_ends_before="2024-01-02T00:00:00+01:00",
  272. )
  273. ) == ["S1", "S2"]