process.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. from __future__ import annotations
  2. from math import ceil
  3. from datetime import timedelta
  4. import pytz
  5. import pandas as pd
  6. from flexmeasures.data.models.planning import Scheduler
  7. from flexmeasures.data.queries.utils import simplify_index
  8. from flexmeasures.data.models.time_series import Sensor
  9. from flexmeasures.data.schemas.scheduling.process import (
  10. ProcessSchedulerFlexModelSchema,
  11. ProcessType,
  12. OptimizationDirection,
  13. )
  14. from flexmeasures.data.schemas.scheduling import FlexContextSchema
class ProcessScheduler(Scheduler):
    """Scheduler that plans a process, defined by a `power` and a `duration`, within a time window."""

    # Version and author metadata of this scheduler implementation.
    __version__ = "1"
    __author__ = "Seita"
  18. def compute(self) -> pd.Series | None:
  19. """Schedule a process, defined as a `power` and a `duration`, within the specified time window.
  20. To schedule a battery, please, refer to the StorageScheduler.
  21. For example, this scheduler can plan the start of a process of type `SHIFTABLE` that lasts 5h and requires a power of 10kW.
  22. In that case, the scheduler will find the best (as to minimize/maximize the cost) hour to start the process.
  23. This scheduler supports three types of `process_types`:
  24. - INFLEXIBLE: this process needs to be scheduled as soon as possible.
  25. - BREAKABLE: this process can be broken up into smaller segments with some idle time in between.
  26. - SHIFTABLE: this process can start at any time within the specified time window.
  27. The resulting schedule provides the power flow at each time period.
  28. Parameters
  29. ==========
  30. consumption_price_sensor: it defines the utility (economic, environmental, ) in each
  31. time period. It has units of quantity/energy, for example, EUR/kWh.
  32. power: nominal power of the process.
  33. duration: time that the process last.
  34. optimization_direction: objective of the scheduler, to maximize or minimize.
  35. time_restrictions: time periods in which the process cannot be schedule to.
  36. process_type: INFLEXIBLE, BREAKABLE or SHIFTABLE.
  37. :returns: The computed schedule.
  38. """
  39. if not self.config_deserialized:
  40. self.deserialize_config()
  41. start = self.start.astimezone(pytz.utc)
  42. end = self.end.astimezone(pytz.utc)
  43. resolution = self.resolution
  44. belief_time = self.belief_time
  45. sensor = self.sensor
  46. consumption_price_sensor: Sensor = self.flex_context.get(
  47. "consumption_price_sensor", self.flex_context.get("consumption_price")
  48. )
  49. duration: timedelta = self.flex_model.get("duration")
  50. power = self.flex_model.get("power")
  51. optimization_direction = self.flex_model.get("optimization_direction")
  52. process_type: ProcessType = self.flex_model.get("process_type")
  53. time_restrictions = self.flex_model.get("time_restrictions")
  54. # get cost data
  55. cost = consumption_price_sensor.search_beliefs(
  56. event_starts_after=start,
  57. event_ends_before=end,
  58. resolution=resolution,
  59. one_deterministic_belief_per_event=True,
  60. beliefs_before=belief_time,
  61. )
  62. cost = simplify_index(cost)
  63. # create an empty schedule
  64. schedule = pd.Series(
  65. index=pd.date_range(
  66. start,
  67. end,
  68. freq=sensor.event_resolution,
  69. inclusive="left",
  70. name="event_start",
  71. ),
  72. data=0,
  73. name="event_value",
  74. )
  75. # we can fill duration/resolution rows or, if the duration is larger than the schedule
  76. # window, fill the entire window.
  77. rows_to_fill = min(ceil(duration / self.resolution), len(schedule))
  78. # duration of the process exceeds the scheduling window
  79. if rows_to_fill == len(schedule):
  80. if time_restrictions.sum() > 0:
  81. raise ValueError(
  82. "Cannot handle time restrictions if the duration of the process exceeds that of the schedule window."
  83. )
  84. schedule[:] = power
  85. if self.return_multiple:
  86. return [
  87. {
  88. "name": "process_schedule",
  89. "sensor": sensor,
  90. "data": schedule,
  91. }
  92. ]
  93. else:
  94. return schedule
  95. if process_type in [ProcessType.INFLEXIBLE, ProcessType.SHIFTABLE]:
  96. start_time_restrictions = (
  97. self.block_invalid_starting_times_for_whole_process_scheduling(
  98. process_type, time_restrictions, duration, rows_to_fill
  99. )
  100. )
  101. else: # ProcessType.BREAKABLE
  102. if (~time_restrictions).sum() < rows_to_fill:
  103. raise ValueError(
  104. "Cannot allocate a block of time {duration} given the time restrictions provided."
  105. )
  106. # create schedule
  107. if process_type == ProcessType.INFLEXIBLE:
  108. self.compute_inflexible(
  109. schedule, start_time_restrictions, rows_to_fill, power
  110. )
  111. elif process_type == ProcessType.BREAKABLE:
  112. self.compute_breakable(
  113. schedule,
  114. optimization_direction,
  115. time_restrictions,
  116. cost,
  117. rows_to_fill,
  118. power,
  119. )
  120. elif process_type == ProcessType.SHIFTABLE:
  121. self.compute_shiftable(
  122. schedule,
  123. optimization_direction,
  124. start_time_restrictions,
  125. cost,
  126. rows_to_fill,
  127. power,
  128. )
  129. else:
  130. raise ValueError(f"Unknown process type '{process_type}'")
  131. if self.return_multiple:
  132. return [
  133. {
  134. "name": "process_schedule",
  135. "sensor": sensor,
  136. "data": schedule.tz_convert(self.start.tzinfo),
  137. }
  138. ]
  139. else:
  140. return schedule.tz_convert(self.start.tzinfo)
  141. def block_invalid_starting_times_for_whole_process_scheduling(
  142. self,
  143. process_type: ProcessType,
  144. time_restrictions: pd.Series,
  145. duration: timedelta,
  146. rows_to_fill: int,
  147. ) -> pd.Series:
  148. """Blocks time periods where the process cannot be schedule into, making
  149. sure no other time restrictions runs in the middle of the activation of the process
  150. More technically, this function applying an erosion of the time_restrictions array with a block of length duration.
  151. Then, the condition if time_restrictions.sum() == len(time_restrictions):, makes sure that at least we have a spot to place the process.
  152. For example:
  153. time_restriction = [1 0 0 1 1 1 0 0 1 0]
  154. # applying a dilation with duration = 2
  155. time_restriction = [1 0 1 1 1 1 0 1 1 1]
  156. We can only fit a block of duration = 2 in the positions 1 and 6. sum(start_time_restrictions) == 8,
  157. while the len(time_restriction) == 10, which means we have 10-8=2 positions.
  158. :param process_type: INFLEXIBLE, SHIFTABLE or BREAKABLE
  159. :param time_restrictions: boolean time series indicating time periods in which the process cannot be scheduled.
  160. :param duration: (datetime) duration of the length
  161. :param rows_to_fill: (int) time periods that the process lasts
  162. :return: filtered time restrictions
  163. """
  164. # get start time instants that are not feasible, i.e. some time during the ON period goes through
  165. # a time restriction interval
  166. start_time_restrictions = (
  167. time_restrictions.rolling(duration).max().shift(-rows_to_fill + 1)
  168. )
  169. start_time_restrictions = (
  170. start_time_restrictions == 1
  171. ) | start_time_restrictions.isna()
  172. if (~start_time_restrictions).sum() == 0:
  173. raise ValueError(
  174. "Cannot allocate a block of time {duration} given the time restrictions provided."
  175. )
  176. return start_time_restrictions
  177. def compute_inflexible(
  178. self,
  179. schedule: pd.Series,
  180. time_restrictions: pd.Series,
  181. rows_to_fill: int,
  182. power: float,
  183. ) -> None:
  184. """Schedule process as early as possible."""
  185. start = time_restrictions[~time_restrictions].index[0]
  186. schedule.loc[start : start + self.resolution * (rows_to_fill - 1)] = power
  187. def compute_breakable(
  188. self,
  189. schedule: pd.Series,
  190. optimization_direction: OptimizationDirection,
  191. time_restrictions: pd.Series,
  192. cost: pd.DataFrame,
  193. rows_to_fill: int,
  194. power: float,
  195. ) -> None:
  196. """Break up schedule and divide it over the time slots with the largest utility (max/min cost depending on optimization_direction)."""
  197. cost = cost[~time_restrictions].reset_index()
  198. if optimization_direction == OptimizationDirection.MIN:
  199. cost_ranking = cost.sort_values(
  200. by=["event_value", "event_start"], ascending=[True, True]
  201. )
  202. else:
  203. cost_ranking = cost.sort_values(
  204. by=["event_value", "event_start"], ascending=[False, True]
  205. )
  206. schedule.loc[cost_ranking.head(rows_to_fill).event_start] = power
  207. def compute_shiftable(
  208. self,
  209. schedule: pd.Series,
  210. optimization_direction: OptimizationDirection,
  211. time_restrictions: pd.Series,
  212. cost: pd.DataFrame,
  213. rows_to_fill: int,
  214. power: float,
  215. ) -> None:
  216. """Schedules a block of consumption/production of `rows_to_fill` periods to maximize a utility."""
  217. block_cost = simplify_index(
  218. cost.rolling(rows_to_fill).sum().shift(-rows_to_fill + 1)
  219. )
  220. if optimization_direction == OptimizationDirection.MIN:
  221. start = block_cost[~time_restrictions].idxmin()
  222. else:
  223. start = block_cost[~time_restrictions].idxmax()
  224. start = start.iloc[0]
  225. schedule.loc[start : start + self.resolution * (rows_to_fill - 1)] = power
  226. def deserialize_flex_config(self):
  227. """Deserialize flex_model using the schema ProcessSchedulerFlexModelSchema and
  228. flex_context using FlexContextSchema
  229. """
  230. if self.flex_model is None:
  231. self.flex_model = {}
  232. self.flex_model = ProcessSchedulerFlexModelSchema(
  233. start=self.start, end=self.end, sensor=self.sensor
  234. ).load(self.flex_model)
  235. self.flex_context = FlexContextSchema().load(self.flex_context)