utils.py

from __future__ import annotations

from typing import Type
from datetime import datetime, timedelta

from flask_security import current_user
from werkzeug.exceptions import Forbidden
import pandas as pd
import timely_beliefs as tb
from sqlalchemy.orm import Session
from sqlalchemy.sql.elements import BinaryExpression, or_
from sqlalchemy.sql.expression import null
from sqlalchemy import select, Select

from flexmeasures.data.config import db
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.utils import flexmeasures_inflection
from flexmeasures.auth.policy import user_has_admin_access
from flexmeasures.cli import is_running as running_as_cli

import flexmeasures.data.models.time_series as ts  # noqa: F401


def create_beliefs_query(
    cls: "Type[ts.TimedValue]",
    session: Session,
    old_sensor_class: db.Model,
    old_sensor_names: tuple[str, ...],
    start: datetime | None,
    end: datetime | None,
) -> Select:
    query = (
        select(old_sensor_class.name, cls.datetime, cls.value, cls.horizon, DataSource)
        .join(DataSource)
        .filter(cls.data_source_id == DataSource.id)
        .join(old_sensor_class)
        .filter(old_sensor_class.name.in_(old_sensor_names))
    )
    if start is not None:
        query = query.filter(cls.datetime > start - old_sensor_class.event_resolution)
    if end is not None:
        query = query.filter(cls.datetime < end)
    return query
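
# Illustrative usage sketch for create_beliefs_query (the legacy model and sensor
# names below are hypothetical; not executed as part of this module):
#
#     query = create_beliefs_query(
#         Power,  # hypothetical TimedValue subclass
#         db.session,
#         old_sensor_class=Asset,  # hypothetical old sensor class
#         old_sensor_names=("my-battery",),
#         start=datetime(2020, 5, 1),
#         end=datetime(2020, 5, 2),
#     )
#     rows = db.session.execute(query).all()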


def potentially_limit_assets_query_to_account(
    query: Select[tuple[GenericAsset]],
    account_id: int | None = None,
) -> Select[tuple[GenericAsset]]:
    """Filter out all assets that are not in the current user's account.

    For admins and CLI users, no assets are filtered out, unless an account_id is set.

    :param account_id: if set, all assets that are not in the given account will be filtered out
                       (only works for admins and CLI users).
                       For querying public assets in particular, don't use this function.
    """
    if not running_as_cli() and not current_user.is_authenticated:
        raise Forbidden("Unauthenticated user cannot list assets.")
    user_is_admin = running_as_cli() or user_has_admin_access(
        current_user, permission="read" if query.is_select else "update"
    )
    if account_id is None and user_is_admin:
        return query  # allow admins to query assets across all accounts
    if (
        account_id is not None
        and account_id != current_user.account_id
        and not user_is_admin
    ):
        raise Forbidden("Non-admin cannot access assets from other accounts.")
    account_id_to_filter = (
        account_id if account_id is not None else current_user.account_id
    )
    return query.filter(
        or_(
            GenericAsset.account_id == account_id_to_filter,
            GenericAsset.account_id == null(),
        )
    )
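
# Illustrative usage sketch (assumes an authenticated request context or a CLI run;
# not executed as part of this module):
#
#     query = potentially_limit_assets_query_to_account(select(GenericAsset))
#     # For non-admins: assets in their own account, plus public assets
#     assets = db.session.scalars(query).all()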


def get_source_criteria(
    cls: "Type[ts.TimedValue] | Type[ts.TimedBelief]",
    user_source_ids: int | list[int] | None,
    source_types: list[str] | None,
    exclude_source_types: list[str] | None,
) -> list[BinaryExpression]:
    source_criteria: list[BinaryExpression] = []
    if user_source_ids is not None:
        source_criteria.append(user_source_criterion(cls, user_source_ids))
    if source_types is not None:
        if user_source_ids and "user" not in source_types:
            source_types.append("user")
        source_criteria.append(source_type_criterion(source_types))
    if exclude_source_types is not None:
        if user_source_ids and "user" in exclude_source_types:
            exclude_source_types.remove("user")
        source_criteria.append(source_type_exclusion_criterion(exclude_source_types))
    return source_criteria
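
# Illustrative sketch of applying the returned criteria to a query (TimedBelief is
# used as an example class; the join is an assumption for the sketch, not executed):
#
#     criteria = get_source_criteria(
#         ts.TimedBelief,
#         user_source_ids=[1, 2],
#         source_types=["user", "forecaster"],
#         exclude_source_types=None,
#     )
#     query = select(ts.TimedBelief).join(DataSource).filter(*criteria)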


def user_source_criterion(
    cls: "Type[ts.TimedValue] | Type[ts.TimedBelief]",
    user_source_ids: int | list[int],
) -> BinaryExpression:
    """Criterion to search only through user data from the specified user sources.

    We distinguish user sources (sources with source.type == "user") from other sources (source.type != "user").
    Data with a user source originates from a registered user. Data with e.g. a script source originates from a script.

    This criterion doesn't affect the query over non-user type sources.
    It does so by ignoring only those user sources that are not in the given list of source ids.
    """
    if user_source_ids is not None and not isinstance(user_source_ids, list):
        user_source_ids = [user_source_ids]  # ensure user_source_ids is a list
    ignorable_user_sources = db.session.scalars(
        select(DataSource)
        .filter(DataSource.type == "user")
        .filter(DataSource.id.not_in(user_source_ids))
    ).all()
    ignorable_user_source_ids = [
        user_source.id for user_source in ignorable_user_sources
    ]
    # todo: [legacy] deprecate this if-statement, which is used to support the TimedValue class
    if hasattr(cls, "data_source_id"):
        return cls.data_source_id.not_in(ignorable_user_source_ids)
    return cls.source_id.not_in(ignorable_user_source_ids)


def source_type_criterion(source_types: list[str]) -> BinaryExpression:
    """Criterion to collect only data from sources that are of the given type."""
    return DataSource.type.in_(source_types)


def source_type_exclusion_criterion(source_types: list[str]) -> BinaryExpression:
    """Criterion to exclude sources that are of the given type."""
    return DataSource.type.not_in(source_types)


def get_belief_timing_criteria(
    cls: "Type[ts.TimedValue]",
    asset_class: db.Model,
    belief_horizon_window: tuple[timedelta | None, timedelta | None],
    belief_time_window: tuple[datetime | None, datetime | None],
) -> list[BinaryExpression]:
    """Get filter criteria for the desired windows with relevant belief times and belief horizons.

    # todo: interpret belief horizons with respect to knowledge time rather than event end.
    - a positive horizon denotes a before-the-fact belief (ex-ante w.r.t. knowledge time)
    - a negative horizon denotes an after-the-fact belief (ex-post w.r.t. knowledge time)

    :param belief_horizon_window: short belief horizon and long belief horizon, each an optional timedelta
        Interpretation:
        - a positive short horizon denotes "at least <horizon> before the fact" (min ex-ante)
        - a positive long horizon denotes "at most <horizon> before the fact" (max ex-ante)
        - a negative short horizon denotes "at most <horizon> after the fact" (max ex-post)
        - a negative long horizon denotes "at least <horizon> after the fact" (min ex-post)
    :param belief_time_window: earliest belief time and latest belief time, each an optional datetime

    Examples (assuming the knowledge time of each event coincides with the end of the event):

        # Query beliefs formed between 1 and 7 days before each individual event
        belief_horizon_window = (timedelta(days=1), timedelta(days=7))

        # Query beliefs formed at least 2 hours before each individual event
        belief_horizon_window = (timedelta(hours=2), None)

        # Query beliefs formed at most 2 hours after each individual event
        belief_horizon_window = (-timedelta(hours=2), None)

        # Query beliefs formed at or after each individual event (ex-post)
        belief_horizon_window = (None, timedelta(hours=0))

        # Query beliefs formed from May 2nd to May 13th (left inclusive, right exclusive)
        belief_time_window = (datetime(2020, 5, 2), datetime(2020, 5, 13))

        # Query beliefs formed from May 14th onwards
        belief_time_window = (datetime(2020, 5, 14), None)

        # Query beliefs formed before May 13th
        belief_time_window = (None, datetime(2020, 5, 13))
    """
    criteria: list[BinaryExpression] = []
    earliest_belief_time, latest_belief_time = belief_time_window
    if (
        earliest_belief_time is not None
        and latest_belief_time is not None
        and earliest_belief_time == latest_belief_time
    ):  # search directly for a unique belief time
        criteria.append(
            cls.datetime + asset_class.event_resolution - cls.horizon
            == earliest_belief_time
        )
    else:
        if earliest_belief_time is not None:
            criteria.append(
                cls.datetime + asset_class.event_resolution - cls.horizon
                >= earliest_belief_time
            )
        if latest_belief_time is not None:
            criteria.append(
                cls.datetime + asset_class.event_resolution - cls.horizon
                <= latest_belief_time
            )
    short_horizon, long_horizon = belief_horizon_window
    if (
        short_horizon is not None
        and long_horizon is not None
        and short_horizon == long_horizon
    ):  # search directly for a unique belief horizon
        criteria.append(cls.horizon == short_horizon)
    else:
        if short_horizon is not None:
            criteria.append(cls.horizon >= short_horizon)
        if long_horizon is not None:
            criteria.append(cls.horizon <= long_horizon)
    return criteria
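
# Illustrative sketch (Power and Asset stand in for a legacy TimedValue subclass and
# its old sensor class; hypothetical names, not executed here):
#
#     criteria = get_belief_timing_criteria(
#         Power,
#         Asset,
#         belief_horizon_window=(timedelta(hours=2), None),  # at least 2h ex-ante
#         belief_time_window=(None, datetime(2020, 5, 13)),  # formed before May 13th
#     )
#     query = select(Power).filter(*criteria)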


def simplify_index(
    bdf: tb.BeliefsDataFrame, index_levels_to_columns: list[str] | None = None
) -> pd.DataFrame:
    """Drop index levels other than event_start.

    Optionally, salvage index levels as new columns.

    Because information stored in the index levels is potentially lost*,
    we cannot guarantee a complete description of beliefs in the BeliefsDataFrame.
    Therefore, we type the result as a regular pandas DataFrame.

    * The index levels are dropped (by overwriting the multi-level index with just the "event_start" index level).
      Only the information named in index_levels_to_columns is kept around, as regular columns.
    """
    if index_levels_to_columns is not None:
        for col in index_levels_to_columns:
            try:
                bdf[col] = bdf.index.get_level_values(col)
            except KeyError:
                if hasattr(bdf, col):
                    bdf[col] = getattr(bdf, col)
                elif hasattr(bdf, flexmeasures_inflection.pluralize(col)):
                    bdf[col] = getattr(bdf, flexmeasures_inflection.pluralize(col))
                else:
                    raise KeyError(f"Level {col} not found")
    bdf.index = bdf.index.get_level_values("event_start")
    return bdf
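
# Illustrative sketch (assumes bdf is a BeliefsDataFrame with the standard
# (event_start, belief_time, source, cumulative_probability) index; not executed):
#
#     df = simplify_index(bdf, index_levels_to_columns=["belief_horizon", "source"])
#     assert df.index.name == "event_start"  # other levels survive only as columns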


def multiply_dataframe_with_deterministic_beliefs(
    df1: pd.DataFrame,
    df2: pd.DataFrame,
    multiplication_factor: float = 1,
    result_source: str | None = None,
) -> pd.DataFrame:
    """
    Create a new DataFrame in which the event_value columns of df1 and df2 are multiplied.

    If df1 and df2 both have a belief_horizon column, the belief_horizon column of the new DataFrame is
    determined as the minimum of the input horizons.
    The source columns of df1 and df2 are not used. A source column for the new DataFrame can be set
    by passing a result_source (string).

    The index of the resulting DataFrame contains the outer join of the indices of df1 and df2.
    Event values are np.nan for rows that are not in both DataFrames.

    :param df1: DataFrame with "event_value" column and optional "belief_horizon" and "source" columns
    :param df2: DataFrame with "event_value" column and optional "belief_horizon" and "source" columns
    :param multiplication_factor: extra scalar to determine the event_value of the resulting DataFrame
    :param result_source: string label for the source of the resulting DataFrame
    :returns: DataFrame with an "event_value" column,
              an additional "belief_horizon" column if both df1 and df2 contain this column, and
              an additional "source" column if result_source is set.
    """
    if df1.empty and df2.empty:
        return df1
    df = (df1["event_value"] * df2["event_value"] * multiplication_factor).to_frame(
        name="event_value"
    )
    if "belief_horizon" in df1.columns and "belief_horizon" in df2.columns:
        # Add existing belief_horizon information, keeping only the smaller horizon per row
        df["belief_horizon"] = (
            df1["belief_horizon"]
            .rename("belief_horizon1")
            .to_frame()
            .join(df2["belief_horizon"], how="outer")
            .min(axis=1)
            .rename("belief_horizon")
        )
    if result_source is not None:
        df["source"] = result_source  # also for rows with nan event_value
    return df
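
# Illustrative sketch with made-up power and price frames (values are examples only):
#
#     idx = pd.date_range("2020-05-01", periods=2, freq="1h", name="event_start")
#     power = pd.DataFrame({"event_value": [1.0, 2.0]}, index=idx)
#     price = pd.DataFrame({"event_value": [30.0, 40.0]}, index=idx)
#     costs = multiply_dataframe_with_deterministic_beliefs(
#         power, price, result_source="calculation"
#     )
#     # costs["event_value"] is [30.0, 80.0]; costs["source"] is "calculation"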