123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591 |
- from __future__ import annotations
- from packaging import version
- from datetime import date, datetime, timedelta
- from flask import current_app
- import pandas as pd
- from pandas.tseries.frequencies import to_offset
- import numpy as np
- import timely_beliefs as tb
- from flexmeasures.data.models.planning.exceptions import UnknownPricesException
- from flexmeasures.data.models.time_series import Sensor, TimedBelief
- from flexmeasures import Asset
- from flexmeasures.data.models.planning import StockCommitment
- from flexmeasures.data.queries.utils import simplify_index
- from flexmeasures.utils.flexmeasures_inflection import capitalize, pluralize
- from flexmeasures.utils.unit_utils import ur, convert_units
- from pint.errors import UndefinedUnitError, DimensionalityError
def initialize_df(
    columns: list[str],
    start: datetime,
    end: datetime,
    resolution: timedelta,
    inclusive: str = "left",
) -> pd.DataFrame:
    """Create an empty DataFrame over a regular datetime index.

    :param columns:    Column names for the new frame (values start out as NaN).
    :param start:      Start of the index.
    :param end:        End of the index.
    :param resolution: Fixed frequency of the index.
    :param inclusive:  Which side(s) of the window to include ("left" by default).
    """
    return pd.DataFrame(
        index=initialize_index(start, end, resolution, inclusive),
        columns=columns,
    )
def initialize_series(
    data: pd.Series | list[float] | np.ndarray | float | None,
    start: datetime,
    end: datetime,
    resolution: timedelta,
    inclusive: str = "left",
) -> pd.Series:
    """Create a Series over a regular datetime index, filled with the given data.

    :param data:       Scalar, array-like or Series used to fill the new Series.
    :param start:      Start of the index.
    :param end:        End of the index.
    :param resolution: Fixed frequency of the index.
    :param inclusive:  Which side(s) of the window to include ("left" by default).
    """
    index = initialize_index(start, end, resolution, inclusive)
    return pd.Series(data=data, index=index)
def initialize_index(
    start: date | datetime | str,
    end: date | datetime | str,
    resolution: timedelta | str,
    inclusive: str = "left",
) -> pd.DatetimeIndex:
    """Create a fixed-frequency datetime index named "datetime" over [start, end).

    :param start:      Start of the index.
    :param end:        End of the index.
    :param resolution: Frequency of the index (timedelta or frequency string).
    :param inclusive:  Which side(s) of the window to include ("left" by default).
    """
    range_kwargs = dict(
        start=start,
        end=end,
        freq=to_offset(resolution),
        name="datetime",
    )
    # pandas 1.4.0 renamed the `closed` parameter of date_range to `inclusive`
    if version.parse(pd.__version__) >= version.parse("1.4.0"):
        range_kwargs["inclusive"] = inclusive
    else:
        range_kwargs["closed"] = inclusive
    return pd.date_range(**range_kwargs)
def add_tiny_price_slope(
    orig_prices: pd.DataFrame, col_name: str = "event_value", d: float = 10**-3
) -> pd.DataFrame:
    """Add tiny price slope to col_name to represent e.g. inflation as a simple linear price increase.

    This is meant to break ties, when multiple time slots have equal prices, in favour of acting sooner.
    We penalise the future with at most d times the price spread (1 per thousand by default).
    The input frame is left untouched; a sloped copy is returned.
    """
    result = orig_prices.copy()
    col = result[col_name]
    spread = col.max() - col.min()
    # With zero (or undefined) spread, fall back to an absolute penalty of d
    penalty_ceiling = spread * d if spread > 0 else d
    result[col_name] = col + np.linspace(0, penalty_ceiling, col.size)
    return result
def ensure_prices_are_not_empty(
    prices: pd.DataFrame,
    price_variable_quantity: Sensor | list[dict] | ur.Quantity | None,
):
    """Raise an UnknownPricesException if no prices are available in the planning window.

    :param prices:                  Price data for the planning window.
    :param price_variable_quantity: Source of the prices; if it is a Sensor, its id is
                                    mentioned in the exception message.
    :raises UnknownPricesException: If the price frame is empty or all-NaN.
    """
    no_usable_prices = prices.empty or prices.isnull().values.all()
    if no_usable_prices:
        message = "Prices unknown for planning window."
        if isinstance(price_variable_quantity, Sensor):
            message += f" (sensor {price_variable_quantity.id})"
        raise UnknownPricesException(message)
