123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630 |
- from __future__ import annotations
- from flask import current_app
- import pandas as pd
- import numpy as np
- from pandas.tseries.frequencies import to_offset
- from pyomo.core import (
- ConcreteModel,
- Var,
- RangeSet,
- Set,
- Param,
- Reals,
- NonNegativeReals,
- NonPositiveReals,
- Binary,
- Constraint,
- Objective,
- minimize,
- )
- from pyomo.environ import UnknownSolver # noqa F401
- from pyomo.environ import value
- from pyomo.opt import SolverFactory, SolverResults
- from flexmeasures.data.models.planning import (
- Commitment,
- FlowCommitment,
- StockCommitment,
- )
- from flexmeasures.data.models.planning.utils import initialize_series, initialize_df
- from flexmeasures.utils.calculations import apply_stock_changes_and_losses
- infinity = float("inf")
- def device_scheduler( # noqa C901
- device_constraints: list[pd.DataFrame],
- ems_constraints: pd.DataFrame,
- commitment_quantities: list[pd.Series] | None = None,
- commitment_downwards_deviation_price: list[pd.Series] | list[float] | None = None,
- commitment_upwards_deviation_price: list[pd.Series] | list[float] | None = None,
- commitments: list[pd.DataFrame] | list[Commitment] | None = None,
- initial_stock: float | list[float] = 0,
- ) -> tuple[list[pd.Series], float, SolverResults, ConcreteModel]:
- """This generic device scheduler is able to handle an EMS with multiple devices,
- with various types of constraints on the EMS level and on the device level,
- and with multiple market commitments on the EMS level.
- A typical example is a house with many devices.
- The commitments are assumed to be with regard to the flow of energy to the device (positive for consumption,
- negative for production). The solver minimises the costs of deviating from the commitments.
- :param device_constraints: Device constraints are on a device level. Handled constraints (listed by column name):
- max: maximum stock assuming an initial stock of zero (e.g. in MWh or boxes)
- min: minimum stock assuming an initial stock of zero
- equal: exact amount of stock (we do this by clamping min and max)
- efficiency: amount of stock left at the next datetime (the rest is lost)
- derivative max: maximum flow (e.g. in MW or boxes/h)
- derivative min: minimum flow
- derivative equals: exact amount of flow (we do this by clamping derivative min and derivative max)
- derivative down efficiency: conversion efficiency of flow out of a device (flow out : stock decrease)
- derivative up efficiency: conversion efficiency of flow into a device (stock increase : flow in)
- stock delta: predefined stock delta to apply to the storage device. Positive values cause an increase and negative values a decrease
- :param ems_constraints: EMS constraints are on an EMS level. Handled constraints (listed by column name):
- derivative max: maximum flow
- derivative min: minimum flow
- :param commitments: Commitments are on an EMS level by default. Handled parameters (listed by column name):
- quantity: for example, 5.5
- downwards deviation price: 10.1
- upwards deviation price: 10.2
- group: 1 (defaults to the enumerate time step j)
- device: 0 (corresponds to device d; if not set, commitment is on an EMS level)
- :param initial_stock: initial stock for each device. Use a list with the same number of devices as device_constraints,
- or use a single value to set the initial stock to be the same for all devices.
- Potentially deprecated arguments:
- commitment_quantities: amounts of flow specified in commitments (both previously ordered and newly requested)
- - e.g. in MW or boxes/h
- commitment_downwards_deviation_price: penalty for downwards deviations of the flow
- - e.g. in EUR/MW or EUR/(boxes/h)
- - either a single value (same value for each flow value) or a Series (different value for each flow value)
- commitment_upwards_deviation_price: penalty for upwards deviations of the flow
- Separate costs for each commitment are stored in a dictionary under `model.commitment_costs` (indexed by commitment).
- All Series and DataFrames should have the same resolution.
- For now, we pass in the various constraints and prices as separate variables, from which we make a MultiIndex
- DataFrame. Later we could pass in a MultiIndex DataFrame directly.
- """
- model = ConcreteModel()
- # If the EMS has no devices, don't bother
- if len(device_constraints) == 0:
- return [], 0, SolverResults(), model
- # Get timing from first device
- start = device_constraints[0].index.to_pydatetime()[0]
- # Workaround for https://github.com/pandas-dev/pandas/issues/53643. Was: resolution = pd.to_timedelta(device_constraints[0].index.freq)
- resolution = pd.to_timedelta(device_constraints[0].index.freq).to_pytimedelta()
- end = device_constraints[0].index.to_pydatetime()[-1] + resolution
- # Move commitments from old structure to new
- if commitments is None:
- commitments = []
- else:
- commitments = [
- c.to_frame() if isinstance(c, Commitment) else c for c in commitments
- ]
- if commitment_quantities is not None:
- for quantity, down, up in zip(
- commitment_quantities,
- commitment_downwards_deviation_price,
- commitment_upwards_deviation_price,
- ):
- # Turn prices per commitment into prices per commitment flow
- if all(isinstance(price, float) for price in down) or isinstance(
- down, float
- ):
- down = initialize_series(down, start, end, resolution)
- if all(isinstance(price, float) for price in up) or isinstance(up, float):
- up = initialize_series(up, start, end, resolution)
- group = initialize_series(list(range(len(down))), start, end, resolution)
- df = initialize_df(
- ["quantity", "downwards deviation price", "upwards deviation price"],
- start,
- end,
- resolution,
- )
- df["quantity"] = quantity
- df["downwards deviation price"] = down
- df["upwards deviation price"] = up
- df["group"] = group
- commitments.append(df)
- # Check if commitments have the same time window and resolution as the constraints
- for commitment in commitments:
- start_c = commitment.index.to_pydatetime()[0]
- resolution_c = pd.to_timedelta(commitment.index.freq)
- end_c = commitment.index.to_pydatetime()[-1] + resolution
- if not (start_c == start and end_c == end):
- raise Exception(
- "Not implemented for different time windows.\n(%s,%s)\n(%s,%s)"
- % (start, end, start_c, end_c)
- )
- if resolution_c != resolution:
- raise Exception(
- "Not implemented for different resolutions.\n%s\n%s"
- % (resolution, resolution_c)
- )
- def convert_commitments_to_subcommitments(
- dfs: list[pd.DataFrame],
- ) -> tuple[list[pd.DataFrame], dict[int, int]]:
- """Transform commitments, each specifying a group for each time step, to sub-commitments, one per group.
- 'Groups' are a commitment concept (grouping time slots of a commitment),
- making it possible that deviations/breaches can be accounted for properly within this group
- (e.g. highest breach per calendar month defines the penalty).
- Here, we define sub-commitments, by separating commitments by group and by direction of deviation (up, down).
- We also enumerate the time steps in a new column "j".
- For example, given contracts A and B (represented by 2 DataFrames), each with 3 groups,
- we return (sub)commitments A1, A2, A3, B1, B2 and B3,
- where A,B,C is the enumerated contract and 1,2,3 is the enumerated group.
- """
- commitment_mapping = {}
- sub_commitments = []
- for c, df in enumerate(dfs):
- # Make sure each commitment has "device" (default NaN) and "class" (default FlowCommitment) columns
- if "device" not in df.columns:
- df["device"] = np.nan
- if "class" not in df.columns:
- df["class"] = FlowCommitment
- df["j"] = range(len(df.index))
- groups = list(df["group"].unique())
- for group in groups:
- sub_commitment = df[df["group"] == group].drop(columns=["group"])
- # Catch non-uniqueness
- if len(sub_commitment["upwards deviation price"].unique()) > 1:
- raise ValueError(
- "Commitment groups cannot have non-unique upwards deviation prices."
- )
- if len(sub_commitment["downwards deviation price"].unique()) > 1:
- raise ValueError(
- "Commitment groups cannot have non-unique downwards deviation prices."
- )
- if len(sub_commitment) == 1:
- commitment_mapping[len(sub_commitments)] = c
- sub_commitments.append(sub_commitment)
- else:
- down_commitment = sub_commitment.copy().drop(
- columns="upwards deviation price"
- )
- up_commitment = sub_commitment.copy().drop(
- columns="downwards deviation price"
- )
- commitment_mapping[len(sub_commitments)] = c
- commitment_mapping[len(sub_commitments) + 1] = c
- sub_commitments.extend([down_commitment, up_commitment])
- return sub_commitments, commitment_mapping
- commitments, commitment_mapping = convert_commitments_to_subcommitments(commitments)
- bigM_columns = ["derivative max", "derivative min", "derivative equals"]
- # Compute a good value for M
- M = np.nanmax([np.nanmax(d[bigM_columns].abs()) for d in device_constraints])
- # M has to be 1 MW, at least
- M = max(M, 1)
- for d in range(len(device_constraints)):
- if "stock delta" not in device_constraints[d].columns:
- device_constraints[d]["stock delta"] = 0
- else:
- device_constraints[d]["stock delta"] = (
- device_constraints[d]["stock delta"].astype(float).fillna(0)
- )
- # Add indices for devices (d), datetimes (j) and commitments (c)
- model.d = RangeSet(0, len(device_constraints) - 1, doc="Set of devices")
- model.j = RangeSet(
- 0, len(device_constraints[0].index.to_pydatetime()) - 1, doc="Set of datetimes"
- )
- model.c = RangeSet(0, len(commitments) - 1, doc="Set of commitments")
- # Add 2D indices for commitment datetimes (cj)
- def commitments_init(m):
- return ((c, j) for c in m.c for j in commitments[c]["j"])
- model.cj = Set(dimen=2, initialize=commitments_init)
- # Add parameters
- def price_down_select(m, c):
- if "downwards deviation price" not in commitments[c].columns:
- return 0
- price = commitments[c]["downwards deviation price"].iloc[0]
- if np.isnan(price):
- return 0
- return price
- def price_up_select(m, c):
- if "upwards deviation price" not in commitments[c].columns:
- return 0
- price = commitments[c]["upwards deviation price"].iloc[0]
- if np.isnan(price):
- return 0
- return price
- def commitment_quantity_select(m, c, j):
- quantity = commitments[c][commitments[c]["j"] == j]["quantity"].values[0]
- if np.isnan(quantity):
- return -infinity
- return quantity
- def device_max_select(m, d, j):
- min_v = device_constraints[d]["min"].iloc[j]
- max_v = device_constraints[d]["max"].iloc[j]
- equal_v = device_constraints[d]["equals"].iloc[j]
- if np.isnan(max_v) and np.isnan(equal_v):
- return infinity
- else:
- if not np.isnan(equal_v):
- # make min_v < equal_v
- equal_v = np.nanmax([equal_v, min_v])
- return np.nanmin([max_v, equal_v])
- def device_min_select(m, d, j):
- min_v = device_constraints[d]["min"].iloc[j]
- max_v = device_constraints[d]["max"].iloc[j]
- equal_v = device_constraints[d]["equals"].iloc[j]
- if np.isnan(min_v) and np.isnan(equal_v):
- return -infinity
- else:
- if not np.isnan(equal_v):
- # make equal_v <= max_v
- equal_v = np.nanmin([equal_v, max_v])
- return np.nanmax([min_v, equal_v])
- def device_derivative_max_select(m, d, j):
- max_v = device_constraints[d]["derivative max"].iloc[j]
- equal_v = device_constraints[d]["derivative equals"].iloc[j]
- if np.isnan(max_v) and np.isnan(equal_v):
- return infinity
- else:
- return np.nanmin([max_v, equal_v])
- def device_derivative_min_select(m, d, j):
- min_v = device_constraints[d]["derivative min"].iloc[j]
- equal_v = device_constraints[d]["derivative equals"].iloc[j]
- if np.isnan(min_v) and np.isnan(equal_v):
- return -infinity
- else:
- return np.nanmax([min_v, equal_v])
- def ems_derivative_max_select(m, j):
- v = ems_constraints["derivative max"].iloc[j]
- if np.isnan(v):
- return infinity
- else:
- return v
- def ems_derivative_min_select(m, j):
- v = ems_constraints["derivative min"].iloc[j]
- if np.isnan(v):
- return -infinity
- else:
- return v
- def device_efficiency(m, d, j):
- """Assume perfect efficiency if no efficiency information is available."""
- try:
- eff = device_constraints[d]["efficiency"].iloc[j]
- except KeyError:
- return 1
- if np.isnan(eff):
- return 1
- return eff
- def device_derivative_down_efficiency(m, d, j):
- """Assume perfect efficiency if no efficiency information is available."""
- try:
- eff = device_constraints[d]["derivative down efficiency"].iloc[j]
- except KeyError:
- return 1
- if np.isnan(eff):
- return 1
- return eff
- def device_derivative_up_efficiency(m, d, j):
- """Assume perfect efficiency if no efficiency information is available."""
- try:
- eff = device_constraints[d]["derivative up efficiency"].iloc[j]
- except KeyError:
- return 1
- if np.isnan(eff):
- return 1
- return eff
- def device_stock_delta(m, d, j):
- return device_constraints[d]["stock delta"].iloc[j]
- model.up_price = Param(model.c, initialize=price_up_select)
- model.down_price = Param(model.c, initialize=price_down_select)
- model.commitment_quantity = Param(
- model.cj, domain=Reals, initialize=commitment_quantity_select
- )
- model.device_max = Param(model.d, model.j, initialize=device_max_select)
- model.device_min = Param(model.d, model.j, initialize=device_min_select)
- model.device_derivative_max = Param(
- model.d, model.j, initialize=device_derivative_max_select
- )
- model.device_derivative_min = Param(
- model.d, model.j, initialize=device_derivative_min_select
- )
- model.ems_derivative_max = Param(model.j, initialize=ems_derivative_max_select)
- model.ems_derivative_min = Param(model.j, initialize=ems_derivative_min_select)
- model.device_efficiency = Param(model.d, model.j, initialize=device_efficiency)
- model.device_derivative_down_efficiency = Param(
- model.d, model.j, initialize=device_derivative_down_efficiency
- )
- model.device_derivative_up_efficiency = Param(
- model.d, model.j, initialize=device_derivative_up_efficiency
- )
- model.stock_delta = Param(model.d, model.j, initialize=device_stock_delta)
- # Add variables
- model.ems_power = Var(model.d, model.j, domain=Reals, initialize=0)
- model.device_power_down = Var(
- model.d, model.j, domain=NonPositiveReals, initialize=0
- )
- model.device_power_up = Var(model.d, model.j, domain=NonNegativeReals, initialize=0)
- model.device_power_sign = Var(model.d, model.j, domain=Binary, initialize=0)
- model.commitment_downwards_deviation = Var(
- model.c,
- domain=NonPositiveReals,
- initialize=0,
- # bounds=[-1000, None], # useful for debugging, to distinguish between infeasible and unbounded problems
- )
- model.commitment_upwards_deviation = Var(
- model.c,
- domain=NonNegativeReals,
- initialize=0,
- # bounds=[None, 1000],
- )
- def _get_stock_change(m, d, j):
- """Determine final stock change of device d until time j.
- Apply conversion efficiencies to conversion from flow to stock change and vice versa,
- and apply storage efficiencies to stock levels from one datetime to the next.
- """
- if isinstance(initial_stock, list):
- # No initial stock defined for inflexible device
- initial_stock_d = initial_stock[d] if d < len(initial_stock) else 0
- else:
- initial_stock_d = initial_stock
- stock_changes = [
- (
- m.device_power_down[d, k] / m.device_derivative_down_efficiency[d, k]
- + m.device_power_up[d, k] * m.device_derivative_up_efficiency[d, k]
- + m.stock_delta[d, k]
- )
- for k in range(0, j + 1)
- ]
- efficiencies = [m.device_efficiency[d, k] for k in range(0, j + 1)]
- final_stock_change = [
- stock - initial_stock_d
- for stock in apply_stock_changes_and_losses(
- initial_stock_d, stock_changes, efficiencies
- )
- ][-1]
- return final_stock_change
- # Add constraints as a tuple of (lower bound, value, upper bound)
- def device_bounds(m, d, j):
- """Constraints on the device's stock."""
- return (
- m.device_min[d, j],
- _get_stock_change(m, d, j),
- m.device_max[d, j],
- )
- def device_derivative_bounds(m, d, j):
- return (
- m.device_derivative_min[d, j],
- m.device_power_down[d, j] + m.device_power_up[d, j],
- m.device_derivative_max[d, j],
- )
- def device_down_derivative_bounds(m, d, j):
- """Strictly non-positive."""
- return (
- min(m.device_derivative_min[d, j], 0),
- m.device_power_down[d, j],
- 0,
- )
- def device_up_derivative_bounds(m, d, j):
- """Strictly non-negative."""
- return (
- 0,
- m.device_power_up[d, j],
- max(0, m.device_derivative_max[d, j]),
- )
- def device_up_derivative_sign(m, d, j):
- """Derivative up if sign points up, derivative not up if sign points down."""
- return m.device_power_up[d, j] <= M * m.device_power_sign[d, j]
- def device_down_derivative_sign(m, d, j):
- """Derivative down if sign points down, derivative not down if sign points up."""
- return -m.device_power_down[d, j] <= M * (1 - m.device_power_sign[d, j])
- def ems_derivative_bounds(m, j):
- return m.ems_derivative_min[j], sum(m.ems_power[:, j]), m.ems_derivative_max[j]
- def device_stock_commitment_equalities(m, c, j, d):
- """Couple device stocks to each commitment."""
- if (
- "device" not in commitments[c].columns
- or (commitments[c]["device"] != d).all()
- or m.commitment_quantity[c, j] == -infinity
- ):
- # Commitment c does not concern device d
- return Constraint.Skip
- # Determine center part of the lhs <= center part <= rhs constraint
- center_part = (
- m.commitment_quantity[c, j]
- + m.commitment_downwards_deviation[c]
- + m.commitment_upwards_deviation[c]
- )
- if commitments[c]["class"].apply(lambda cl: cl == StockCommitment).all():
- center_part -= _get_stock_change(m, d, j)
- elif commitments[c]["class"].apply(lambda cl: cl == FlowCommitment).all():
- center_part -= m.ems_power[d, j]
- else:
- raise NotImplementedError("Unknown commitment class")
- return (
- 0 if "upwards deviation price" in commitments[c].columns else None,
- center_part,
- 0 if "downwards deviation price" in commitments[c].columns else None,
- )
- def ems_flow_commitment_equalities(m, c, j):
- """Couple EMS flows (sum over devices) to each commitment.
- - Creates an inequality for one-sided commitments.
- - Creates an equality for two-sided commitments and for groups of size 1.
- """
- if (
- "device" in commitments[c].columns
- and not pd.isnull(commitments[c]["device"]).all()
- ) or m.commitment_quantity[c, j] == -infinity:
- # Commitment c does not concern EMS
- return Constraint.Skip
- if (
- "class" in commitments[c].columns
- and not (
- commitments[c]["class"].apply(lambda cl: cl == FlowCommitment)
- ).all()
- ):
- raise NotImplementedError(
- "StockCommitment on an EMS level has not been implemented. Please file a GitHub ticket explaining your use case."
- )
- return (
- (
- 0
- if len(commitments[c]) == 1
- or "upwards deviation price" in commitments[c].columns
- else None
- ),
- # 0 if "upwards deviation price" in commitments[c].columns else None, # todo: possible simplification
- m.commitment_quantity[c, j]
- + m.commitment_downwards_deviation[c]
- + m.commitment_upwards_deviation[c]
- - sum(m.ems_power[:, j]),
- (
- 0
- if len(commitments[c]) == 1
- or "downwards deviation price" in commitments[c].columns
- else None
- ),
- # 0 if "downwards deviation price" in commitments[c].columns else None, # todo: possible simplification
- )
- def device_derivative_equalities(m, d, j):
- """Couple device flows to EMS flows per device."""
- return (
- 0,
- m.device_power_up[d, j] + m.device_power_down[d, j] - m.ems_power[d, j],
- 0,
- )
- model.device_energy_bounds = Constraint(model.d, model.j, rule=device_bounds)
- model.device_power_bounds = Constraint(
- model.d, model.j, rule=device_derivative_bounds
- )
- model.device_power_down_bounds = Constraint(
- model.d, model.j, rule=device_down_derivative_bounds
- )
- model.device_power_up_bounds = Constraint(
- model.d, model.j, rule=device_up_derivative_bounds
- )
- model.device_power_up_sign = Constraint(
- model.d, model.j, rule=device_up_derivative_sign
- )
- model.device_power_down_sign = Constraint(
- model.d, model.j, rule=device_down_derivative_sign
- )
- model.ems_power_bounds = Constraint(model.j, rule=ems_derivative_bounds)
- model.ems_power_commitment_equalities = Constraint(
- model.cj, rule=ems_flow_commitment_equalities
- )
- model.device_energy_commitment_equalities = Constraint(
- model.cj, model.d, rule=device_stock_commitment_equalities
- )
- model.device_power_equalities = Constraint(
- model.d, model.j, rule=device_derivative_equalities
- )
- # Add objective
- def cost_function(m):
- costs = 0
- m.commitment_costs = {
- c: m.commitment_downwards_deviation[c] * m.down_price[c]
- + m.commitment_upwards_deviation[c] * m.up_price[c]
- for c in m.c
- }
- for c in m.c:
- costs += m.commitment_costs[c]
- return costs
- model.costs = Objective(rule=cost_function, sense=minimize)
- # Solve
- solver_name = current_app.config.get("FLEXMEASURES_LP_SOLVER")
- solver = SolverFactory(solver_name)
- # disable logs for the HiGHS solver in case that LOGGING_LEVEL is INFO
- if current_app.config["LOGGING_LEVEL"] == "INFO" and (
- "highs" in solver_name.lower()
- ):
- solver.options["output_flag"] = "false"
- # load_solutions=False to avoid a RuntimeError exception in appsi solvers when solving an infeasible problem.
- results = solver.solve(model, load_solutions=False)
- # load the results only if a feasible solution has been found
- if len(results.solution) > 0:
- model.solutions.load_from(results)
- planned_costs = value(model.costs)
- subcommitment_costs = {g: value(cost) for g, cost in model.commitment_costs.items()}
- commitment_costs = {}
- # Map subcommitment costs to commitments
- for g, v in subcommitment_costs.items():
- c = commitment_mapping[g]
- commitment_costs[c] = commitment_costs.get(c, 0) + v
- planned_power_per_device = []
- for d in model.d:
- planned_device_power = [model.ems_power[d, j].value for j in model.j]
- planned_power_per_device.append(
- initialize_series(
- data=planned_device_power,
- start=start,
- end=end,
- resolution=to_offset(resolution),
- )
- )
- model.commitment_costs = commitment_costs
- # model.pprint()
- # model.display()
- # print(results.solver.termination_condition)
- # print(planned_costs)
- return planned_power_per_device, planned_costs, results, model
|