123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- """Utility module for unit conversion
- FlexMeasures stores units as strings in short scientific notation (such as 'kWh' to denote kilowatt-hour).
- We use the pint library to convert data between compatible units (such as 'm/s' to 'km/h').
- Three-letter currency codes (such as 'KRW' to denote South Korean Won) are valid units.
- Note that converting between currencies requires setting up a sensor that registers conversion rates over time.
- The preferred compact form for combinations of units can be derived automatically (such as 'kW*EUR/MWh' to 'EUR/h').
- Time series with fixed resolution can be converted from units of flow to units of stock (such as 'kW' to 'kWh'), and vice versa.
- Percentages can be converted to units of some physical capacity if a capacity is known (such as '%' to 'kWh').
- """
- from __future__ import annotations
- from datetime import timedelta
- from moneyed import list_all_currencies, Currency
- import numpy as np
- import pandas as pd
- import pint
- import timely_beliefs as tb
- # Create custom template
- custom_template = [f"{c} = [currency_{c}]" for c in list_all_currencies()]
- # Set up UnitRegistry with abbreviated scientific format
- ur = pint.UnitRegistry(
- # non_int_type=decimal.Decimal, # todo: switch to decimal unit registry, after https://github.com/hgrecco/pint/issues/1505
- preprocessors=[
- lambda s: s.replace("%", " percent "),
- lambda s: s.replace("‰", " permille "),
- ],
- )
- ur.load_definitions(custom_template)
- ur.default_format = "~P" # short pretty
- ur.define("percent = 1 / 100 = %")
- ur.define("permille = 1 / 1000 = ‰")
- PREFERRED_UNITS = [
- "m",
- "h",
- "kg",
- "m/h",
- "W",
- "N",
- "Wh",
- "m**2",
- "m**3",
- "V",
- "A",
- "dimensionless",
- ] + [
- str(c) for c in list_all_currencies()
- ] # todo: move to config setting, with these as a default (NB prefixes do not matter here, this is about SI base units, so km/h is equivalent to m/h)
- PREFERRED_UNITS_DICT = dict(
- [(ur.parse_expression(x).dimensionality, x) for x in PREFERRED_UNITS]
- )
- def to_preferred(x: pint.Quantity) -> pint.Quantity:
- """From https://github.com/hgrecco/pint/issues/676#issuecomment-689157693"""
- dim = x.dimensionality
- if dim in PREFERRED_UNITS_DICT:
- compact_unit = x.to(PREFERRED_UNITS_DICT[dim]).to_compact()
- # todo: switch to decimal unit registry and then swap out the if statements below
- # if len(f"{compact_unit.magnitude}" + "{:~P}".format(compact_unit.units)) < len(
- # f"{x.magnitude}" + "{:~P}".format(x.units)
- # ):
- # return compact_unit
- if len("{:~P}".format(compact_unit.units)) < len("{:~P}".format(x.units)):
- return compact_unit
- return x
- def is_valid_unit(unit: str) -> bool:
- """Return True if the pint library can work with this unit identifier."""
- try:
- ur.Quantity(unit)
- except Exception: # noqa B902
- # in practice, we encountered pint.errors.UndefinedUnitError, ValueError and AttributeError,
- # but since there may be more, here we simply catch them all
- return False
- return True
- def determine_unit_conversion_multiplier(
- from_unit: str, to_unit: str, duration: timedelta | None = None
- ):
- """Determine the value multiplier for a given unit conversion.
- If needed, requires a duration to convert from units of stock change to units of flow, or vice versa.
- """
- scalar = ur.Quantity(from_unit) / ur.Quantity(to_unit)
- if scalar.dimensionality == ur.Quantity("h").dimensionality:
- # Convert a stock change to a flow
- if duration is None:
- raise ValueError(
- f"Cannot convert units from {from_unit} to {to_unit} without known duration."
- )
- return scalar.to_timedelta() / duration
- elif scalar.dimensionality == ur.Quantity("1/h").dimensionality:
- # Convert a flow to a stock change
- if duration is None:
- raise ValueError(
- f"Cannot convert units from {from_unit} to {to_unit} without known duration."
- )
- return duration / (1 / scalar).to_timedelta()
- elif scalar.dimensionality != ur.Quantity("dimensionless").dimensionality:
- raise ValueError(
- f"Unit conversion from {from_unit} to {to_unit} doesn't seem possible."
- )
- return scalar.to_reduced_units().magnitude
- def determine_flow_unit(stock_unit: str, time_unit: str = "h"):
- """For example:
- >>> determine_flow_unit("m³")
- 'm³/h'
- >>> determine_flow_unit("kWh")
- 'kW'
- """
- flow = to_preferred(ur.Quantity(stock_unit) / ur.Quantity(time_unit))
- return "{:~P}".format(flow.units)
- def determine_stock_unit(flow_unit: str, time_unit: str = "h"):
- """Determine the shortest unit of stock, given a unit of flow.
- For example:
- >>> determine_stock_unit("m³/h")
- 'm³'
- >>> determine_stock_unit("kW")
- 'kWh'
- """
- stock = to_preferred(ur.Quantity(flow_unit) * ur.Quantity(time_unit))
- return "{:~P}".format(stock.units)
- def units_are_convertible(
- from_unit: str, to_unit: str, duration_known: bool = True
- ) -> bool:
- """For example, a sensor with W units allows data to be posted with units:
- >>> units_are_convertible("kW", "W") # units just have different prefixes
- True
- >>> units_are_convertible("J/s", "W") # units can be converted using some multiplier
- True
- >>> units_are_convertible("Wh", "W") # units that represent a stock delta can, knowing the duration, be converted to a flow
- True
- >>> units_are_convertible("°C", "W")
- False
- """
- if not is_valid_unit(from_unit) or not is_valid_unit(to_unit):
- return False
- scalar = (
- ur.Quantity(from_unit).to_base_units() / ur.Quantity(to_unit).to_base_units()
- )
- if duration_known:
- return scalar.dimensionality in (
- ur.Quantity("h").dimensionality,
- ur.Quantity("dimensionless").dimensionality,
- )
- return scalar.dimensionality == ur.Quantity("dimensionless").dimensionality
- def is_power_unit(unit: str) -> bool:
- """For example:
- >>> is_power_unit("kW")
- True
- >>> is_power_unit("°C")
- False
- >>> is_power_unit("kWh")
- False
- >>> is_power_unit("EUR/MWh")
- False
- """
- if not is_valid_unit(unit):
- return False
- return ur.Quantity(unit).dimensionality == ur.Quantity("W").dimensionality
- def is_energy_unit(unit: str) -> bool:
- """For example:
- >>> is_energy_unit("kW")
- False
- >>> is_energy_unit("°C")
- False
- >>> is_energy_unit("kWh")
- True
- >>> is_energy_unit("EUR/MWh")
- False
- """
- if not is_valid_unit(unit):
- return False
- return ur.Quantity(unit).dimensionality == ur.Quantity("Wh").dimensionality
- def is_currency_unit(unit: str | pint.Quantity | pint.Unit) -> bool:
- """For Example:
- >>> is_currency_unit("EUR")
- True
- >>> is_currency_unit("KRW")
- True
- >>> is_currency_unit("potatoe")
- False
- >>> is_currency_unit("MW")
- False
- """
- if isinstance(unit, pint.Quantity):
- return is_currency_unit(unit.units)
- if isinstance(unit, pint.Unit):
- return is_currency_unit(str(unit))
- return Currency(code=unit) in list_all_currencies()
- def is_price_unit(unit: str) -> bool:
- """For example:
- >>> is_price_unit("EUR/MWh")
- True
- >>> is_price_unit("KRW/MWh")
- True
- >>> is_price_unit("KRW/MW")
- True
- >>> is_price_unit("beans/MW")
- False
- """
- if (
- unit[:3] in [str(c) for c in list_all_currencies()]
- and len(unit) > 3
- and unit[3] == "/"
- ):
- return True
- return False
- def is_energy_price_unit(unit: str) -> bool:
- """For example:
- >>> is_energy_price_unit("EUR/MWh")
- True
- >>> is_energy_price_unit("KRW/MWh")
- True
- >>> is_energy_price_unit("KRW/MW")
- False
- >>> is_energy_price_unit("beans/MW")
- False
- """
- if is_price_unit(unit) and is_energy_unit(unit[4:]):
- return True
- return False
- def is_capacity_price_unit(unit: str) -> bool:
- """For example:
- >>> is_capacity_price_unit("EUR/MW")
- True
- >>> is_capacity_price_unit("KRW/MW")
- True
- >>> is_capacity_price_unit("KRW/MWh")
- False
- >>> is_capacity_price_unit("beans/MWh")
- False
- """
- if is_price_unit(unit) and is_power_unit(unit[4:]):
- return True
- return False
- def is_speed_unit(unit: str) -> bool:
- """For example:
- >>> is_speed_unit("m/s")
- True
- >>> is_speed_unit("km/h")
- True
- >>> is_speed_unit("W")
- False
- >>> is_speed_unit("m/s²")
- False
- """
- if not is_valid_unit(unit):
- return False
- return ur.Quantity(unit).dimensionality == ur.Quantity("m/s").dimensionality
- def get_unit_dimension(unit: str) -> str:
- """For example:
- >>> get_unit_dimension("kW")
- 'power'
- >>> get_unit_dimension("kWh")
- 'energy'
- >>> get_unit_dimension("EUR/MWh")
- 'energy price'
- >>> get_unit_dimension("EUR/kW") # e.g. a capacity price or a peak price
- 'price'
- >>> get_unit_dimension("AUD")
- 'currency'
- >>> get_unit_dimension("%")
- 'percentage'
- >>> get_unit_dimension("°C")
- 'temperature'
- >>> get_unit_dimension("m")
- 'length'
- >>> get_unit_dimension("h")
- 'time'
- >>> get_unit_dimension("m/s")
- 'speed'
- """
- if is_power_unit(unit):
- return "power"
- if is_energy_unit(unit):
- return "energy"
- if is_energy_price_unit(unit):
- return "energy price"
- if is_price_unit(unit):
- return "price"
- if is_currency_unit(unit):
- return "currency"
- if is_speed_unit(unit):
- return "speed"
- if unit == "%":
- return "percentage"
- dimensions = ur.Quantity(unit).dimensionality
- if len(dimensions) == 1:
- return list(dimensions.keys())[0][1:-1]
- return "value"
- def _convert_time_units(
- data: tb.BeliefsSeries | pd.Series | list[int | float] | int | float,
- from_unit: str,
- to_unit: str,
- ):
- """Convert data with datetime or timedelta dtypes to float values.
- Use Unix epoch or the requested time unit, respectively.
- """
- if not to_unit[0].isdigit():
- # unit abbreviations passed to pd.Timedelta need a number (so, for example, h becomes 1h)
- to_unit = f"1{to_unit}"
- if "datetime" in from_unit:
- dt_data = pd.to_datetime(
- data, dayfirst=True if "dayfirst" in from_unit else False
- )
- # localize timezone naive data to the sensor's timezone, if available
- if dt_data.dt.tz is None:
- timezone = data.sensor.timezone if hasattr(data, "sensor") else "utc"
- dt_data = dt_data.dt.tz_localize(timezone)
- return (dt_data - pd.Timestamp("1970-01-01", tz="utc")) // pd.Timedelta(to_unit)
- else:
- return data / pd.Timedelta(to_unit)
- def convert_units(
- data: tb.BeliefsSeries | pd.Series | list[int | float] | int | float,
- from_unit: str,
- to_unit: str,
- event_resolution: timedelta | None = None,
- capacity: str | None = None,
- ) -> pd.Series | list[int | float] | int | float:
- """Updates data values to reflect the given unit conversion.
- Handles units in short scientific notation (e.g. m³/h, kW, and ºC), as well as three special units to convert from:
- - from_unit="datetime" (with data point such as "2023-05-02", "2023-05-02 05:14:49" or "2023-05-02 05:14:49 +02:00")
- - from_unit="dayfirst datetime" (with data point such as "02-05-2023")
- - from_unit="timedelta" (with data point such as "0 days 01:18:25")
- """
- if from_unit in ("datetime", "dayfirst datetime", "timedelta"):
- return _convert_time_units(data, from_unit, to_unit)
- if from_unit != to_unit:
- from_magnitudes = (
- data.to_numpy()
- if isinstance(data, pd.Series)
- else np.asarray(data) if isinstance(data, list) else np.array([data])
- )
- try:
- from_quantities = ur.Quantity(from_magnitudes, from_unit)
- except ValueError as e:
- # Catch units like "-W" and "100km"
- if str(e) == "Unit expression cannot have a scaling factor.":
- from_quantities = ur.Quantity(from_unit) * from_magnitudes
- else:
- raise e # reraise
- try:
- to_magnitudes = from_quantities.to(ur.Quantity(to_unit)).magnitude
- except pint.errors.DimensionalityError as e:
- # Catch multiplicative conversions that rely on a capacity, like "%" to "kWh" and vice versa
- if "from 'percent'" in str(e):
- to_magnitudes = (
- (from_quantities * ur.Quantity(capacity))
- .to(ur.Quantity(to_unit))
- .magnitude
- )
- elif "to 'percent'" in str(e):
- to_magnitudes = (
- (from_quantities / ur.Quantity(capacity))
- .to(ur.Quantity(to_unit))
- .magnitude
- )
- else:
- # Catch multiplicative conversions that use the resolution, like "kWh/15min" to "kW"
- if event_resolution is None and isinstance(data, tb.BeliefsSeries):
- event_resolution = data.event_resolution
- multiplier = determine_unit_conversion_multiplier(
- from_unit, to_unit, event_resolution
- )
- to_magnitudes = from_magnitudes * multiplier
- # Output type should match input type
- if isinstance(data, pd.Series):
- # Pandas Series
- data = pd.Series(
- to_magnitudes,
- index=data.index,
- name=data.name,
- )
- elif isinstance(data, list):
- # list
- data = list(to_magnitudes)
- else:
- # int or float
- data = to_magnitudes[0]
- return data
|