model_spec_factory.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. from __future__ import annotations
  2. from datetime import datetime, timedelta, tzinfo
  3. from pprint import pformat
  4. from typing import Any
  5. import logging
  6. import pytz
  7. from flask import current_app
  8. from flexmeasures.data.queries.utils import (
  9. simplify_index,
  10. )
  11. from timely_beliefs import BeliefsDataFrame
  12. from timetomodel import ModelSpecs
  13. from timetomodel.exceptions import MissingData, NaNData
  14. from timetomodel.speccing import SeriesSpecs
  15. from timetomodel.transforming import (
  16. BoxCoxTransformation,
  17. ReversibleTransformation,
  18. Transformation,
  19. )
  20. import pandas as pd
  21. from flexmeasures.data.models.time_series import Sensor, TimedBelief
  22. from flexmeasures.data.models.forecasting.utils import (
  23. create_lags,
  24. set_training_and_testing_dates,
  25. get_query_window,
  26. )
"""
Here we generate an initial version of timetomodel specs for a given sensor and
the given forecast timing (forecast period and horizon).
These specs can be customized.
"""
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
  33. class TBSeriesSpecs(SeriesSpecs):
  34. """Compatibility for using timetomodel.SeriesSpecs with timely_beliefs.BeliefsDataFrames.
  35. This implements _load_series such that <time_series_class>.search is called,
  36. with the parameters in search_params.
  37. The search function is expected to return a BeliefsDataFrame.
  38. """
  39. time_series_class: Any # with <search_fnc> method (named "search" by default)
  40. search_params: dict
  41. def __init__(
  42. self,
  43. search_params: dict,
  44. name: str,
  45. time_series_class: type | None = TimedBelief,
  46. search_fnc: str = "search",
  47. original_tz: tzinfo | None = pytz.utc, # postgres stores naive datetimes
  48. feature_transformation: ReversibleTransformation | None = None,
  49. post_load_processing: Transformation | None = None,
  50. resampling_config: dict[str, Any] = None,
  51. interpolation_config: dict[str, Any] = None,
  52. ):
  53. super().__init__(
  54. name,
  55. original_tz,
  56. feature_transformation,
  57. post_load_processing,
  58. resampling_config,
  59. interpolation_config,
  60. )
  61. self.time_series_class = time_series_class
  62. self.search_params = search_params
  63. self.search_fnc = search_fnc
  64. def _load_series(self) -> pd.Series:
  65. logger.info("Reading %s data from database" % self.time_series_class.__name__)
  66. bdf: BeliefsDataFrame = getattr(self.time_series_class, self.search_fnc)(
  67. **self.search_params
  68. )
  69. assert isinstance(bdf, BeliefsDataFrame)
  70. df = simplify_index(bdf)
  71. self.check_data(df)
  72. if self.post_load_processing is not None:
  73. df = self.post_load_processing.transform_dataframe(df)
  74. return df["event_value"]
  75. def check_data(self, df: pd.DataFrame):
  76. """Raise error if data is empty or contains nan values.
  77. Here, other than in load_series, we can show the query, which is quite helpful.
  78. """
  79. if df.empty:
  80. raise MissingData(
  81. "No values found in database for the requested %s data. It's no use to continue I'm afraid."
  82. " Here's a print-out of what I tried to search for:\n\n%s\n\n"
  83. % (
  84. self.time_series_class.__name__,
  85. pformat(self.search_params, sort_dicts=False),
  86. )
  87. )
  88. if df.isnull().values.any():
  89. raise NaNData(
  90. "Nan values found in database for the requested %s data. It's no use to continue I'm afraid."
  91. " Here's a print-out of what I tried to search for:\n\n%s\n\n"
  92. % (
  93. self.time_series_class.__name__,
  94. pformat(self.search_params, sort_dicts=False),
  95. )
  96. )
  97. def create_initial_model_specs( # noqa: C901
  98. sensor: Sensor,
  99. forecast_start: datetime, # Start of forecast period
  100. forecast_end: datetime, # End of forecast period
  101. forecast_horizon: timedelta, # Duration between time of forecasting and end time of the event that is forecast
  102. ex_post_horizon: timedelta | None = None,
  103. transform_to_normal: bool = True,
  104. use_regressors: bool = True, # If false, do not create regressor specs
  105. use_periodicity: bool = True, # If false, do not create lags given the asset's periodicity
  106. custom_model_params: (
  107. dict | None
  108. ) = None, # overwrite model params, most useful for tests or experiments
  109. time_series_class: type | None = TimedBelief,
  110. ) -> ModelSpecs:
  111. """
  112. Generic model specs for all asset types (also for markets and weather sensors) and horizons.
  113. Fills in training, testing periods, lags. Specifies input and regressor data.
  114. Does not fill in which model to actually use.
  115. TODO: check if enough data is available both for lagged variables and regressors
  116. TODO: refactor assets and markets to store a list of pandas offset or timedelta instead of booleans for
  117. seasonality, because e.g. although solar and building assets both have daily seasonality, only the former is
  118. insensitive to daylight savings. Therefore: solar periodicity is 24 hours, while building periodicity is 1
  119. calendar day.
  120. """
  121. params = _parameterise_forecasting_by_asset_and_asset_type(
  122. sensor, transform_to_normal
  123. )
  124. params.update(custom_model_params if custom_model_params is not None else {})
  125. lags = create_lags(
  126. params["n_lags"],
  127. sensor,
  128. forecast_horizon,
  129. params["resolution"],
  130. use_periodicity,
  131. )
  132. training_start, testing_end = set_training_and_testing_dates(
  133. forecast_start, params["training_and_testing_period"]
  134. )
  135. query_window = get_query_window(training_start, forecast_end, lags)
  136. regressor_specs = []
  137. regressor_transformation = {}
  138. if use_regressors:
  139. if custom_model_params:
  140. if custom_model_params.get("regressor_transformation", None) is not None:
  141. regressor_transformation = custom_model_params.get(
  142. "regressor_transformation", {}
  143. )
  144. regressor_specs = configure_regressors_for_nearest_weather_sensor(
  145. sensor,
  146. query_window,
  147. forecast_horizon,
  148. regressor_transformation,
  149. transform_to_normal,
  150. )
  151. if ex_post_horizon is None:
  152. ex_post_horizon = timedelta(hours=0)
  153. outcome_var_spec = TBSeriesSpecs(
  154. name=sensor.generic_asset.generic_asset_type.name,
  155. time_series_class=time_series_class,
  156. search_params=dict(
  157. sensors=sensor,
  158. event_starts_after=query_window[0],
  159. event_ends_before=query_window[1],
  160. horizons_at_least=None,
  161. horizons_at_most=ex_post_horizon,
  162. ),
  163. feature_transformation=params.get("outcome_var_transformation", None),
  164. interpolation_config={"method": "time"},
  165. )
  166. # Set defaults if needed
  167. if params.get("event_resolution", None) is None:
  168. params["event_resolution"] = sensor.event_resolution
  169. if params.get("remodel_frequency", None) is None:
  170. params["remodel_frequency"] = timedelta(days=7)
  171. specs = ModelSpecs(
  172. outcome_var=outcome_var_spec,
  173. model=None, # at least this will need to be configured still to make these specs usable!
  174. frequency=params[
  175. "event_resolution"
  176. ], # todo: timetomodel doesn't distinguish frequency and resolution yet
  177. horizon=forecast_horizon,
  178. lags=[int(lag / params["event_resolution"]) for lag in lags],
  179. regressors=regressor_specs,
  180. start_of_training=training_start,
  181. end_of_testing=testing_end,
  182. ratio_training_testing_data=params["ratio_training_testing_data"],
  183. remodel_frequency=params["remodel_frequency"],
  184. )
  185. return specs
  186. def _parameterise_forecasting_by_asset_and_asset_type(
  187. sensor: Sensor,
  188. transform_to_normal: bool,
  189. ) -> dict:
  190. """Fill in the best parameters we know (generic or by asset (type))"""
  191. params = dict()
  192. params["training_and_testing_period"] = timedelta(days=30)
  193. params["ratio_training_testing_data"] = 14 / 15
  194. params["n_lags"] = 7
  195. params["resolution"] = sensor.event_resolution
  196. if transform_to_normal:
  197. params["outcome_var_transformation"] = (
  198. get_normalization_transformation_from_sensor_attributes(sensor)
  199. )
  200. return params
  201. def get_normalization_transformation_from_sensor_attributes(
  202. sensor: Sensor,
  203. ) -> Transformation | None:
  204. """
  205. Transform data to be normal, using the BoxCox transformation. Lambda parameter is chosen
  206. according to the asset type.
  207. """
  208. if (
  209. sensor.get_attribute("is_consumer") and not sensor.get_attribute("is_producer")
  210. ) or (
  211. sensor.get_attribute("is_producer") and not sensor.get_attribute("is_consumer")
  212. ):
  213. return BoxCoxTransformation(lambda2=0.1)
  214. elif sensor.generic_asset.generic_asset_type.name in [
  215. "wind speed",
  216. "irradiance",
  217. ]:
  218. # Values cannot be negative and are often zero
  219. return BoxCoxTransformation(lambda2=0.1)
  220. elif sensor.generic_asset.generic_asset_type.name == "temperature":
  221. # Values can be positive or negative when given in degrees Celsius, but non-negative only in Kelvin
  222. return BoxCoxTransformation(lambda2=273.16)
  223. else:
  224. return None
  225. def configure_regressors_for_nearest_weather_sensor(
  226. sensor: Sensor,
  227. query_window,
  228. horizon,
  229. regressor_transformation, # the regressor transformation can be passed in
  230. transform_to_normal, # if not, it a normalization can be applied
  231. ) -> list[TBSeriesSpecs]:
  232. """We use weather data as regressors. Here, we configure them."""
  233. regressor_specs = []
  234. correlated_sensor_names = sensor.get_attribute("weather_correlations")
  235. if correlated_sensor_names:
  236. current_app.logger.info(
  237. "For %s, I need sensors: %s" % (sensor.name, correlated_sensor_names)
  238. )
  239. for sensor_name in correlated_sensor_names:
  240. # Find the nearest weather sensor
  241. closest_sensor = Sensor.find_closest(
  242. generic_asset_type_name="weather station",
  243. sensor_name=sensor_name,
  244. object=sensor,
  245. )
  246. if closest_sensor is None:
  247. current_app.logger.warning(
  248. "No sensor found of sensor type %s to use as regressor for %s."
  249. % (sensor_name, sensor.name)
  250. )
  251. else:
  252. current_app.logger.info(
  253. "Using sensor %s as regressor for %s." % (sensor_name, sensor.name)
  254. )
  255. # Collect the weather data for the requested time window
  256. regressor_specs_name = "%s_l0" % sensor_name
  257. if len(regressor_transformation.keys()) == 0 and transform_to_normal:
  258. regressor_transformation = (
  259. get_normalization_transformation_from_sensor_attributes(
  260. closest_sensor,
  261. )
  262. )
  263. regressor_specs.append(
  264. TBSeriesSpecs(
  265. name=regressor_specs_name,
  266. time_series_class=TimedBelief,
  267. search_params=dict(
  268. sensors=closest_sensor,
  269. event_starts_after=query_window[0],
  270. event_ends_before=query_window[1],
  271. horizons_at_least=horizon,
  272. horizons_at_most=None,
  273. ),
  274. feature_transformation=regressor_transformation,
  275. interpolation_config={"method": "time"},
  276. )
  277. )
  278. return regressor_specs