"""Tests for forecasting jobs, run against a fresh test database."""
  1. from datetime import timedelta, datetime
  2. import pytest
  3. from sqlalchemy import select, Select
  4. from flexmeasures.data.models.time_series import Sensor, TimedBelief
  5. from flexmeasures.data.services.forecasting import (
  6. create_forecasting_jobs,
  7. handle_forecasting_exception,
  8. )
  9. from flexmeasures.data.tests.test_forecasting_jobs import (
  10. custom_model_params,
  11. check_aggregate,
  12. check_failures,
  13. get_data_source,
  14. )
  15. from flexmeasures.data.tests.utils import work_on_rq
  16. from flexmeasures.utils.time_utils import as_server_time
  17. def test_forecasting_three_hours_of_wind(
  18. app, run_as_cli, setup_fresh_test_data, clean_redis, fresh_db
  19. ):
  20. # asset has only 1 power sensor
  21. wind_device_2: Sensor = setup_fresh_test_data["wind-asset-2"].sensors[0]
  22. # makes 12 forecasts
  23. horizon = timedelta(hours=1)
  24. job = create_forecasting_jobs(
  25. start_of_roll=as_server_time(datetime(2015, 1, 1, 10)),
  26. end_of_roll=as_server_time(datetime(2015, 1, 1, 13)),
  27. horizons=[horizon],
  28. sensor_id=wind_device_2.id,
  29. custom_model_params=custom_model_params(),
  30. )
  31. print("Job: %s" % job[0].id)
  32. work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception)
  33. forecasts = fresh_db.session.scalars(
  34. select(TimedBelief)
  35. .filter(TimedBelief.sensor_id == wind_device_2.id)
  36. .filter(TimedBelief.belief_horizon == horizon)
  37. .filter(
  38. (TimedBelief.event_start >= as_server_time(datetime(2015, 1, 1, 11)))
  39. & (TimedBelief.event_start < as_server_time(datetime(2015, 1, 1, 14)))
  40. )
  41. ).all()
  42. assert len(forecasts) == 12
  43. check_aggregate(12, horizon, wind_device_2.id)
  44. def test_forecasting_two_hours_of_solar(
  45. app, run_as_cli, setup_fresh_test_data, clean_redis, fresh_db
  46. ):
  47. # asset has only 1 power sensor
  48. solar_device_1: Sensor = setup_fresh_test_data["solar-asset-1"].sensors[0]
  49. # makes 8 forecasts
  50. horizon = timedelta(hours=1)
  51. job = create_forecasting_jobs(
  52. start_of_roll=as_server_time(datetime(2015, 1, 1, 12)),
  53. end_of_roll=as_server_time(datetime(2015, 1, 1, 14)),
  54. horizons=[horizon],
  55. sensor_id=solar_device_1.id,
  56. custom_model_params=custom_model_params(),
  57. )
  58. print("Job: %s" % job[0].id)
  59. work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception)
  60. forecasts = fresh_db.session.scalars(
  61. select(TimedBelief)
  62. .filter(TimedBelief.sensor_id == solar_device_1.id)
  63. .filter(TimedBelief.belief_horizon == horizon)
  64. .filter(
  65. (TimedBelief.event_start >= as_server_time(datetime(2015, 1, 1, 13)))
  66. & (TimedBelief.event_start < as_server_time(datetime(2015, 1, 1, 15)))
  67. )
  68. ).all()
  69. assert len(forecasts) == 8
  70. check_aggregate(8, horizon, solar_device_1.id)
  71. @pytest.mark.parametrize(
  72. "model_to_start_with, model_version",
  73. [
  74. ("failing-test", 1),
  75. ("linear-OLS", 2),
  76. ],
  77. )
  78. def test_failed_model_with_too_much_training_then_succeed_with_fallback(
  79. app,
  80. run_as_cli,
  81. clean_redis,
  82. setup_fresh_test_data,
  83. model_to_start_with,
  84. model_version,
  85. fresh_db,
  86. ):
  87. """
  88. Here we fail once - because we start with a model that needs too much training.
  89. So we check for this failure happening as expected.
  90. But then, we do succeed with the fallback model one level down.
  91. (fail-test falls back to linear & linear falls back to naive).
  92. As a result, there should be forecasts in the DB.
  93. """
  94. # asset has only 1 power sensor
  95. solar_device_1: Sensor = setup_fresh_test_data["solar-asset-1"].sensors[0]
  96. # Remove each seasonality, so we don't query test data that isn't there
  97. solar_device_1.set_attribute("daily_seasonality", False)
  98. solar_device_1.set_attribute("weekly_seasonality", False)
  99. solar_device_1.set_attribute("yearly_seasonality", False)
  100. horizon_hours = 1
  101. horizon = timedelta(hours=horizon_hours)
  102. cmp = custom_model_params()
  103. hour_start = 5
  104. if model_to_start_with == "linear-OLS":
  105. # making the linear model fail and fall back to naive
  106. hour_start = 3 # Todo: explain this parameter; why would it fail to forecast if data is there for the full day?
  107. # The failed test model (this failure enqueues a new job)
  108. create_forecasting_jobs(
  109. start_of_roll=as_server_time(datetime(2015, 1, 1, hour_start)),
  110. end_of_roll=as_server_time(datetime(2015, 1, 1, hour_start + 2)),
  111. horizons=[horizon],
  112. sensor_id=solar_device_1.id,
  113. model_search_term=model_to_start_with,
  114. custom_model_params=cmp,
  115. )
  116. work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception)
  117. # Check if the correct model failed in the expected way
  118. check_failures(
  119. app.queues["forecasting"],
  120. ["NotEnoughDataException"],
  121. ["%s model v%d" % (model_to_start_with, model_version)],
  122. )
  123. # this query is useful to check data:
  124. def make_query(the_horizon_hours: int) -> Select:
  125. the_horizon = timedelta(hours=the_horizon_hours)
  126. return (
  127. select(TimedBelief)
  128. .filter(TimedBelief.sensor_id == solar_device_1.id)
  129. .filter(TimedBelief.belief_horizon == the_horizon)
  130. .filter(
  131. (
  132. TimedBelief.event_start
  133. >= as_server_time(
  134. datetime(2015, 1, 1, hour_start + the_horizon_hours)
  135. )
  136. )
  137. & (
  138. TimedBelief.event_start
  139. < as_server_time(
  140. datetime(2015, 1, 1, hour_start + the_horizon_hours + 2)
  141. )
  142. )
  143. )
  144. )
  145. # The successful (linear or naive) OLS leads to these.
  146. forecasts = fresh_db.session.scalars(
  147. make_query(the_horizon_hours=horizon_hours)
  148. ).all()
  149. assert len(forecasts) == 8
  150. check_aggregate(8, horizon, solar_device_1.id)
  151. if model_to_start_with == "linear-OLS":
  152. existing_data = fresh_db.session.scalars(make_query(the_horizon_hours=0)).all()
  153. for ed, fd in zip(existing_data, forecasts):
  154. assert ed.event_value == fd.event_value
  155. # Now to check which models actually got to work.
  156. # We check which data sources do and do not exist by now:
  157. assert (
  158. get_data_source("failing-test model v1") is None
  159. ) # the test failure model failed -> no data source
  160. if model_to_start_with == "linear-OLS":
  161. assert (
  162. get_data_source() is None
  163. ) # the default (linear regression) (was made to) fail, as well
  164. assert (
  165. get_data_source("naive model v1") is not None
  166. ) # the naive one had to be used
  167. else:
  168. assert get_data_source() is not None # the default (linear regression)
  169. assert (
  170. get_data_source("naive model v1") is None
  171. ) # the naive one did not have to be used