def extend_to_edges(
    df: pd.DataFrame | pd.Series,
    query_window: tuple[datetime, datetime],
    resolution: timedelta,
    kind_of_values: str = "price",
    sensor: Sensor = None,
    allow_trimmed_query_window: bool = False,
):
    """Values are extended to the edges of the query window.

    - The first available value serves as a naive backcast.
    - The last available value serves as a naive forecast.

    :param df:                          Data with a datetime index, possibly with NaNs or not covering the full query window.
    :param query_window:                (start, end) of the planning window the data should cover.
    :param resolution:                  Event resolution, used to compare the last index entry against the window end.
    :param kind_of_values:              Name of the quantity, used in log messages only.
    :param sensor:                      Optional sensor, used in log messages only.
    :param allow_trimmed_query_window:  If True, only warn that the window is trimmed to the available data;
                                        if False, reindex to the full window and fill the edges.
    """
    nan_values = df.isnull().values
    # Detect missing values, or a mismatch at either edge of the window (compared in UTC)
    if (
        nan_values.any()
        or pd.Timestamp(df.index[0]).tz_convert("UTC")
        != pd.Timestamp(query_window[0]).tz_convert("UTC")
        or pd.Timestamp(df.index[-1]).tz_convert("UTC") + resolution
        != pd.Timestamp(query_window[-1]).tz_convert("UTC")
    ):
        if allow_trimmed_query_window:
            # NOTE(review): this branch only logs; the data is returned as-is and the
            # caller is presumably expected to trim the planning window — confirm.
            first_event_start = df.first_valid_index()
            last_event_end = df.last_valid_index() + resolution
            sensor_info = (
                f" (sensor {sensor.id})" if sensor and hasattr(sensor, "id") else ""
            )
            current_app.logger.warning(
                f"Prices partially unknown for planning window ({sensor_info}). "
                f"Trimming planning window (from {query_window[0]} until {query_window[-1]}) to {first_event_start} until {last_event_end}."
            )
        else:
            sensor_info = (
                f" (sensor {df.sensor.id})"
                if hasattr(df, "sensor") and hasattr(df.sensor, "id")
                else ""
            )
            current_app.logger.warning(
                f"{capitalize(pluralize(kind_of_values))} partially unknown for planning window ({sensor_info}). "
                f"Assuming the first {kind_of_values} is valid from the start of the planning window ({query_window[0]}), "
                f"and the last {kind_of_values} is valid until the end of the planning window ({query_window[-1]})."
            )
            # Reindex to the full query window, then fill each edge with the nearest valid value
            index = initialize_index(
                start=query_window[0],
                end=query_window[1],
                resolution=resolution,
            )
            df = df.reindex(index)
            # or to also forward fill intermediate NaN values, use: df = df.ffill().bfill()
            df[: df.first_valid_index()] = df[
                df.index == df.first_valid_index()
            ].values[0]
            df[df.last_valid_index() :] = df[df.index == df.last_valid_index()].values[
                0
            ]
    return df
