unit_utils.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. """Utility module for unit conversion
  2. FlexMeasures stores units as strings in short scientific notation (such as 'kWh' to denote kilowatt-hour).
  3. We use the pint library to convert data between compatible units (such as 'm/s' to 'km/h').
  4. Three-letter currency codes (such as 'KRW' to denote South Korean Won) are valid units.
  5. Note that converting between currencies requires setting up a sensor that registers conversion rates over time.
  6. The preferred compact form for combinations of units can be derived automatically (such as 'kW*EUR/MWh' to 'EUR/h').
  7. Time series with fixed resolution can be converted from units of flow to units of stock (such as 'kW' to 'kWh'), and vice versa.
  8. Percentages can be converted to units of some physical capacity if a capacity is known (such as '%' to 'kWh').
  9. """
  10. from __future__ import annotations
  11. from datetime import timedelta
  12. from moneyed import list_all_currencies, Currency
  13. import numpy as np
  14. import pandas as pd
  15. import pint
  16. import timely_beliefs as tb
  17. # Create custom template
  18. custom_template = [f"{c} = [currency_{c}]" for c in list_all_currencies()]
  19. # Set up UnitRegistry with abbreviated scientific format
  20. ur = pint.UnitRegistry(
  21. # non_int_type=decimal.Decimal, # todo: switch to decimal unit registry, after https://github.com/hgrecco/pint/issues/1505
  22. preprocessors=[
  23. lambda s: s.replace("%", " percent "),
  24. lambda s: s.replace("‰", " permille "),
  25. ],
  26. )
  27. ur.load_definitions(custom_template)
  28. ur.default_format = "~P" # short pretty
  29. ur.define("percent = 1 / 100 = %")
  30. ur.define("permille = 1 / 1000 = ‰")
  31. PREFERRED_UNITS = [
  32. "m",
  33. "h",
  34. "kg",
  35. "m/h",
  36. "W",
  37. "N",
  38. "Wh",
  39. "m**2",
  40. "m**3",
  41. "V",
  42. "A",
  43. "dimensionless",
  44. ] + [
  45. str(c) for c in list_all_currencies()
  46. ] # todo: move to config setting, with these as a default (NB prefixes do not matter here, this is about SI base units, so km/h is equivalent to m/h)
  47. PREFERRED_UNITS_DICT = dict(
  48. [(ur.parse_expression(x).dimensionality, x) for x in PREFERRED_UNITS]
  49. )
  50. def to_preferred(x: pint.Quantity) -> pint.Quantity:
  51. """From https://github.com/hgrecco/pint/issues/676#issuecomment-689157693"""
  52. dim = x.dimensionality
  53. if dim in PREFERRED_UNITS_DICT:
  54. compact_unit = x.to(PREFERRED_UNITS_DICT[dim]).to_compact()
  55. # todo: switch to decimal unit registry and then swap out the if statements below
  56. # if len(f"{compact_unit.magnitude}" + "{:~P}".format(compact_unit.units)) < len(
  57. # f"{x.magnitude}" + "{:~P}".format(x.units)
  58. # ):
  59. # return compact_unit
  60. if len("{:~P}".format(compact_unit.units)) < len("{:~P}".format(x.units)):
  61. return compact_unit
  62. return x
  63. def is_valid_unit(unit: str) -> bool:
  64. """Return True if the pint library can work with this unit identifier."""
  65. try:
  66. ur.Quantity(unit)
  67. except Exception: # noqa B902
  68. # in practice, we encountered pint.errors.UndefinedUnitError, ValueError and AttributeError,
  69. # but since there may be more, here we simply catch them all
  70. return False
  71. return True
  72. def determine_unit_conversion_multiplier(
  73. from_unit: str, to_unit: str, duration: timedelta | None = None
  74. ):
  75. """Determine the value multiplier for a given unit conversion.
  76. If needed, requires a duration to convert from units of stock change to units of flow, or vice versa.
  77. """
  78. scalar = ur.Quantity(from_unit) / ur.Quantity(to_unit)
  79. if scalar.dimensionality == ur.Quantity("h").dimensionality:
  80. # Convert a stock change to a flow
  81. if duration is None:
  82. raise ValueError(
  83. f"Cannot convert units from {from_unit} to {to_unit} without known duration."
  84. )
  85. return scalar.to_timedelta() / duration
  86. elif scalar.dimensionality == ur.Quantity("1/h").dimensionality:
  87. # Convert a flow to a stock change
  88. if duration is None:
  89. raise ValueError(
  90. f"Cannot convert units from {from_unit} to {to_unit} without known duration."
  91. )
  92. return duration / (1 / scalar).to_timedelta()
  93. elif scalar.dimensionality != ur.Quantity("dimensionless").dimensionality:
  94. raise ValueError(
  95. f"Unit conversion from {from_unit} to {to_unit} doesn't seem possible."
  96. )
  97. return scalar.to_reduced_units().magnitude
  98. def determine_flow_unit(stock_unit: str, time_unit: str = "h"):
  99. """For example:
  100. >>> determine_flow_unit("m³")
  101. 'm³/h'
  102. >>> determine_flow_unit("kWh")
  103. 'kW'
  104. """
  105. flow = to_preferred(ur.Quantity(stock_unit) / ur.Quantity(time_unit))
  106. return "{:~P}".format(flow.units)
  107. def determine_stock_unit(flow_unit: str, time_unit: str = "h"):
  108. """Determine the shortest unit of stock, given a unit of flow.
  109. For example:
  110. >>> determine_stock_unit("m³/h")
  111. 'm³'
  112. >>> determine_stock_unit("kW")
  113. 'kWh'
  114. """
  115. stock = to_preferred(ur.Quantity(flow_unit) * ur.Quantity(time_unit))
  116. return "{:~P}".format(stock.units)
  117. def units_are_convertible(
  118. from_unit: str, to_unit: str, duration_known: bool = True
  119. ) -> bool:
  120. """For example, a sensor with W units allows data to be posted with units:
  121. >>> units_are_convertible("kW", "W") # units just have different prefixes
  122. True
  123. >>> units_are_convertible("J/s", "W") # units can be converted using some multiplier
  124. True
  125. >>> units_are_convertible("Wh", "W") # units that represent a stock delta can, knowing the duration, be converted to a flow
  126. True
  127. >>> units_are_convertible("°C", "W")
  128. False
  129. """
  130. if not is_valid_unit(from_unit) or not is_valid_unit(to_unit):
  131. return False
  132. scalar = (
  133. ur.Quantity(from_unit).to_base_units() / ur.Quantity(to_unit).to_base_units()
  134. )
  135. if duration_known:
  136. return scalar.dimensionality in (
  137. ur.Quantity("h").dimensionality,
  138. ur.Quantity("dimensionless").dimensionality,
  139. )
  140. return scalar.dimensionality == ur.Quantity("dimensionless").dimensionality
  141. def is_power_unit(unit: str) -> bool:
  142. """For example:
  143. >>> is_power_unit("kW")
  144. True
  145. >>> is_power_unit("°C")
  146. False
  147. >>> is_power_unit("kWh")
  148. False
  149. >>> is_power_unit("EUR/MWh")
  150. False
  151. """
  152. if not is_valid_unit(unit):
  153. return False
  154. return ur.Quantity(unit).dimensionality == ur.Quantity("W").dimensionality
  155. def is_energy_unit(unit: str) -> bool:
  156. """For example:
  157. >>> is_energy_unit("kW")
  158. False
  159. >>> is_energy_unit("°C")
  160. False
  161. >>> is_energy_unit("kWh")
  162. True
  163. >>> is_energy_unit("EUR/MWh")
  164. False
  165. """
  166. if not is_valid_unit(unit):
  167. return False
  168. return ur.Quantity(unit).dimensionality == ur.Quantity("Wh").dimensionality
  169. def is_currency_unit(unit: str | pint.Quantity | pint.Unit) -> bool:
  170. """For Example:
  171. >>> is_currency_unit("EUR")
  172. True
  173. >>> is_currency_unit("KRW")
  174. True
  175. >>> is_currency_unit("potatoe")
  176. False
  177. >>> is_currency_unit("MW")
  178. False
  179. """
  180. if isinstance(unit, pint.Quantity):
  181. return is_currency_unit(unit.units)
  182. if isinstance(unit, pint.Unit):
  183. return is_currency_unit(str(unit))
  184. return Currency(code=unit) in list_all_currencies()
  185. def is_price_unit(unit: str) -> bool:
  186. """For example:
  187. >>> is_price_unit("EUR/MWh")
  188. True
  189. >>> is_price_unit("KRW/MWh")
  190. True
  191. >>> is_price_unit("KRW/MW")
  192. True
  193. >>> is_price_unit("beans/MW")
  194. False
  195. """
  196. if (
  197. unit[:3] in [str(c) for c in list_all_currencies()]
  198. and len(unit) > 3
  199. and unit[3] == "/"
  200. ):
  201. return True
  202. return False
  203. def is_energy_price_unit(unit: str) -> bool:
  204. """For example:
  205. >>> is_energy_price_unit("EUR/MWh")
  206. True
  207. >>> is_energy_price_unit("KRW/MWh")
  208. True
  209. >>> is_energy_price_unit("KRW/MW")
  210. False
  211. >>> is_energy_price_unit("beans/MW")
  212. False
  213. """
  214. if is_price_unit(unit) and is_energy_unit(unit[4:]):
  215. return True
  216. return False
  217. def is_capacity_price_unit(unit: str) -> bool:
  218. """For example:
  219. >>> is_capacity_price_unit("EUR/MW")
  220. True
  221. >>> is_capacity_price_unit("KRW/MW")
  222. True
  223. >>> is_capacity_price_unit("KRW/MWh")
  224. False
  225. >>> is_capacity_price_unit("beans/MWh")
  226. False
  227. """
  228. if is_price_unit(unit) and is_power_unit(unit[4:]):
  229. return True
  230. return False
  231. def is_speed_unit(unit: str) -> bool:
  232. """For example:
  233. >>> is_speed_unit("m/s")
  234. True
  235. >>> is_speed_unit("km/h")
  236. True
  237. >>> is_speed_unit("W")
  238. False
  239. >>> is_speed_unit("m/s²")
  240. False
  241. """
  242. if not is_valid_unit(unit):
  243. return False
  244. return ur.Quantity(unit).dimensionality == ur.Quantity("m/s").dimensionality
  245. def get_unit_dimension(unit: str) -> str:
  246. """For example:
  247. >>> get_unit_dimension("kW")
  248. 'power'
  249. >>> get_unit_dimension("kWh")
  250. 'energy'
  251. >>> get_unit_dimension("EUR/MWh")
  252. 'energy price'
  253. >>> get_unit_dimension("EUR/kW") # e.g. a capacity price or a peak price
  254. 'price'
  255. >>> get_unit_dimension("AUD")
  256. 'currency'
  257. >>> get_unit_dimension("%")
  258. 'percentage'
  259. >>> get_unit_dimension("°C")
  260. 'temperature'
  261. >>> get_unit_dimension("m")
  262. 'length'
  263. >>> get_unit_dimension("h")
  264. 'time'
  265. >>> get_unit_dimension("m/s")
  266. 'speed'
  267. """
  268. if is_power_unit(unit):
  269. return "power"
  270. if is_energy_unit(unit):
  271. return "energy"
  272. if is_energy_price_unit(unit):
  273. return "energy price"
  274. if is_price_unit(unit):
  275. return "price"
  276. if is_currency_unit(unit):
  277. return "currency"
  278. if is_speed_unit(unit):
  279. return "speed"
  280. if unit == "%":
  281. return "percentage"
  282. dimensions = ur.Quantity(unit).dimensionality
  283. if len(dimensions) == 1:
  284. return list(dimensions.keys())[0][1:-1]
  285. return "value"
  286. def _convert_time_units(
  287. data: tb.BeliefsSeries | pd.Series | list[int | float] | int | float,
  288. from_unit: str,
  289. to_unit: str,
  290. ):
  291. """Convert data with datetime or timedelta dtypes to float values.
  292. Use Unix epoch or the requested time unit, respectively.
  293. """
  294. if not to_unit[0].isdigit():
  295. # unit abbreviations passed to pd.Timedelta need a number (so, for example, h becomes 1h)
  296. to_unit = f"1{to_unit}"
  297. if "datetime" in from_unit:
  298. dt_data = pd.to_datetime(
  299. data, dayfirst=True if "dayfirst" in from_unit else False
  300. )
  301. # localize timezone naive data to the sensor's timezone, if available
  302. if dt_data.dt.tz is None:
  303. timezone = data.sensor.timezone if hasattr(data, "sensor") else "utc"
  304. dt_data = dt_data.dt.tz_localize(timezone)
  305. return (dt_data - pd.Timestamp("1970-01-01", tz="utc")) // pd.Timedelta(to_unit)
  306. else:
  307. return data / pd.Timedelta(to_unit)
  308. def convert_units(
  309. data: tb.BeliefsSeries | pd.Series | list[int | float] | int | float,
  310. from_unit: str,
  311. to_unit: str,
  312. event_resolution: timedelta | None = None,
  313. capacity: str | None = None,
  314. ) -> pd.Series | list[int | float] | int | float:
  315. """Updates data values to reflect the given unit conversion.
  316. Handles units in short scientific notation (e.g. m³/h, kW, and ºC), as well as three special units to convert from:
  317. - from_unit="datetime" (with data point such as "2023-05-02", "2023-05-02 05:14:49" or "2023-05-02 05:14:49 +02:00")
  318. - from_unit="dayfirst datetime" (with data point such as "02-05-2023")
  319. - from_unit="timedelta" (with data point such as "0 days 01:18:25")
  320. """
  321. if from_unit in ("datetime", "dayfirst datetime", "timedelta"):
  322. return _convert_time_units(data, from_unit, to_unit)
  323. if from_unit != to_unit:
  324. from_magnitudes = (
  325. data.to_numpy()
  326. if isinstance(data, pd.Series)
  327. else np.asarray(data) if isinstance(data, list) else np.array([data])
  328. )
  329. try:
  330. from_quantities = ur.Quantity(from_magnitudes, from_unit)
  331. except ValueError as e:
  332. # Catch units like "-W" and "100km"
  333. if str(e) == "Unit expression cannot have a scaling factor.":
  334. from_quantities = ur.Quantity(from_unit) * from_magnitudes
  335. else:
  336. raise e # reraise
  337. try:
  338. to_magnitudes = from_quantities.to(ur.Quantity(to_unit)).magnitude
  339. except pint.errors.DimensionalityError as e:
  340. # Catch multiplicative conversions that rely on a capacity, like "%" to "kWh" and vice versa
  341. if "from 'percent'" in str(e):
  342. to_magnitudes = (
  343. (from_quantities * ur.Quantity(capacity))
  344. .to(ur.Quantity(to_unit))
  345. .magnitude
  346. )
  347. elif "to 'percent'" in str(e):
  348. to_magnitudes = (
  349. (from_quantities / ur.Quantity(capacity))
  350. .to(ur.Quantity(to_unit))
  351. .magnitude
  352. )
  353. else:
  354. # Catch multiplicative conversions that use the resolution, like "kWh/15min" to "kW"
  355. if event_resolution is None and isinstance(data, tb.BeliefsSeries):
  356. event_resolution = data.event_resolution
  357. multiplier = determine_unit_conversion_multiplier(
  358. from_unit, to_unit, event_resolution
  359. )
  360. to_magnitudes = from_magnitudes * multiplier
  361. # Output type should match input type
  362. if isinstance(data, pd.Series):
  363. # Pandas Series
  364. data = pd.Series(
  365. to_magnitudes,
  366. index=data.index,
  367. name=data.name,
  368. )
  369. elif isinstance(data, list):
  370. # list
  371. data = list(to_magnitudes)
  372. else:
  373. # int or float
  374. data = to_magnitudes[0]
  375. return data