forecasting.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. """
  2. Logic around scheduling (jobs)
  3. """
  4. from __future__ import annotations
  5. from datetime import datetime, timedelta
  6. from flask import current_app
  7. import click
  8. from rq import get_current_job
  9. from rq.job import Job
  10. from timetomodel.forecasting import make_rolling_forecasts
  11. import timely_beliefs as tb
  12. from flexmeasures.data import db
  13. from flexmeasures.data.models.forecasting import lookup_model_specs_configurator
  14. from flexmeasures.data.models.forecasting.exceptions import InvalidHorizonException
  15. from flexmeasures.data.models.time_series import Sensor, TimedBelief
  16. from flexmeasures.data.models.forecasting.utils import (
  17. get_query_window,
  18. check_data_availability,
  19. )
  20. from flexmeasures.data.utils import get_data_source, save_to_db
  21. from flexmeasures.utils.time_utils import (
  22. as_server_time,
  23. server_now,
  24. forecast_horizons_for,
  25. supported_horizons,
  26. )
  27. """
  28. The life cycle of a forecasting job:
  29. 1. A forecasting job is born in create_forecasting_jobs.
2. It is run in make_rolling_viewpoint_forecasts, which writes results to the db
   (make_fixed_viewpoint_forecasts is meant as an alternative, but is not implemented yet).
   This is also where model specs are configured and a possible fallback model is stored for step 3.
  32. 3. If an error occurs (and the worker is configured accordingly), handle_forecasting_exception comes in.
  33. This might re-enqueue the job or try a different model (which creates a new job).
  34. """