def get_power_values(
    query_window: tuple[datetime, datetime],
    resolution: timedelta,
    beliefs_before: datetime | None,
    sensor: Sensor,
) -> np.ndarray:
    """Get measurements or forecasts of an inflexible device represented by a power or energy sensor as an array of power values in MW.

    If the requested schedule lies in the future, the returned data will consist of (the most recent) forecasts (if any exist).
    If the requested schedule lies in the past, the returned data will consist of (the most recent) measurements (if any exist).
    The latter amounts to answering "What if we could have scheduled under perfect foresight?".

    :param query_window: datetime window within which events occur (equal to the scheduling window)
    :param resolution: timedelta used to resample the forecasts to the resolution of the schedule
    :param beliefs_before: datetime used to indicate we are interested in the state of knowledge at that time
    :param sensor: power sensor representing an energy flow out of the device
    :returns: power measurements or forecasts (consumption is positive, production is negative)
    """
    bdf: tb.BeliefsDataFrame = TimedBelief.search(
        sensor,
        event_starts_after=query_window[0],
        event_ends_before=query_window[1],
        resolution=to_offset(resolution).freqstr,
        beliefs_before=beliefs_before,
        most_recent_beliefs_only=True,
        one_deterministic_belief_per_event=True,
    )  # consumption is negative, production is positive
    df = simplify_index(bdf)
    # Make sure we have an entry for every time slot in the planning window
    df = df.reindex(initialize_index(query_window[0], query_window[1], resolution))
    nan_values = df.isnull().values
    if nan_values.any() or df.empty:
        current_app.logger.warning(
            f"Assuming zero power values for (partially) unknown power values for planning window. (sensor {sensor.id})"
        )
        df = df.fillna(0)
    # NOTE: despite the name, this is a numpy array (df.values), not a pd.Series
    series = convert_units(df.values, sensor.unit, "MW")
    if sensor.get_attribute(
        "consumption_is_positive", False
    ):  # FlexMeasures default is to store consumption as negative power values
        return series
    # Flip the sign so that consumption is returned as positive
    return -series
def fallback_charging_policy(
    sensor: Sensor,
    device_constraints: pd.DataFrame,
    start: datetime,
    end: datetime,
    resolution: timedelta,
) -> pd.Series:
    """This fallback charging policy is to just start charging or discharging, or do neither,
    depending on the first target state of charge and the capabilities of the Charge Point.

    Note that this ignores any cause of the infeasibility and,
    while probably a decent policy for Charge Points,
    should not be considered a robust policy for other asset types.
    """
    max_charge_capacity = (
        device_constraints[["derivative max", "derivative equals"]].min().min()
    )
    max_discharge_capacity = (
        -device_constraints[["derivative min", "derivative equals"]].max().max()
    )
    charge_power = max_charge_capacity if sensor.get_attribute("is_consumer") else 0
    discharge_power = (
        -max_discharge_capacity if sensor.get_attribute("is_producer") else 0
    )
    charge_schedule = initialize_series(charge_power, start, end, resolution)
    discharge_schedule = initialize_series(discharge_power, start, end, resolution)

    def _first_constraint_value(column: str):
        """Return the first non-NaN value in the given constraint column, or None."""
        first_index = device_constraints[column].first_valid_index()
        if first_index is None:
            return None
        return device_constraints[column][first_index]

    first_equals = _first_constraint_value("equals")
    if first_equals is not None and first_equals > 0:
        # start charging to get as close as possible to the next target
        return idle_after_reaching_target(charge_schedule, device_constraints["equals"])
    if first_equals is not None and first_equals < 0:
        # start discharging to get as close as possible to the next target
        return idle_after_reaching_target(
            discharge_schedule, device_constraints["equals"]
        )
    first_max = _first_constraint_value("max")
    if first_max is not None and first_max < 0:
        # start discharging to try and bring back the soc below the next max constraint
        return idle_after_reaching_target(discharge_schedule, device_constraints["max"])
    first_min = _first_constraint_value("min")
    if first_min is not None and first_min > 0:
        # start charging to try and bring back the soc above the next min constraint
        return idle_after_reaching_target(charge_schedule, device_constraints["min"])
    # stand idle
    return initialize_series(0, start, end, resolution)
def idle_after_reaching_target(
    schedule: pd.Series,
    target: pd.Series,
    initial_state: float = 0,
) -> pd.Series:
    """Stop planned (dis)charging after target is reached (or constraint is met).

    Zeroes out (in place) the tail of the schedule once its cumulative sum passes
    the first valid target value, and returns the schedule.
    """
    first_target = target[target.first_valid_index()]
    cumulative = schedule.cumsum()
    if first_target > initial_state:
        # charging: idle once the stock has grown past the target
        overshoot = cumulative > first_target
    else:
        # discharging: idle once the stock has dropped below the target
        overshoot = cumulative < first_target
    schedule[overshoot] = 0
    return schedule
