utils.py

from __future__ import annotations

from datetime import date, datetime, timedelta

import numpy as np
import pandas as pd
import timely_beliefs as tb
from flask import current_app
from packaging import version
from pandas.tseries.frequencies import to_offset
from pint.errors import UndefinedUnitError, DimensionalityError

from flexmeasures import Asset
from flexmeasures.data.models.planning import StockCommitment
from flexmeasures.data.models.planning.exceptions import UnknownPricesException
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.queries.utils import simplify_index
from flexmeasures.utils.flexmeasures_inflection import capitalize, pluralize
from flexmeasures.utils.unit_utils import ur, convert_units


def initialize_df(
    columns: list[str],
    start: datetime,
    end: datetime,
    resolution: timedelta,
    inclusive: str = "left",
) -> pd.DataFrame:
    df = pd.DataFrame(
        index=initialize_index(start, end, resolution, inclusive), columns=columns
    )
    return df


def initialize_series(
    data: pd.Series | list[float] | np.ndarray | float | None,
    start: datetime,
    end: datetime,
    resolution: timedelta,
    inclusive: str = "left",
) -> pd.Series:
    s = pd.Series(index=initialize_index(start, end, resolution, inclusive), data=data)
    return s


def initialize_index(
    start: date | datetime | str,
    end: date | datetime | str,
    resolution: timedelta | str,
    inclusive: str = "left",
) -> pd.DatetimeIndex:
    if version.parse(pd.__version__) >= version.parse("1.4.0"):
        return pd.date_range(
            start=start,
            end=end,
            freq=to_offset(resolution),
            inclusive=inclusive,
            name="datetime",
        )
    else:
        # pandas < 1.4 used the `closed` keyword instead of `inclusive`
        return pd.date_range(
            start=start,
            end=end,
            freq=to_offset(resolution),
            closed=inclusive,
            name="datetime",
        )
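

# A minimal usage sketch (illustrative, not executed on import): the index is
# left-inclusive by default, so the end bound is excluded. One day at 15-minute
# resolution thus yields 96 slots:
#
#     idx = initialize_index(
#         start="2025-01-01T00:00+01:00",
#         end="2025-01-02T00:00+01:00",
#         resolution=timedelta(minutes=15),
#     )
#     assert len(idx) == 96 and idx.name == "datetime"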


def add_tiny_price_slope(
    orig_prices: pd.DataFrame, col_name: str = "event_value", d: float = 10**-3
) -> pd.DataFrame:
    """Add a tiny price slope to col_name to represent e.g. inflation as a simple linear price increase.

    This is meant to break ties, when multiple time slots have equal prices, in favour of acting sooner.
    We penalise the future with at most d times the price spread (1 per thousand by default).
    """
    prices = orig_prices.copy()
    price_spread = prices[col_name].max() - prices[col_name].min()
    if price_spread > 0:
        max_penalty = price_spread * d
    else:
        max_penalty = d
    prices[col_name] = prices[col_name] + np.linspace(
        0, max_penalty, prices[col_name].size
    )
    return prices
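

# Worked example of the tie-breaking effect (values assumed for illustration):
# with three equal prices [10, 10, 10] and the default d=1e-3, the spread is 0,
# so np.linspace(0, d, 3) adds [0, 0.0005, 0.001], yielding
# [10.0, 10.0005, 10.001]. A cost-minimising scheduler then acts sooner.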


def ensure_prices_are_not_empty(
    prices: pd.DataFrame,
    price_variable_quantity: Sensor | list[dict] | ur.Quantity | None,
):
    if prices.isnull().values.all() or prices.empty:
        error_message = "Prices unknown for planning window."
        if isinstance(price_variable_quantity, Sensor):
            error_message += f" (sensor {price_variable_quantity.id})"
        raise UnknownPricesException(error_message)


def extend_to_edges(
    df: pd.DataFrame | pd.Series,
    query_window: tuple[datetime, datetime],
    resolution: timedelta,
    kind_of_values: str = "price",
    sensor: Sensor | None = None,
    allow_trimmed_query_window: bool = False,
):
    """Values are extended to the edges of the query window.

    - The first available value serves as a naive backcast.
    - The last available value serves as a naive forecast.
    """
    nan_values = df.isnull().values
    if (
        nan_values.any()
        or pd.Timestamp(df.index[0]).tz_convert("UTC")
        != pd.Timestamp(query_window[0]).tz_convert("UTC")
        or pd.Timestamp(df.index[-1]).tz_convert("UTC") + resolution
        != pd.Timestamp(query_window[-1]).tz_convert("UTC")
    ):
        if allow_trimmed_query_window:
            first_event_start = df.first_valid_index()
            last_event_end = df.last_valid_index() + resolution
            sensor_info = (
                f" (sensor {sensor.id})" if sensor and hasattr(sensor, "id") else ""
            )
            current_app.logger.warning(
                f"Prices partially unknown for planning window{sensor_info}. "
                f"Trimming planning window (from {query_window[0]} until {query_window[-1]}) to {first_event_start} until {last_event_end}."
            )
        else:
            sensor_info = (
                f" (sensor {df.sensor.id})"
                if hasattr(df, "sensor") and hasattr(df.sensor, "id")
                else ""
            )
            current_app.logger.warning(
                f"{capitalize(pluralize(kind_of_values))} partially unknown for planning window{sensor_info}. "
                f"Assuming the first {kind_of_values} is valid from the start of the planning window ({query_window[0]}), "
                f"and the last {kind_of_values} is valid until the end of the planning window ({query_window[-1]})."
            )
            index = initialize_index(
                start=query_window[0],
                end=query_window[1],
                resolution=resolution,
            )
            df = df.reindex(index)
            # or to also forward fill intermediate NaN values, use: df = df.ffill().bfill()
            df[: df.first_valid_index()] = df[
                df.index == df.first_valid_index()
            ].values[0]
            df[df.last_valid_index() :] = df[df.index == df.last_valid_index()].values[
                0
            ]
    return df
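

# Edge-filling sketch (toy values; logging requires a Flask app context): a
# series known only for 12:00-13:00 within an 11:00-14:00 query window gets its
# single value repeated outwards as naive backcast and forecast:
#
#     s = initialize_series(
#         [10.0],
#         start="2025-01-01T12:00+00:00",
#         end="2025-01-01T13:00+00:00",
#         resolution=timedelta(hours=1),
#     )
#     extend_to_edges(
#         s,
#         (pd.Timestamp("2025-01-01T11:00+00:00"), pd.Timestamp("2025-01-01T14:00+00:00")),
#         timedelta(hours=1),
#     )
#     # -> 11:00, 12:00 and 13:00 all hold 10.0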


def get_power_values(
    query_window: tuple[datetime, datetime],
    resolution: timedelta,
    beliefs_before: datetime | None,
    sensor: Sensor,
) -> np.ndarray:
    """Get measurements or forecasts of an inflexible device represented by a power or energy sensor as an array of power values in MW.

    If the requested schedule lies in the future, the returned data will consist of (the most recent) forecasts (if any exist).
    If the requested schedule lies in the past, the returned data will consist of (the most recent) measurements (if any exist).
    The latter amounts to answering "What if we could have scheduled under perfect foresight?".

    :param query_window:    datetime window within which events occur (equal to the scheduling window)
    :param resolution:      timedelta used to resample the forecasts to the resolution of the schedule
    :param beliefs_before:  datetime used to indicate we are interested in the state of knowledge at that time
    :param sensor:          power sensor representing an energy flow out of the device
    :returns:               power measurements or forecasts (consumption is positive, production is negative)
    """
    bdf: tb.BeliefsDataFrame = TimedBelief.search(
        sensor,
        event_starts_after=query_window[0],
        event_ends_before=query_window[1],
        resolution=to_offset(resolution).freqstr,
        beliefs_before=beliefs_before,
        most_recent_beliefs_only=True,
        one_deterministic_belief_per_event=True,
    )  # consumption is negative, production is positive
    df = simplify_index(bdf)
    df = df.reindex(initialize_index(query_window[0], query_window[1], resolution))
    nan_values = df.isnull().values
    if nan_values.any() or df.empty:
        current_app.logger.warning(
            f"Assuming zero power values for (partially) unknown power values for planning window. (sensor {sensor.id})"
        )
        df = df.fillna(0)
    series = convert_units(df.values, sensor.unit, "MW")
    if sensor.get_attribute(
        "consumption_is_positive", False
    ):  # FlexMeasures default is to store consumption as negative power values
        return series
    return -series


def fallback_charging_policy(
    sensor: Sensor,
    device_constraints: pd.DataFrame,
    start: datetime,
    end: datetime,
    resolution: timedelta,
) -> pd.Series:
    """This fallback charging policy is to just start charging or discharging, or do neither,
    depending on the first target state of charge and the capabilities of the Charge Point.

    Note that this ignores any cause of the infeasibility and,
    while probably a decent policy for Charge Points,
    should not be considered a robust policy for other asset types.
    """
    max_charge_capacity = (
        device_constraints[["derivative max", "derivative equals"]].min().min()
    )
    max_discharge_capacity = (
        -device_constraints[["derivative min", "derivative equals"]].max().max()
    )
    charge_power = max_charge_capacity if sensor.get_attribute("is_consumer") else 0
    discharge_power = (
        -max_discharge_capacity if sensor.get_attribute("is_producer") else 0
    )
    charge_schedule = initialize_series(charge_power, start, end, resolution)
    discharge_schedule = initialize_series(discharge_power, start, end, resolution)
    idle_schedule = initialize_series(0, start, end, resolution)
    if (
        device_constraints["equals"].first_valid_index() is not None
        and device_constraints["equals"][
            device_constraints["equals"].first_valid_index()
        ]
        > 0
    ):
        # start charging to get as close as possible to the next target
        return idle_after_reaching_target(charge_schedule, device_constraints["equals"])
    if (
        device_constraints["equals"].first_valid_index() is not None
        and device_constraints["equals"][
            device_constraints["equals"].first_valid_index()
        ]
        < 0
    ):
        # start discharging to get as close as possible to the next target
        return idle_after_reaching_target(
            discharge_schedule, device_constraints["equals"]
        )
    if (
        device_constraints["max"].first_valid_index() is not None
        and device_constraints["max"][device_constraints["max"].first_valid_index()] < 0
    ):
        # start discharging to try and bring back the soc below the next max constraint
        return idle_after_reaching_target(discharge_schedule, device_constraints["max"])
    if (
        device_constraints["min"].first_valid_index() is not None
        and device_constraints["min"][device_constraints["min"].first_valid_index()] > 0
    ):
        # start charging to try and bring back the soc above the next min constraint
        return idle_after_reaching_target(charge_schedule, device_constraints["min"])
    # stand idle
    return idle_schedule
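

# Decision sketch (assumed toy numbers): if the first "equals" SOC constraint is
# +5 relative to the current state, the device charges at its maximum rate until
# the cumulative schedule reaches 5, then idles (see idle_after_reaching_target
# below); if it is -5, it discharges likewise. Absent targets, the next violated
# "max" or "min" constraint triggers discharging or charging, respectively.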


def idle_after_reaching_target(
    schedule: pd.Series,
    target: pd.Series,
    initial_state: float = 0,
) -> pd.Series:
    """Stop planned (dis)charging after target is reached (or constraint is met)."""
    first_target = target[target.first_valid_index()]
    if first_target > initial_state:
        schedule[schedule.cumsum() > first_target] = 0
    else:
        schedule[schedule.cumsum() < first_target] = 0
    return schedule
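

# Numeric sketch (assumed values): a flat charge schedule of 2 per step with a
# first target of 5 keeps charging while the cumulative sum stays at or below 5:
#
#     schedule = pd.Series([2.0, 2.0, 2.0, 2.0])
#     target = pd.Series([np.nan, np.nan, 5.0, np.nan])
#     idle_after_reaching_target(schedule, target)
#     # cumsum is [2, 4, 6, 8]; steps where it exceeds 5 are zeroed -> [2, 2, 0, 0]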


def get_quantity_from_attribute(
    entity: Asset | Sensor,
    attribute: str | None,
    unit: str | ur.Quantity,
) -> ur.Quantity:
    """Get the value (in the given unit) of a quantity stored as an entity attribute.

    :param entity:      The entity (sensor or asset) containing the attribute to retrieve the value from.
    :param attribute:   The attribute name to extract the value from.
    :param unit:        The unit in which the value should be returned.
    :return:            The retrieved quantity, or NaN (in the desired unit) if the attribute is missing or cannot be converted.
    """
    if attribute is None:
        return np.nan * ur.Quantity(unit)  # at least return result in the desired unit
    # Get the default value from the entity attribute
    value: str | float | int = entity.get_attribute(attribute, np.nan)
    # Try to convert it to a quantity in the desired unit
    try:
        q = ur.Quantity(value)
        q = q.to(unit)
    except (UndefinedUnitError, DimensionalityError, ValueError, AssertionError):
        try:
            # Fall back to interpreting the value in the given unit
            q = ur.Quantity(f"{value} {unit}")
            q = q.to(unit)
        except (UndefinedUnitError, DimensionalityError, ValueError, AssertionError):
            current_app.logger.warning(f"Couldn't convert {value} to `{unit}`")
            q = np.nan * ur.Quantity(unit)  # at least return result in the desired unit
    return q
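

# Conversion sketch (hypothetical attribute values): an attribute stored as
# "2 kW" converts directly to the requested unit, while a bare number falls
# through to the second attempt, which interprets it in the given unit:
#
#     ur.Quantity("2 kW").to("MW")  # -> 0.002 MW
#     # a bare value of 2 raises DimensionalityError on .to("MW"),
#     # then ur.Quantity("2 MW") succeeds -> 2 MW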


def get_series_from_quantity_or_sensor(
    variable_quantity: Sensor | list[dict] | ur.Quantity,
    unit: ur.Quantity | str,
    query_window: tuple[datetime, datetime],
    resolution: timedelta,
    beliefs_before: datetime | None = None,
    as_instantaneous_events: bool = True,
    resolve_overlaps: str = "first",
    fill_sides: bool = False,
) -> pd.Series:
    """Get a time series given a quantity or sensor defined on a time window.

    :param variable_quantity:       Variable quantity measuring e.g. power capacity or efficiency.
                                    One of the following types:
                                    - a timely-beliefs Sensor recording the data
                                    - a list of dictionaries representing a time series specification
                                    - a pint Quantity representing a fixed quantity
    :param unit:                    Unit of the output data.
    :param query_window:            Tuple representing the start and end of the requested data.
    :param resolution:              Time resolution of the requested data.
    :param beliefs_before:          Optional datetime used to indicate we are interested in the state of knowledge
                                    at that time.
    :param as_instantaneous_events: Optionally, convert to instantaneous events, in which case the passed resolution is
                                    interpreted as the desired frequency of the data.
    :param resolve_overlaps:        If time series segments overlap (e.g. when upsampling to instantaneous events),
                                    take the 'max', 'min' or 'first' value during overlapping time spans
                                    (or at instantaneous moments, such as at event boundaries).
    :param fill_sides:              If True, values are extended to the edges of the query window:
                                    - The first available value serves as a naive backcast.
                                    - The last available value serves as a naive forecast.
    :return:                        Pandas Series with the requested time series data.
    """
    start, end = query_window
    index = initialize_index(start=start, end=end, resolution=resolution)
    if isinstance(variable_quantity, ur.Quantity):
        if np.isnan(variable_quantity.magnitude):
            magnitude = np.nan
        else:
            magnitude = convert_units(
                variable_quantity.magnitude,
                str(variable_quantity.units),
                unit,
                resolution,
            )
        time_series = pd.Series(magnitude, index=index, name="event_value")
    elif isinstance(variable_quantity, Sensor):
        bdf: tb.BeliefsDataFrame = TimedBelief.search(
            variable_quantity,
            event_starts_after=query_window[0],
            event_ends_before=query_window[1],
            resolution=resolution,
            # frequency=resolution,
            beliefs_before=beliefs_before,
            most_recent_beliefs_only=True,
            one_deterministic_belief_per_event=True,
        )
        if as_instantaneous_events:
            bdf = bdf.resample_events(timedelta(0), boundary_policy=resolve_overlaps)
        time_series = simplify_index(bdf).reindex(index).squeeze()
        time_series = convert_units(
            time_series, variable_quantity.unit, unit, resolution
        )
    elif isinstance(variable_quantity, list):
        time_series = process_time_series_segments(
            index=index,
            variable_quantity=variable_quantity,
            unit=unit,
            resolution=resolution if not as_instantaneous_events else timedelta(0),
            resolve_overlaps=resolve_overlaps,
            fill_sides=fill_sides,
        )
    else:
        raise TypeError(
            f"variable_quantity {variable_quantity} should be a pint Quantity, timely-beliefs Sensor or list of dicts"
        )
    return time_series
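

# Fixed-quantity sketch (illustrative window; only the Sensor branch needs a
# database and app context): a constant 2 kW becomes a flat series in MW:
#
#     get_series_from_quantity_or_sensor(
#         variable_quantity=ur.Quantity("2 kW"),
#         unit="MW",
#         query_window=(
#             pd.Timestamp("2025-01-01T00:00+00:00"),
#             pd.Timestamp("2025-01-01T02:00+00:00"),
#         ),
#         resolution=timedelta(hours=1),
#     )
#     # -> two hourly events, each 0.002 (MW)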


def process_time_series_segments(
    index: pd.DatetimeIndex,
    variable_quantity: list[dict],
    unit: str,
    resolution: timedelta,
    resolve_overlaps: str,
    fill_sides: bool = False,
) -> pd.Series:
    """Process a time series defined by a list of dicts, while resolving overlapping segments.

    :param index:               The index for the time series DataFrame.
    :param variable_quantity:   List of events, where each event is a dictionary containing:
                                - 'value': The value of the event (can be a Quantity or scalar).
                                - 'start': The start datetime of the event.
                                - 'end': The end datetime of the event.
    :param unit:                The unit to convert the value into if it's a Quantity.
    :param resolution:          The resolution to subtract from the 'end' to avoid overlap.
    :param resolve_overlaps:    How to handle overlaps (e.g. 'first', 'min', 'max' or 'mean').
    :param fill_sides:          Whether to extend values to cover the whole index.
    :return:                    A time series with resolved event values.
    """
    # Initialize a DataFrame to hold the segments
    time_series_segments = pd.DataFrame(
        np.nan, index=index, columns=list(range(len(variable_quantity)))
    )
    # Fill in the DataFrame with event values
    for segment, event in enumerate(variable_quantity):
        value = event["value"]
        if isinstance(value, ur.Quantity):
            # Convert value to the specified unit if it's a Quantity
            if np.isnan(value.magnitude):
                value = np.nan
            else:
                value = convert_units(
                    value.magnitude, str(value.units), unit, resolution
                )
        start = event["start"]
        end = event["end"]
        # Assign the value to the corresponding segment in the DataFrame
        time_series_segments.loc[start : end - resolution, segment] = value
    # Resolve overlaps using the specified method
    if resolve_overlaps == "first":
        # Backfill across columns, then keep the first column,
        # so the value of the earliest-listed segment wins
        time_series = time_series_segments.bfill(axis=1).iloc[:, 0]
    else:
        # Use the specified method to resolve overlaps (e.g. min, max)
        time_series = getattr(time_series_segments, resolve_overlaps)(axis=1)
    if fill_sides:
        time_series = extend_to_edges(
            df=time_series,
            query_window=(index[0], index[-1] + resolution),
            resolution=resolution,
        )
    return time_series.rename("event_value")
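

# Overlap-resolution sketch (toy segments): two overlapping hourly segments,
# resolved with 'first', keep the value of the earliest-listed segment:
#
#     index = initialize_index(
#         "2025-01-01T00:00+00:00", "2025-01-01T03:00+00:00", timedelta(hours=1)
#     )
#     segments = [
#         dict(value=1.0, start=index[0], end=index[2]),  # 00:00-02:00
#         dict(value=2.0, start=index[1], end=index[2] + timedelta(hours=1)),  # 01:00-03:00
#     ]
#     process_time_series_segments(
#         index, segments, unit="MW", resolution=timedelta(hours=1), resolve_overlaps="first"
#     )
#     # -> [1.0, 1.0, 2.0]: the overlap at 01:00 keeps the first segment's value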


def get_continuous_series_sensor_or_quantity(
    variable_quantity: Sensor | list[dict] | ur.Quantity | pd.Series | None,
    actuator: Sensor | Asset,
    unit: ur.Quantity | str,
    query_window: tuple[datetime, datetime],
    resolution: timedelta,
    beliefs_before: datetime | None = None,
    fallback_attribute: str | None = None,
    min_value: float | int = np.nan,
    max_value: float | int | pd.Series = np.nan,
    as_instantaneous_events: bool = False,
    resolve_overlaps: str = "first",
    fill_sides: bool = False,
) -> pd.Series:
    """Create a time series from a sensor, time series specification, or quantity within a specified window,
    falling back to a given `fallback_attribute` and making sure values stay within the domain [min_value, max_value].

    :param variable_quantity:       A sensor recording the data, a time series specification or a fixed quantity.
    :param actuator:                The actuator from which relevant defaults are retrieved.
    :param unit:                    The desired unit of the data.
    :param query_window:            The time window (start, end) to query the data.
    :param resolution:              The resolution or time interval for the data.
    :param beliefs_before:          Datetime used to indicate we are interested in the state of knowledge at that time.
    :param fallback_attribute:      Attribute serving as a fallback default in case no quantity or sensor is given.
    :param min_value:               Minimum value.
    :param max_value:               Maximum value (also replacing NaN values).
    :param as_instantaneous_events: Optionally, convert to instantaneous events, in which case the passed resolution is
                                    interpreted as the desired frequency of the data.
    :param resolve_overlaps:        If time series segments overlap (e.g. when upsampling to instantaneous events),
                                    take the 'max', 'min' or 'first' value during overlapping time spans
                                    (or at instantaneous moments, such as at event boundaries).
    :param fill_sides:              If True, values are extended to the edges of the query window:
                                    - The first available value serves as a naive backcast.
                                    - The last available value serves as a naive forecast.
    :returns:                       Time series data with missing values handled based on the chosen method.
    """
    if isinstance(variable_quantity, pd.Series):
        return variable_quantity
    if variable_quantity is None:
        variable_quantity = get_quantity_from_attribute(
            entity=actuator,
            attribute=fallback_attribute,
            unit=unit,
        )
    time_series = get_series_from_quantity_or_sensor(
        variable_quantity=variable_quantity,
        unit=unit,
        query_window=query_window,
        resolution=resolution,
        beliefs_before=beliefs_before,
        as_instantaneous_events=as_instantaneous_events,
        resolve_overlaps=resolve_overlaps,
        fill_sides=fill_sides,
    )
    # Apply upper limit
    time_series = nanmin_of_series_and_value(time_series, max_value)
    # Apply lower limit
    time_series = time_series.clip(lower=min_value)
    return time_series


def nanmin_of_series_and_value(s: pd.Series, value: float | pd.Series) -> pd.Series:
    """Perform a nanmin between a Series and a float or another Series."""
    if isinstance(value, pd.Series):
        # Avoid a strange InvalidIndexError on .clip due to a different datetime dtype
        # pd.testing.assert_index_equal(value.index, s.index)
        # [left]:  datetime64[ns, +0000]
        # [right]: datetime64[ns, UTC]
        value = value.tz_convert("UTC")
    return s.fillna(value).clip(upper=value)
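

# Elementwise-limit sketch (assumed values): NaNs in the series are first
# replaced by the limit, then all values are capped at the limit:
#
#     nanmin_of_series_and_value(pd.Series([1.0, np.nan, 5.0]), 3.0)
#     # -> [1.0, 3.0, 3.0]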


def initialize_energy_commitment(
    start: pd.Timestamp,
    end: pd.Timestamp,
    resolution: timedelta,
    market_prices: list[float],
) -> pd.DataFrame:
    """Model energy contract for the site."""
    commitment = initialize_df(
        columns=[
            "quantity",
            "downwards deviation price",
            "upwards deviation price",
            "group",
        ],
        start=start,
        end=end,
        resolution=resolution,
    )
    commitment["quantity"] = 0
    commitment["downwards deviation price"] = market_prices
    commitment["upwards deviation price"] = market_prices
    commitment["group"] = list(range(len(commitment)))
    return commitment
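

# Commitment-frame sketch (toy prices): each time step forms its own group, with
# symmetric deviation prices around a zero contracted quantity:
#
#     initialize_energy_commitment(
#         start=pd.Timestamp("2025-01-01T00:00+00:00"),
#         end=pd.Timestamp("2025-01-01T02:00+00:00"),
#         resolution=timedelta(hours=1),
#         market_prices=[50.0, 60.0],
#     )
#     # -> "quantity" is 0 everywhere; both deviation price columns hold [50, 60]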


def initialize_device_commitment(
    start: pd.Timestamp,
    end: pd.Timestamp,
    resolution: timedelta,
    device: int,
    target_datetime: str,
    target_value: float,
    soc_at_start: float,
    soc_target_penalty: float,
) -> pd.DataFrame:
    """Model penalties for unmet demand per device."""
    stock_commitment = initialize_df(
        columns=[
            "quantity",
            "downwards deviation price",
            "upwards deviation price",
            "group",
        ],
        start=start,
        end=end,
        resolution=resolution,
    )
    stock_commitment.loc[target_datetime, "quantity"] = target_value - soc_at_start
    stock_commitment["downwards deviation price"] = -soc_target_penalty
    stock_commitment["upwards deviation price"] = soc_target_penalty
    stock_commitment["group"] = list(range(len(stock_commitment)))
    stock_commitment["device"] = device
    stock_commitment["class"] = StockCommitment
    return stock_commitment
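

# Device-commitment sketch (hypothetical target): a single SOC target of 10 at
# 01:00, starting from an SOC of 2, requires a stock change of 8 at that step,
# penalised symmetrically when missed:
#
#     initialize_device_commitment(
#         start=pd.Timestamp("2025-01-01T00:00+00:00"),
#         end=pd.Timestamp("2025-01-01T02:00+00:00"),
#         resolution=timedelta(hours=1),
#         device=0,
#         target_datetime="2025-01-01T01:00+00:00",
#         target_value=10.0,
#         soc_at_start=2.0,
#         soc_target_penalty=1000.0,
#     )
#     # -> "quantity" is 8.0 at 01:00 (NaN elsewhere); deviation prices are -1000/+1000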