# TODO: we could also monitor the failed queue and re-enqueue jobs that failed due to missing data
# (and maybe only if they failed fewer than three times so far)
  37. class MisconfiguredForecastingJobException(Exception):
  38. pass
  39. def create_forecasting_jobs(
  40. sensor_id: int,
  41. start_of_roll: datetime,
  42. end_of_roll: datetime,
  43. resolution: timedelta = None,
  44. horizons: list[timedelta] = None,
  45. model_search_term="linear-OLS",
  46. custom_model_params: dict = None,
  47. enqueue: bool = True,
  48. ) -> list[Job]:
  49. """Create forecasting jobs by rolling through a time window, for a number of given forecast horizons.
  50. Start and end of the forecasting jobs are equal to the time window (start_of_roll, end_of_roll) plus the horizon.
  51. For example (with shorthand notation):
  52. start_of_roll = 3pm
  53. end_of_roll = 5pm
  54. resolution = 15min
  55. horizons = [1h, 6h, 1d]
  56. This creates the following 3 jobs:
  57. 1) forecast each quarter-hour from 4pm to 6pm, i.e. the 1h forecast
  58. 2) forecast each quarter-hour from 9pm to 11pm, i.e. the 6h forecast
  59. 3) forecast each quarter-hour from 3pm to 5pm the next day, i.e. the 1d forecast
  60. If not given, relevant horizons are derived from the resolution of the posted data.
  61. The job needs a model configurator, for which you can supply a model search term. If omitted, the
  62. current default model configuration will be used.
  63. It's possible to customize model parameters, but this feature is (currently) meant to only
  64. be used by tests, so that model behaviour can be adapted to test conditions. If used outside
  65. of testing, an exception is raised.
  66. if enqueue is True (default), the jobs are put on the redis queue.
  67. Returns the redis-queue forecasting jobs which were created.
  68. """
  69. if horizons is None:
  70. if resolution is None:
  71. raise MisconfiguredForecastingJobException(
  72. "Cannot create forecasting jobs - set either horizons or resolution."
  73. )
  74. horizons = forecast_horizons_for(resolution)
  75. jobs: list[Job] = []
  76. for horizon in horizons:
  77. job = Job.create(
  78. make_rolling_viewpoint_forecasts,
  79. kwargs=dict(
  80. sensor_id=sensor_id,
  81. horizon=horizon,
  82. start=start_of_roll + horizon,
  83. end=end_of_roll + horizon,
  84. custom_model_params=custom_model_params,
  85. ),
  86. connection=current_app.queues["forecasting"].connection,
  87. ttl=int(
  88. current_app.config.get(
  89. "FLEXMEASURES_JOB_TTL", timedelta(-1)
  90. ).total_seconds()
  91. ),
  92. )
  93. job.meta["model_search_term"] = model_search_term
  94. job.save_meta()
  95. jobs.append(job)
  96. if enqueue:
  97. current_app.queues["forecasting"].enqueue_job(job)
  98. current_app.job_cache.add(
  99. sensor_id, job.id, queue="forecasting", asset_or_sensor_type="sensor"
  100. )
  101. return jobs
  102. def make_fixed_viewpoint_forecasts(
  103. sensor_id: int,
  104. horizon: timedelta,
  105. start: datetime,
  106. end: datetime,
  107. custom_model_params: dict = None,
  108. ) -> int:
  109. """Build forecasting model specs, make fixed-viewpoint forecasts, and save the forecasts made.
  110. Each individual forecast is a belief about a time interval.
  111. Fixed-viewpoint forecasts share the same belief time.
  112. See the timely-beliefs lib for relevant terminology.
  113. """
  114. # todo: implement fixed-viewpoint forecasts
  115. raise NotImplementedError
  116. def make_rolling_viewpoint_forecasts(
  117. sensor_id: int,
  118. horizon: timedelta,
  119. start: datetime,
  120. end: datetime,
  121. custom_model_params: dict = None,
  122. ) -> int:
  123. """Build forecasting model specs, make rolling-viewpoint forecasts, and save the forecasts made.
  124. Each individual forecast is a belief about a time interval.
  125. Rolling-viewpoint forecasts share the same belief horizon (the duration between belief time and knowledge time).
  126. Model specs are also retrained in a rolling fashion, but with its own frequency set in custom_model_params.
  127. See the timely-beliefs lib for relevant terminology.
  128. Parameters
  129. ----------
  130. :param sensor_id: int
  131. To identify which sensor to forecast
  132. :param horizon: timedelta
  133. duration between the end of each interval and the time at which the belief about that interval is formed
  134. :param start: datetime
  135. start of forecast period, i.e. start time of the first interval to be forecast
  136. :param end: datetime
  137. end of forecast period, i.e end time of the last interval to be forecast
  138. :param custom_model_params: dict
  139. pass in params which will be passed to the model specs configurator,
  140. e.g. outcome_var_transformation, only advisable to be used for testing.
  141. :returns: int
  142. the number of forecasts made
  143. """
  144. # https://docs.sqlalchemy.org/en/13/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork
  145. db.engine.dispose()
  146. rq_job = get_current_job()
  147. # find out which model to run, fall back to latest recommended
  148. model_search_term = rq_job.meta.get("model_search_term", "linear-OLS")
  149. # find sensor
  150. sensor = db.session.get(Sensor, sensor_id)
  151. click.echo(
  152. "Running Forecasting Job %s: %s for %s on model '%s', from %s to %s"
  153. % (rq_job.id, sensor, horizon, model_search_term, start, end)
  154. )
  155. if hasattr(sensor, "market_type"):
  156. ex_post_horizon = None # Todo: until we sorted out the ex_post_horizon, use all available price data
  157. else:
  158. ex_post_horizon = timedelta(hours=0)
  159. # Make model specs
  160. model_configurator = lookup_model_specs_configurator(model_search_term)
  161. model_specs, model_identifier, fallback_model_search_term = model_configurator(
  162. sensor=sensor,
  163. forecast_start=as_server_time(start),
  164. forecast_end=as_server_time(end),
  165. forecast_horizon=horizon,
  166. ex_post_horizon=ex_post_horizon,
  167. custom_model_params=custom_model_params,
  168. )
  169. model_specs.creation_time = server_now()
  170. rq_job.meta["model_identifier"] = model_identifier
  171. rq_job.meta["fallback_model_search_term"] = fallback_model_search_term
  172. rq_job.save()
  173. # before we run the model, check if horizon is okay and enough data is available
  174. if horizon not in supported_horizons():
  175. raise InvalidHorizonException(
  176. "Invalid horizon on job %s: %s" % (rq_job.id, horizon)
  177. )
  178. query_window = get_query_window(
  179. model_specs.start_of_training,
  180. end,
  181. [lag * model_specs.frequency for lag in model_specs.lags],
  182. )
  183. check_data_availability(
  184. sensor,
  185. TimedBelief,
  186. start,
  187. end,
  188. query_window,
  189. horizon,
  190. )
  191. data_source = get_data_source(
  192. data_source_name="Seita (%s)"
  193. % rq_job.meta.get("model_identifier", "unknown model"),
  194. data_source_type="forecasting script",
  195. )
  196. forecasts, model_state = make_rolling_forecasts(
  197. start=as_server_time(start),
  198. end=as_server_time(end),
  199. model_specs=model_specs,
  200. )
  201. click.echo("Job %s made %d forecasts." % (rq_job.id, len(forecasts)))
  202. ts_value_forecasts = [
  203. TimedBelief(
  204. event_start=dt,
  205. belief_horizon=horizon,
  206. event_value=value,
  207. sensor=sensor,
  208. source=data_source,
  209. )
  210. for dt, value in forecasts.items()
  211. ]
  212. bdf = tb.BeliefsDataFrame(ts_value_forecasts)
  213. save_to_db(bdf)
  214. db.session.commit()
  215. return len(forecasts)
  216. def handle_forecasting_exception(job, exc_type, exc_value, traceback):
  217. """
  218. Decide if we can do something about this failure:
  219. * Try a different model
  220. * Re-queue at a later time (using rq_scheduler)
  221. """
  222. click.echo(
  223. "HANDLING RQ FORECASTING WORKER EXCEPTION: %s:%s\n" % (exc_type, exc_value)
  224. )
  225. if "failures" not in job.meta:
  226. job.meta["failures"] = 1
  227. else:
  228. job.meta["failures"] = job.meta["failures"] + 1
  229. job.save_meta()
  230. # We might use this to decide if we want to re-queue a failed job
  231. # if job.meta['failures'] < 3:
  232. # job.queue.failures.requeue(job)
  233. # TODO: use this to add more meta information?
  234. # if exc_type == NotEnoughDataException:
  235. if "fallback_model_search_term" in job.meta:
  236. if job.meta["fallback_model_search_term"] is not None:
  237. new_job = Job.create(
  238. make_rolling_viewpoint_forecasts,
  239. args=job.args,
  240. kwargs=job.kwargs,
  241. connection=current_app.queues["forecasting"].connection,
  242. )
  243. new_job.meta["model_search_term"] = job.meta["fallback_model_search_term"]
  244. new_job.save_meta()
  245. current_app.queues["forecasting"].enqueue_job(new_job)
  246. def num_forecasts(start: datetime, end: datetime, resolution: timedelta) -> int:
  247. """Compute how many forecasts a job needs to make, given a resolution"""
  248. return (end - start) // resolution