def get_quantity_from_attribute(
    entity: Asset | Sensor,
    attribute: str | None,
    unit: str | ur.Quantity,
) -> ur.Quantity:
    """Get the value (in the given unit) of a quantity stored as an entity attribute.

    :param entity: The entity (sensor or asset) containing the attribute to retrieve the value from.
    :param attribute: The attribute name to extract the value from.
    :param unit: The unit in which the value should be returned.
    :return: The retrieved quantity or the provided default (NaN in the desired unit).
    """
    if attribute is None:
        return np.nan * ur.Quantity(unit)  # at least return result in the desired unit

    # Get the default value from the entity attribute
    value: str | float | int = entity.get_attribute(attribute, np.nan)

    # Try to convert it to a quantity in the desired unit
    try:
        q = ur.Quantity(value)
        q = q.to(unit)
    except (UndefinedUnitError, DimensionalityError, ValueError, AssertionError):
        try:
            # Fall back to interpreting the value in the given unit
            # (e.g. a bare number that was stored without a unit)
            q = ur.Quantity(f"{value} {unit}")
            q = q.to(unit)
        except (UndefinedUnitError, DimensionalityError, ValueError, AssertionError):
            current_app.logger.warning(f"Couldn't convert {value} to `{unit}`")
            q = np.nan * ur.Quantity(unit)  # at least return result in the desired unit
    return q
def get_series_from_quantity_or_sensor(
    variable_quantity: Sensor | list[dict] | ur.Quantity,
    unit: ur.Quantity | str,
    query_window: tuple[datetime, datetime],
    resolution: timedelta,
    beliefs_before: datetime | None = None,
    as_instantaneous_events: bool = True,
    resolve_overlaps: str = "first",
    fill_sides: bool = False,
) -> pd.Series:
    """
    Get a time series given a quantity or sensor defined on a time window.

    :param variable_quantity: Variable quantity measuring e.g. power capacity or efficiency.
                              One of the following types:
                              - a timely-beliefs Sensor recording the data
                              - a list of dictionaries representing a time series specification
                              - a pint Quantity representing a fixed quantity
    :param unit: Unit of the output data.
    :param query_window: Tuple representing the start and end of the requested data.
    :param resolution: Time resolution of the requested data.
    :param beliefs_before: Optional datetime used to indicate we are interested in the state of knowledge
                           at that time.
    :param as_instantaneous_events: Optionally, convert to instantaneous events, in which case the passed resolution is
                                    interpreted as the desired frequency of the data.
    :param resolve_overlaps: If time series segments overlap (e.g. when upsampling to instantaneous events),
                             take the 'max', 'min' or 'first' value during overlapping time spans
                             (or at instantaneous moments, such as at event boundaries).
    :param fill_sides: If True, values are extended to the edges of the query window:
                       - The first available value serves as a naive backcast.
                       - The last available value serves as a naive forecast.
    :return: Pandas Series with the requested time series data.
    :raises TypeError: If variable_quantity is none of the supported types.
    """
    start, end = query_window
    index = initialize_index(start=start, end=end, resolution=resolution)
    if isinstance(variable_quantity, ur.Quantity):
        # Fixed quantity: broadcast a single (converted) magnitude over the whole window
        if np.isnan(variable_quantity.magnitude):
            magnitude = np.nan
        else:
            magnitude = convert_units(
                variable_quantity.magnitude,
                str(variable_quantity.units),
                unit,
                resolution,
            )
        time_series = pd.Series(magnitude, index=index, name="event_value")
    elif isinstance(variable_quantity, Sensor):
        # Sensor: look up the most recent beliefs within the window
        bdf: tb.BeliefsDataFrame = TimedBelief.search(
            variable_quantity,
            event_starts_after=query_window[0],
            event_ends_before=query_window[1],
            resolution=resolution,
            # frequency=resolution,
            beliefs_before=beliefs_before,
            most_recent_beliefs_only=True,
            one_deterministic_belief_per_event=True,
        )
        if as_instantaneous_events:
            bdf = bdf.resample_events(timedelta(0), boundary_policy=resolve_overlaps)
        time_series = simplify_index(bdf).reindex(index).squeeze()
        time_series = convert_units(
            time_series, variable_quantity.unit, unit, resolution
        )
    elif isinstance(variable_quantity, list):
        # Time series specification: resolve the listed segments onto the index
        time_series = process_time_series_segments(
            index=index,
            variable_quantity=variable_quantity,
            unit=unit,
            resolution=resolution if not as_instantaneous_events else timedelta(0),
            resolve_overlaps=resolve_overlaps,
            fill_sides=fill_sides,
        )
    else:
        raise TypeError(
            f"quantity_or_sensor {variable_quantity} should be a pint Quantity or timely-beliefs Sensor"
        )
    return time_series
