123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- from __future__ import annotations
- from math import ceil
- from datetime import timedelta
- import pytz
- import pandas as pd
- from flexmeasures.data.models.planning import Scheduler
- from flexmeasures.data.queries.utils import simplify_index
- from flexmeasures.data.models.time_series import Sensor
- from flexmeasures.data.schemas.scheduling.process import (
- ProcessSchedulerFlexModelSchema,
- ProcessType,
- OptimizationDirection,
- )
- from flexmeasures.data.schemas.scheduling import FlexContextSchema
- class ProcessScheduler(Scheduler):
- __version__ = "1"
- __author__ = "Seita"
- def compute(self) -> pd.Series | None:
- """Schedule a process, defined as a `power` and a `duration`, within the specified time window.
- To schedule a battery, please, refer to the StorageScheduler.
- For example, this scheduler can plan the start of a process of type `SHIFTABLE` that lasts 5h and requires a power of 10kW.
- In that case, the scheduler will find the best (as to minimize/maximize the cost) hour to start the process.
- This scheduler supports three types of `process_types`:
- - INFLEXIBLE: this process needs to be scheduled as soon as possible.
- - BREAKABLE: this process can be broken up into smaller segments with some idle time in between.
- - SHIFTABLE: this process can start at any time within the specified time window.
- The resulting schedule provides the power flow at each time period.
- Parameters
- ==========
- consumption_price_sensor: it defines the utility (economic, environmental, ) in each
- time period. It has units of quantity/energy, for example, EUR/kWh.
- power: nominal power of the process.
- duration: time that the process last.
- optimization_direction: objective of the scheduler, to maximize or minimize.
- time_restrictions: time periods in which the process cannot be schedule to.
- process_type: INFLEXIBLE, BREAKABLE or SHIFTABLE.
- :returns: The computed schedule.
- """
- if not self.config_deserialized:
- self.deserialize_config()
- start = self.start.astimezone(pytz.utc)
- end = self.end.astimezone(pytz.utc)
- resolution = self.resolution
- belief_time = self.belief_time
- sensor = self.sensor
- consumption_price_sensor: Sensor = self.flex_context.get(
- "consumption_price_sensor", self.flex_context.get("consumption_price")
- )
- duration: timedelta = self.flex_model.get("duration")
- power = self.flex_model.get("power")
- optimization_direction = self.flex_model.get("optimization_direction")
- process_type: ProcessType = self.flex_model.get("process_type")
- time_restrictions = self.flex_model.get("time_restrictions")
- # get cost data
- cost = consumption_price_sensor.search_beliefs(
- event_starts_after=start,
- event_ends_before=end,
- resolution=resolution,
- one_deterministic_belief_per_event=True,
- beliefs_before=belief_time,
- )
- cost = simplify_index(cost)
- # create an empty schedule
- schedule = pd.Series(
- index=pd.date_range(
- start,
- end,
- freq=sensor.event_resolution,
- inclusive="left",
- name="event_start",
- ),
- data=0,
- name="event_value",
- )
- # we can fill duration/resolution rows or, if the duration is larger than the schedule
- # window, fill the entire window.
- rows_to_fill = min(ceil(duration / self.resolution), len(schedule))
- # duration of the process exceeds the scheduling window
- if rows_to_fill == len(schedule):
- if time_restrictions.sum() > 0:
- raise ValueError(
- "Cannot handle time restrictions if the duration of the process exceeds that of the schedule window."
- )
- schedule[:] = power
- if self.return_multiple:
- return [
- {
- "name": "process_schedule",
- "sensor": sensor,
- "data": schedule,
- }
- ]
- else:
- return schedule
- if process_type in [ProcessType.INFLEXIBLE, ProcessType.SHIFTABLE]:
- start_time_restrictions = (
- self.block_invalid_starting_times_for_whole_process_scheduling(
- process_type, time_restrictions, duration, rows_to_fill
- )
- )
- else: # ProcessType.BREAKABLE
- if (~time_restrictions).sum() < rows_to_fill:
- raise ValueError(
- "Cannot allocate a block of time {duration} given the time restrictions provided."
- )
- # create schedule
- if process_type == ProcessType.INFLEXIBLE:
- self.compute_inflexible(
- schedule, start_time_restrictions, rows_to_fill, power
- )
- elif process_type == ProcessType.BREAKABLE:
- self.compute_breakable(
- schedule,
- optimization_direction,
- time_restrictions,
- cost,
- rows_to_fill,
- power,
- )
- elif process_type == ProcessType.SHIFTABLE:
- self.compute_shiftable(
- schedule,
- optimization_direction,
- start_time_restrictions,
- cost,
- rows_to_fill,
- power,
- )
- else:
- raise ValueError(f"Unknown process type '{process_type}'")
- if self.return_multiple:
- return [
- {
- "name": "process_schedule",
- "sensor": sensor,
- "data": schedule.tz_convert(self.start.tzinfo),
- }
- ]
- else:
- return schedule.tz_convert(self.start.tzinfo)
- def block_invalid_starting_times_for_whole_process_scheduling(
- self,
- process_type: ProcessType,
- time_restrictions: pd.Series,
- duration: timedelta,
- rows_to_fill: int,
- ) -> pd.Series:
- """Blocks time periods where the process cannot be schedule into, making
- sure no other time restrictions runs in the middle of the activation of the process
- More technically, this function applying an erosion of the time_restrictions array with a block of length duration.
- Then, the condition if time_restrictions.sum() == len(time_restrictions):, makes sure that at least we have a spot to place the process.
- For example:
- time_restriction = [1 0 0 1 1 1 0 0 1 0]
- # applying a dilation with duration = 2
- time_restriction = [1 0 1 1 1 1 0 1 1 1]
- We can only fit a block of duration = 2 in the positions 1 and 6. sum(start_time_restrictions) == 8,
- while the len(time_restriction) == 10, which means we have 10-8=2 positions.
- :param process_type: INFLEXIBLE, SHIFTABLE or BREAKABLE
- :param time_restrictions: boolean time series indicating time periods in which the process cannot be scheduled.
- :param duration: (datetime) duration of the length
- :param rows_to_fill: (int) time periods that the process lasts
- :return: filtered time restrictions
- """
- # get start time instants that are not feasible, i.e. some time during the ON period goes through
- # a time restriction interval
- start_time_restrictions = (
- time_restrictions.rolling(duration).max().shift(-rows_to_fill + 1)
- )
- start_time_restrictions = (
- start_time_restrictions == 1
- ) | start_time_restrictions.isna()
- if (~start_time_restrictions).sum() == 0:
- raise ValueError(
- "Cannot allocate a block of time {duration} given the time restrictions provided."
- )
- return start_time_restrictions
- def compute_inflexible(
- self,
- schedule: pd.Series,
- time_restrictions: pd.Series,
- rows_to_fill: int,
- power: float,
- ) -> None:
- """Schedule process as early as possible."""
- start = time_restrictions[~time_restrictions].index[0]
- schedule.loc[start : start + self.resolution * (rows_to_fill - 1)] = power
- def compute_breakable(
- self,
- schedule: pd.Series,
- optimization_direction: OptimizationDirection,
- time_restrictions: pd.Series,
- cost: pd.DataFrame,
- rows_to_fill: int,
- power: float,
- ) -> None:
- """Break up schedule and divide it over the time slots with the largest utility (max/min cost depending on optimization_direction)."""
- cost = cost[~time_restrictions].reset_index()
- if optimization_direction == OptimizationDirection.MIN:
- cost_ranking = cost.sort_values(
- by=["event_value", "event_start"], ascending=[True, True]
- )
- else:
- cost_ranking = cost.sort_values(
- by=["event_value", "event_start"], ascending=[False, True]
- )
- schedule.loc[cost_ranking.head(rows_to_fill).event_start] = power
- def compute_shiftable(
- self,
- schedule: pd.Series,
- optimization_direction: OptimizationDirection,
- time_restrictions: pd.Series,
- cost: pd.DataFrame,
- rows_to_fill: int,
- power: float,
- ) -> None:
- """Schedules a block of consumption/production of `rows_to_fill` periods to maximize a utility."""
- block_cost = simplify_index(
- cost.rolling(rows_to_fill).sum().shift(-rows_to_fill + 1)
- )
- if optimization_direction == OptimizationDirection.MIN:
- start = block_cost[~time_restrictions].idxmin()
- else:
- start = block_cost[~time_restrictions].idxmax()
- start = start.iloc[0]
- schedule.loc[start : start + self.resolution * (rows_to_fill - 1)] = power
- def deserialize_flex_config(self):
- """Deserialize flex_model using the schema ProcessSchedulerFlexModelSchema and
- flex_context using FlexContextSchema
- """
- if self.flex_model is None:
- self.flex_model = {}
- self.flex_model = ProcessSchedulerFlexModelSchema(
- start=self.start, end=self.end, sensor=self.sensor
- ).load(self.flex_model)
- self.flex_context = FlexContextSchema().load(self.flex_context)
|