calculations.py 7.1 KB

  1. """ Various calculations """
  2. from __future__ import annotations
  3. from datetime import timedelta
  4. import math
  5. import numpy as np
  6. import pandas as pd
  7. def mean_absolute_error(y_true: np.ndarray, y_forecast: np.ndarray):
  8. y_true, y_forecast = drop_nan_rows(y_true, y_forecast)
  9. if y_true.size == 0 or y_forecast.size == 0:
  10. return np.nan
  11. else:
  12. return np.mean(np.abs((y_true - y_forecast)))
  13. def mean_absolute_percentage_error(y_true: np.ndarray, y_forecast: np.ndarray):
  14. y_true, y_forecast = drop_nan_rows(y_true, y_forecast)
  15. if y_true.size == 0 or y_forecast.size == 0 or 0 in y_true:
  16. return np.nan
  17. else:
  18. return np.mean(np.abs((y_true - y_forecast) / y_true))
  19. def weighted_absolute_percentage_error(y_true: np.ndarray, y_forecast: np.ndarray):
  20. y_true, y_forecast = drop_nan_rows(y_true, y_forecast)
  21. if y_true.size == 0 or y_forecast.size == 0 or sum(y_true) == 0:
  22. return np.nan
  23. else:
  24. return np.sum(np.abs((y_true - y_forecast))) / np.abs(np.sum(y_true))
  25. def drop_nan_rows(a, b):
  26. d = np.array(list(zip(a, b)))
  27. d = d[~np.any(np.isnan(d), axis=1)]
  28. return d[:, 0], d[:, 1]
  29. def apply_stock_changes_and_losses(
  30. initial: float,
  31. changes: list[float],
  32. storage_efficiency: float | list[float],
  33. how: str = "linear",
  34. decimal_precision: int | None = None,
  35. ) -> list[float]:
  36. r"""Assign stock changes and determine losses from storage efficiency.
  37. The initial stock is exponentially decayed, as with each consecutive (constant-resolution) time step,
  38. some constant percentage of the previous stock remains. For example:
  39. .. math::
  40. 100 \rightarrow 90 \rightarrow 81 \rightarrow 72.9 \rightarrow ...
  41. For computing the decay of the changes, we make an assumption on how a delta :math:`d` is distributed within a given time step.
  42. In case it happens at a constant rate, this leads to a linear stock change from one time step to the next.
  43. An :math:`e` is introduced when we apply exponential decay to that.
  44. To see that, imagine we cut one time step in :math:`n` pieces (each with a stock change :math:`\frac{d}{n}` ),
  45. apply the efficiency to each piece :math:`k` (for the corresponding fraction of the time step :math:`k/n`),
  46. and then take the limit :math:`n \rightarrow \infty`:
  47. .. math::
  48. \lim_{n \rightarrow \infty} \sum_{k=0}^{n}{\frac{d}{n} \eta^{k/n}}
  49. `which is <https://www.wolframalpha.com/input?i=Limit%5BSum%5B%5Ceta%5E%28k%2Fn%29%2Fn%2C+%7Bk%2C+0%2C+n%7D%5D%2C+n+-%3E+Infinity%5D&assumption=%22LimitHead%22+-%3E+%7B%22Discrete%22%7D>`_:
  50. .. math::
  51. d \cdot \frac{\eta - 1}{e^{\eta}}
  52. :param initial: initial stock
  53. :param changes: stock change for each step
  54. :param storage_efficiency: ratio of stock left after a step (constant ratio or one per step)
  55. :param how: left, right or linear; how stock changes should be applied, which affects how losses are applied
  56. :param decimal_precision: Optional decimal precision to round off results (useful for tests failing over machine precision)
  57. """
  58. stocks = [initial]
  59. if not isinstance(storage_efficiency, list):
  60. storage_efficiency = [storage_efficiency] * len(changes)
  61. for d, e in zip(changes, storage_efficiency):
  62. s = stocks[-1]
  63. if e == 1:
  64. next_stock = s + d
  65. elif how == "left":
  66. # First apply the stock change, then apply the losses (i.e. the stock changes on the left side of the time interval in which the losses apply)
  67. next_stock = (s + d) * e
  68. elif how == "right":
  69. # First apply the losses, then apply the stock change (i.e. the stock changes on the right side of the time interval in which the losses apply)
  70. next_stock = s * e + d
  71. elif how == "linear":
  72. # Assume the change happens at a constant rate, leading to a linear stock change, and exponential decay, within the current interval
  73. next_stock = s * e + d * (e - 1) / math.log(e)
  74. else:
  75. raise NotImplementedError(f"Missing implementation for how='{how}'.")
  76. stocks.append(next_stock)
  77. if decimal_precision is not None:
  78. stocks = [round(s, decimal_precision) for s in stocks]
  79. return stocks
  80. def integrate_time_series(
  81. series: pd.Series,
  82. initial_stock: float,
  83. stock_delta: float | pd.Series = 0,
  84. up_efficiency: float | pd.Series = 1,
  85. down_efficiency: float | pd.Series = 1,
  86. storage_efficiency: float | pd.Series = 1,
  87. decimal_precision: int | None = None,
  88. ) -> pd.Series:
  89. """Integrate time series of length n and inclusive="left" (representing a flow)
  90. to a time series of length n+1 and inclusive="both" (representing a stock),
  91. given an initial stock (i.e. the constant of integration).
  92. The unit of time is hours: i.e. the stock unit is flow unit times hours (e.g. a flow in kW becomes a stock in kWh).
  93. Optionally, set a decimal precision to round off the results (useful for tests failing over machine precision).
  94. >>> s = pd.Series([1, 2, 3, 4], index=pd.date_range("2001-01-01T05:00", "2001-01-01T06:00", freq=timedelta(minutes=15), inclusive="left"))
  95. >>> integrate_time_series(s, 10)
  96. 2001-01-01 05:00:00 10.00
  97. 2001-01-01 05:15:00 10.25
  98. 2001-01-01 05:30:00 10.75
  99. 2001-01-01 05:45:00 11.50
  100. 2001-01-01 06:00:00 12.50
  101. dtype: float64
  102. >>> s = pd.Series([1, 2, 3, 4], index=pd.date_range("2001-01-01T05:00", "2001-01-01T07:00", freq=timedelta(minutes=30), inclusive="left"))
  103. >>> integrate_time_series(s, 10)
  104. 2001-01-01 05:00:00 10.0
  105. 2001-01-01 05:30:00 10.5
  106. 2001-01-01 06:00:00 11.5
  107. 2001-01-01 06:30:00 13.0
  108. 2001-01-01 07:00:00 15.0
  109. dtype: float64
  110. """
  111. resolution = pd.to_timedelta(series.index.freq)
  112. storage_efficiency = (
  113. storage_efficiency
  114. if isinstance(storage_efficiency, pd.Series)
  115. else pd.Series(storage_efficiency, index=series.index)
  116. )
  117. # Convert from flow to stock change, applying conversion efficiencies
  118. stock_change = pd.Series(data=np.NaN, index=series.index)
  119. stock_change.loc[series > 0] = (
  120. series[series > 0]
  121. * (
  122. up_efficiency[series > 0]
  123. if isinstance(up_efficiency, pd.Series)
  124. else up_efficiency
  125. )
  126. * (resolution / timedelta(hours=1))
  127. )
  128. stock_change.loc[series <= 0] = (
  129. series[series <= 0]
  130. / (
  131. down_efficiency[series <= 0]
  132. if isinstance(down_efficiency, pd.Series)
  133. else down_efficiency
  134. )
  135. * (resolution / timedelta(hours=1))
  136. )
  137. stock_change += stock_delta
  138. stocks = apply_stock_changes_and_losses(
  139. initial_stock, stock_change.tolist(), storage_efficiency.tolist()
  140. )
  141. stocks = pd.concat(
  142. [
  143. pd.Series(initial_stock, index=pd.date_range(series.index[0], periods=1)),
  144. pd.Series(stocks[1:], index=series.index).shift(1, freq=resolution),
  145. ]
  146. )
  147. if decimal_precision is not None:
  148. stocks = stocks.round(decimal_precision)
  149. stocks = stocks.mask(stocks == -0.0, 0.0)
  150. return stocks