def process_time_series_segments(
    index: pd.DatetimeIndex,
    variable_quantity: list[dict],
    unit: str,
    resolution: timedelta,
    resolve_overlaps: str,
    fill_sides: bool = False,
) -> pd.Series:
    """
    Process a time series defined by a list of dicts, while resolving overlapping segments.

    :param index:             The index for the resulting time series.
    :param variable_quantity: List of events, where each event is a dictionary containing:
                              - 'value': The value of the event (can be a Quantity or scalar).
                              - 'start': The start datetime of the event.
                              - 'end':   The end datetime of the event.
    :param unit:              The unit to convert the value into if it's a Quantity.
    :param resolution:        The resolution to subtract from the 'end' to avoid overlap.
    :param resolve_overlaps:  How to handle overlaps (e.g. 'first', 'last', 'mean', etc.).
    :param fill_sides:        Whether to extend values to cover the whole index.
    :returns:                 A time series with resolved event values, named "event_value".
    """
    # Initialize a DataFrame to hold the segments: one column per event, NaN where unset
    time_series_segments = pd.DataFrame(
        np.nan, index=index, columns=list(range(len(variable_quantity)))
    )
    # Fill in the DataFrame with event values
    for segment, event in enumerate(variable_quantity):
        value = event["value"]
        if isinstance(value, ur.Quantity):
            # Convert value to the specified unit if it's a Quantity
            if np.isnan(value.magnitude):
                value = np.nan
            else:
                value = convert_units(
                    value.magnitude, str(value.units), unit, resolution
                )
        start = event["start"]
        end = event["end"]
        # Assign the value to the corresponding segment in the DataFrame;
        # subtract one resolution from the (exclusive) event end to get the last included label
        time_series_segments.loc[start : end - resolution, segment] = value
    # Resolve overlaps using the specified method
    if resolve_overlaps == "first":
        # Backfill across columns so column 0 holds the first non-NaN value per row.
        # NB: fillna(method="bfill") was deprecated in pandas 2.1 and removed in 3.0,
        # so call bfill directly.
        time_series = time_series_segments.bfill(axis=1).iloc[:, 0]
    else:
        # Use the specified aggregation method to resolve overlaps (e.g. mean, max)
        time_series = getattr(time_series_segments, resolve_overlaps)(axis=1)
    if fill_sides:
        # Naive backcast/forecast to the edges of the index
        time_series = extend_to_edges(
            df=time_series,
            query_window=(index[0], index[-1] + resolution),
            resolution=resolution,
        )
    return time_series.rename("event_value")
def get_continuous_series_sensor_or_quantity(
    variable_quantity: Sensor | list[dict] | ur.Quantity | pd.Series | None,
    actuator: Sensor | Asset,
    unit: ur.Quantity | str,
    query_window: tuple[datetime, datetime],
    resolution: timedelta,
    beliefs_before: datetime | None = None,
    fallback_attribute: str | None = None,
    min_value: float | int = np.nan,
    max_value: float | int | pd.Series = np.nan,
    as_instantaneous_events: bool = False,
    resolve_overlaps: str = "first",
    fill_sides: bool = False,
) -> pd.Series:
    """Creates a time series from a sensor, time series specification, or quantity within a specified window,
    falling back to a given `fallback_attribute` and making sure values stay within the domain [min_value, max_value].

    :param variable_quantity: A sensor recording the data, a time series specification or a fixed quantity.
    :param actuator: The actuator from which relevant defaults are retrieved.
    :param unit: The desired unit of the data.
    :param query_window: The time window (start, end) to query the data.
    :param resolution: The resolution or time interval for the data.
    :param beliefs_before: Timestamp for prior beliefs or knowledge.
    :param fallback_attribute: Attribute serving as a fallback default in case no quantity or sensor is given.
    :param min_value: Minimum value.
    :param max_value: Maximum value (also replacing NaN values).
    :param as_instantaneous_events: optionally, convert to instantaneous events, in which case the passed resolution is
                                    interpreted as the desired frequency of the data.
    :param resolve_overlaps: If time series segments overlap (e.g. when upsampling to instantaneous events),
                             take the 'max', 'min' or 'first' value during overlapping time spans
                             (or at instantaneous moments, such as at event boundaries).
    :param fill_sides: If True, values are extended to the edges of the query window:
                       - The first available value serves as a naive backcast.
                       - The last available value serves as a naive forecast.
    :returns: time series data with missing values handled based on the chosen method.
    """
    # A Series is already in its final form
    if isinstance(variable_quantity, pd.Series):
        return variable_quantity
    # Without an explicit quantity or sensor, fall back to the actuator's attribute
    if variable_quantity is None:
        variable_quantity = get_quantity_from_attribute(
            entity=actuator,
            attribute=fallback_attribute,
            unit=unit,
        )
    time_series = get_series_from_quantity_or_sensor(
        variable_quantity=variable_quantity,
        unit=unit,
        query_window=query_window,
        resolution=resolution,
        beliefs_before=beliefs_before,
        as_instantaneous_events=as_instantaneous_events,
        resolve_overlaps=resolve_overlaps,
        fill_sides=fill_sides,
    )
    # Cap from above (this also replaces NaN values with max_value) ...
    capped = nanmin_of_series_and_value(time_series, max_value)
    # ... and from below
    return capped.clip(lower=min_value)
def nanmin_of_series_and_value(s: pd.Series, value: float | pd.Series) -> pd.Series:
    """Perform a nanmin between a Series and a float (or a Series of upper bounds).

    NaNs in `s` are replaced by the bound before capping, so the result never
    exceeds `value` and only keeps NaNs where the bound itself is NaN.
    """
    if isinstance(value, pd.Series):
        # Normalize the timezone to avoid a strange InvalidIndexError on .clip
        # due to a different index "dtype":
        # [left]: datetime64[ns, +0000]
        # [right]: datetime64[ns, UTC]
        value = value.tz_convert("UTC")
    filled = s.fillna(value)
    return filled.clip(upper=value)
def initialize_energy_commitment(
    start: pd.Timestamp,
    end: pd.Timestamp,
    resolution: timedelta,
    market_prices: list[float],
) -> pd.DataFrame:
    """Model energy contract for the site.

    Builds a commitment frame over the planning window with zero committed
    quantity and market prices for deviations in both directions.
    """
    column_names = [
        "quantity",
        "downwards deviation price",
        "upwards deviation price",
        "group",
    ]
    commitment = initialize_df(
        columns=column_names,
        start=start,
        end=end,
        resolution=resolution,
    )
    # No energy is committed up front
    commitment["quantity"] = 0
    # Deviations in either direction are priced at the market price
    commitment["downwards deviation price"] = market_prices
    commitment["upwards deviation price"] = market_prices
    # Each event forms its own commitment group
    commitment["group"] = list(range(len(commitment)))
    return commitment
def initialize_device_commitment(
    start: pd.Timestamp,
    end: pd.Timestamp,
    resolution: timedelta,
    device: int,
    target_datetime: str,
    target_value: float,
    soc_at_start: float,
    soc_target_penalty: float,
) -> pd.DataFrame:
    """Model penalties for demand unmet per device.

    Builds a stock commitment frame over the planning window, with the required
    stock change set at the target datetime and a symmetric penalty on deviations.
    """
    column_names = [
        "quantity",
        "downwards deviation price",
        "upwards deviation price",
        "group",
    ]
    stock_commitment = initialize_df(
        columns=column_names,
        start=start,
        end=end,
        resolution=resolution,
    )
    # The stock change needed to reach the target from the initial state
    stock_commitment.loc[target_datetime, "quantity"] = target_value - soc_at_start
    # Penalize deviations in both directions equally
    stock_commitment["downwards deviation price"] = -soc_target_penalty
    stock_commitment["upwards deviation price"] = soc_target_penalty
    # Each event forms its own commitment group
    stock_commitment["group"] = list(range(len(stock_commitment)))
    stock_commitment["device"] = device
    stock_commitment["class"] = StockCommitment
    return stock_commitment
|