time_utils.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. """
  2. Utils for dealing with time
  3. """
  4. from __future__ import annotations
  5. import re
  6. from datetime import datetime, timedelta, timezone
  7. from flask import current_app
  8. from flask_security.core import current_user
  9. from humanize import naturaldate, naturaltime
  10. import pandas as pd
  11. from pandas.tseries.frequencies import to_offset
  12. import pytz
  13. from dateutil import tz
  14. def server_now() -> datetime:
  15. """The current time (timezone aware), converted to the timezone of the FlexMeasures platform."""
  16. return datetime.now(get_timezone())
  17. def ensure_local_timezone(
  18. dt: pd.Timestamp | datetime, tz_name: str = "Europe/Amsterdam"
  19. ) -> pd.Timestamp | datetime:
  20. """If no timezone is given, assume the datetime is in the given timezone and make it explicit.
  21. Otherwise, if a timezone is given, convert to that timezone."""
  22. if isinstance(dt, datetime):
  23. if dt.tzinfo is not None:
  24. return dt.astimezone(tz.gettz(tz_name))
  25. else:
  26. return dt.replace(tzinfo=tz.gettz(tz_name))
  27. if dt.tzinfo is not None:
  28. return dt.tz_convert(tz_name)
  29. else:
  30. return dt.tz_localize(tz_name)
  31. def as_server_time(dt: datetime) -> datetime:
  32. """The datetime represented in the timezone of the FlexMeasures platform.
  33. If dt is naive, we assume it is UTC time.
  34. """
  35. return naive_utc_from(dt).replace(tzinfo=pytz.utc).astimezone(get_timezone())
  36. def localized_datetime(dt: datetime) -> datetime:
  37. """
  38. Localise a datetime to the timezone of the FlexMeasures platform.
  39. Note: this will change nothing but the tzinfo field.
  40. """
  41. return get_timezone().localize(naive_utc_from(dt))
  42. def naive_utc_from(dt: datetime) -> datetime:
  43. """
  44. Return a naive datetime, that is localised to UTC if it has a timezone.
  45. If dt is naive, we assume it is already in UTC time.
  46. """
  47. if not hasattr(dt, "tzinfo") or dt.tzinfo is None:
  48. # let's hope this is the UTC time you expect
  49. return dt
  50. else:
  51. return dt.astimezone(pytz.utc).replace(tzinfo=None)
  52. def tz_index_naively(
  53. data: pd.DataFrame | pd.Series | pd.DatetimeIndex,
  54. ) -> pd.DataFrame | pd.Series | pd.DatetimeIndex:
  55. """Turn any DatetimeIndex into a tz-naive one, then return. Useful for bokeh, for instance."""
  56. if isinstance(data, pd.DatetimeIndex):
  57. return data.tz_localize(tz=None)
  58. if hasattr(data, "index") and isinstance(data.index, pd.DatetimeIndex):
  59. # TODO: if index is already naive, don't
  60. data.index = data.index.tz_localize(tz=None)
  61. return data
  62. def localized_datetime_str(dt: datetime, dt_format: str = "%Y-%m-%d %I:%M %p") -> str:
  63. """
  64. Localise a datetime to the timezone of the FlexMeasures platform.
  65. If no datetime is passed in, use server_now() as basis.
  66. Hint: This can be set as a jinja filter, so we can display local time in the app, e.g.:
  67. app.jinja_env.filters['localized_datetime'] = localized_datetime_str
  68. """
  69. if dt is None:
  70. dt = server_now()
  71. local_tz = get_timezone()
  72. local_dt = naive_utc_from(dt).astimezone(local_tz)
  73. return local_dt.strftime(dt_format)
  74. def naturalized_datetime_str(
  75. dt: datetime | str | None, now: datetime | None = None
  76. ) -> str:
  77. """
  78. Naturalise a datetime object (into a human-friendly string).
  79. The dt parameter (as well as the now parameter if you use it)
  80. can be either naive or tz-aware. We assume UTC in the naive case.
  81. If dt parameter is a string it is expected to look like 'Sun, 28 Apr 2024 08:55:58 GMT'.
  82. String format is supported for case when we make json from internal api response
  83. e.g ui/crud/users auditlog router
  84. We use the `humanize` library to generate a human-friendly string.
  85. If dt is not longer ago than 24 hours, we use humanize.naturaltime (e.g. "3 hours ago"),
  86. otherwise humanize.naturaldate (e.g. "one week ago")
  87. Hint: This can be set as a jinja filter, so we can display local time in the app, e.g.:
  88. app.jinja_env.filters['naturalized_datetime'] = naturalized_datetime_str
  89. """
  90. if dt is None:
  91. return "never"
  92. if isinstance(dt, str):
  93. dt = datetime.strptime(dt, "%a, %d %b %Y %H:%M:%S %Z")
  94. if now is None:
  95. now = datetime.now(timezone.utc)
  96. naive_utc_now = naive_utc_from(now)
  97. # Convert or localize to utc
  98. if dt.tzinfo is None:
  99. utc_dt = pd.Timestamp(dt).tz_localize("utc")
  100. else:
  101. utc_dt = pd.Timestamp(dt).tz_convert("utc")
  102. # decide which humanize call to use for naturalization
  103. if naive_utc_from(utc_dt) >= naive_utc_now - timedelta(hours=24):
  104. # return natural time (naive utc dt with respect to naive utc now)
  105. return naturaltime(
  106. utc_dt.replace(tzinfo=None),
  107. when=naive_utc_now,
  108. )
  109. else:
  110. # return natural date in the user's timezone
  111. local_dt = utc_dt.tz_convert(get_timezone(of_user=True))
  112. return naturaldate(local_dt)
  113. def resolution_to_hour_factor(resolution: str | timedelta) -> float:
  114. """Return the factor with which a value needs to be multiplied in order to get the value per hour,
  115. e.g. 10 MW at a resolution of 15min are 2.5 MWh per time step.
  116. :param resolution: timedelta or pandas offset such as "15min" or "h"
  117. """
  118. if isinstance(resolution, timedelta):
  119. return resolution / timedelta(hours=1)
  120. return pd.Timedelta(resolution).to_pytimedelta() / timedelta(hours=1)
  121. def decide_resolution(start: datetime | None, end: datetime | None) -> str:
  122. """
  123. Decide on a practical resolution given the length of the selected time period.
  124. Useful for querying or plotting.
  125. """
  126. if start is None or end is None:
  127. return "15min" # default if we cannot tell period
  128. period_length = end - start
  129. if period_length > timedelta(weeks=16):
  130. resolution = "168h" # So upon switching from days to weeks, you get at least 16 data points
  131. elif period_length > timedelta(days=14):
  132. resolution = "24h" # So upon switching from hours to days, you get at least 14 data points
  133. elif period_length > timedelta(hours=48):
  134. resolution = "1h" # So upon switching from 15min to hours, you get at least 48 data points
  135. elif period_length > timedelta(hours=8):
  136. resolution = "15min"
  137. else:
  138. resolution = "5min" # we are (currently) not going lower than 5 minutes
  139. return resolution
  140. def get_timezone(of_user=False) -> pytz.BaseTzInfo:
  141. """Return the FlexMeasures timezone, or if desired try to return the timezone of the current user."""
  142. default_timezone = pytz.timezone(
  143. current_app.config.get("FLEXMEASURES_TIMEZONE", "")
  144. )
  145. if not of_user:
  146. return default_timezone
  147. if current_user.is_anonymous:
  148. return default_timezone
  149. if current_user.timezone not in pytz.common_timezones:
  150. return default_timezone
  151. return pytz.timezone(current_user.timezone)
  152. def round_to_closest_quarter(dt: datetime) -> datetime:
  153. new_hour = dt.hour
  154. new_minute = 15 * round((float(dt.minute) + float(dt.second) / 60) / 15)
  155. if new_minute == 60:
  156. new_hour += 1
  157. new_minute = 0
  158. return datetime(dt.year, dt.month, dt.day, new_hour, new_minute, tzinfo=dt.tzinfo)
  159. def round_to_closest_hour(dt: datetime) -> datetime:
  160. if dt.minute >= 30:
  161. return datetime(dt.year, dt.month, dt.day, dt.hour + 1, tzinfo=dt.tzinfo)
  162. else:
  163. return datetime(dt.year, dt.month, dt.day, dt.hour, tzinfo=dt.tzinfo)
  164. def get_most_recent_quarter() -> datetime:
  165. now = server_now()
  166. return now.replace(minute=now.minute - (now.minute % 15), second=0, microsecond=0)
  167. def get_most_recent_hour() -> datetime:
  168. now = server_now()
  169. return now.replace(minute=now.minute - (now.minute % 60), second=0, microsecond=0)
  170. def get_most_recent_clocktime_window(
  171. window_size_in_minutes: int,
  172. now: datetime | None = None,
  173. grace_period_in_seconds: int | None = 0,
  174. ) -> tuple[datetime, datetime]:
  175. """
  176. Calculate a recent time window, returning a start and end minute so that
  177. a full hour can be filled with such windows, e.g.:
  178. Calling this function at 15:01:xx with window size 5 -> (14:55:00, 15:00:00)
  179. Calling this function at 03:36:xx with window size 15 -> (03:15:00, 03:30:00)
  180. We can demand a grace period (of x seconds) to have passed before we are ready to accept that we're in a new window:
  181. Calling this function at 15:00:16 with window size 5 and grace period of 30 seconds -> (14:50:00, 14:55:00)
  182. window_size_in_minutes is assumed to > 0 and < = 60, and a divisor of 60 (1, 2, ..., 30, 60).
  183. If now is not given, the current server time is used.
  184. if now / the current time lies within a boundary minute (e.g. 15 when window_size_in_minutes=5),
  185. then the window is not deemed over and the previous one is returned (in this case, [5, 10])
  186. Returns two datetime objects. They'll be in the timezone (if given) of the now parameter,
  187. or in the server timezone (see FLEXMEASURES_TIMEZONE setting).
  188. """
  189. assert window_size_in_minutes > 0
  190. assert 60 % window_size_in_minutes == 0
  191. if now is None:
  192. now = server_now()
  193. last_full_minute = now.replace(second=0, microsecond=0)
  194. if (
  195. grace_period_in_seconds is not None
  196. and grace_period_in_seconds > 0
  197. and (now - last_full_minute).seconds < grace_period_in_seconds
  198. ):
  199. last_full_minute -= timedelta(minutes=1 + grace_period_in_seconds // 60)
  200. last_round_minute = last_full_minute.minute - (
  201. last_full_minute.minute % window_size_in_minutes
  202. )
  203. begin_time = last_full_minute.replace(minute=last_round_minute) - timedelta(
  204. minutes=window_size_in_minutes
  205. )
  206. end_time = begin_time + timedelta(minutes=window_size_in_minutes)
  207. return begin_time, end_time
  208. def get_first_day_of_next_month() -> datetime:
  209. return (datetime.now().replace(day=1) + timedelta(days=32)).replace(day=1)
  210. def forecast_horizons_for(resolution: str | timedelta) -> list[str] | list[timedelta]:
  211. """Return a list of horizons that are supported per resolution.
  212. Return values or of the same type as the input."""
  213. if isinstance(resolution, timedelta):
  214. resolution_str = timedelta_to_pandas_freq_str(resolution)
  215. else:
  216. resolution_str = resolution
  217. horizons = []
  218. if resolution_str in ("5T", "5min", "10T", "10min"):
  219. horizons = ["1h", "6h", "24h"]
  220. elif resolution_str in ("15T", "15min", "1h", "H"):
  221. horizons = ["1h", "6h", "24h", "48h"]
  222. elif resolution_str in ("24h", "D"):
  223. horizons = ["24h", "48h"]
  224. elif resolution_str in ("168h", "7D"):
  225. horizons = ["168h"]
  226. if isinstance(resolution, timedelta):
  227. return [pd.to_timedelta(to_offset(h)) for h in horizons]
  228. else:
  229. return horizons
  230. def supported_horizons() -> list[timedelta]:
  231. return [
  232. timedelta(hours=1),
  233. timedelta(hours=6),
  234. timedelta(hours=24),
  235. timedelta(hours=48),
  236. ]
  237. def timedelta_to_pandas_freq_str(resolution: timedelta) -> str:
  238. return to_offset(resolution).freqstr
  239. def duration_isoformat(duration: timedelta):
  240. """Adapted version of isodate.duration_isoformat for formatting a datetime.timedelta.
  241. The difference is that absolute days are not formatted as nominal days.
  242. Workaround for https://github.com/gweis/isodate/issues/74.
  243. """
  244. ret = []
  245. usecs = abs(
  246. (duration.days * 24 * 60 * 60 + duration.seconds) * 1000000
  247. + duration.microseconds
  248. )
  249. seconds, usecs = divmod(usecs, 1000000)
  250. minutes, seconds = divmod(seconds, 60)
  251. hours, minutes = divmod(minutes, 60)
  252. if hours or minutes or seconds or usecs:
  253. ret.append("T")
  254. if hours:
  255. ret.append("%sH" % hours)
  256. if minutes:
  257. ret.append("%sM" % minutes)
  258. if seconds or usecs:
  259. if usecs:
  260. ret.append(("%d.%06d" % (seconds, usecs)).rstrip("0"))
  261. else:
  262. ret.append("%d" % seconds)
  263. ret.append("S")
  264. # at least one component has to be there.
  265. repl = ret and "".join(ret) or "T0H"
  266. return re.sub("%P", repl, "P%P")
  267. def determine_minimum_resampling_resolution(
  268. event_resolutions: list[timedelta],
  269. ) -> timedelta:
  270. """Return minimum non-zero event resolution, or zero resolution if none of the event resolutions is non-zero."""
  271. condition = list(
  272. event_resolution
  273. for event_resolution in event_resolutions
  274. if event_resolution > timedelta(0)
  275. )
  276. return min(condition) if any(condition) else timedelta(0)
  277. def to_http_time(dt: pd.Timestamp | datetime) -> str:
  278. """Formats datetime using the Internet Message Format fixdate.
  279. >>> to_http_time(pd.Timestamp("2022-12-13 14:06:23Z"))
  280. 'Tue, 13 Dec 2022 14:06:23 GMT'
  281. References
  282. ----------
  283. IMF-fixdate: https://www.rfc-editor.org/rfc/rfc7231#section-7.1.1.1
  284. """
  285. return dt.strftime("%a, %d %b %Y %H:%M:%S GMT")
  286. def get_max_planning_horizon(resolution: timedelta) -> timedelta | None:
  287. """Determine the maximum planning horizon for the given sensor resolution."""
  288. max_planning_horizon = current_app.config.get("FLEXMEASURES_MAX_PLANNING_HORIZON")
  289. if isinstance(max_planning_horizon, int):
  290. # Config setting specifies maximum number of planning steps
  291. max_planning_horizon *= resolution
  292. return max_planning_horizon
  293. def apply_offset_chain(
  294. dt: pd.Timestamp | datetime, offset_chain: str
  295. ) -> pd.Timestamp | datetime:
  296. """Apply an offset chain to a date.
  297. An offset chain consist of multiple (pandas) offset strings separated by commas. Moreover,
  298. this function implements the offset string "DB", which stands for Day Begin, to
  299. get a date from a datetime, i.e. removing time details finer than a day.
  300. Args:
  301. dt (pd.Timestamp | datetime)
  302. offset_chain (str)
  303. Returns:
  304. pd.Timestamp | datetime (same type as given dt)
  305. """
  306. if len(offset_chain) == 0:
  307. return dt
  308. # We need a Pandas structure to apply the offsets
  309. if isinstance(dt, datetime):
  310. _dt = pd.Timestamp(dt)
  311. elif isinstance(dt, pd.Timestamp):
  312. _dt = dt
  313. else:
  314. raise TypeError()
  315. # Apply the offsets
  316. for offset in offset_chain.split(","):
  317. try:
  318. _dt += to_offset(offset.strip())
  319. except ValueError:
  320. if offset.strip().lower() == "db": # db = day begin
  321. _dt = _dt.floor("D")
  322. elif offset.strip().lower() == "hb": # hb = hour begin
  323. _dt = _dt.floor("h")
  324. # Return output in the same type as the input
  325. if isinstance(dt, datetime):
  326. return _dt.to_pydatetime()
  327. return _dt
  328. def to_utc_timestamp(value):
  329. """
  330. Convert a datetime object or string to a UTC timestamp (seconds since epoch).
  331. The dt parameter can be either naive or tz-aware. We assume UTC in the naive case.
  332. If dt parameter is a string, it is expected to look like 'Sun, 28 Apr 2024 08:55:58 GMT'.
  333. String format is supported for cases when we process JSON from internal API responses,
  334. e.g., ui/crud/users auditlog router.
  335. Returns:
  336. - Float: seconds since Unix epoch (1970-01-01 00:00:00 UTC)
  337. - None: if input is None
  338. Hint: This can be set as a jinja filter to display UTC timestamps in the app, e.g.:
  339. app.jinja_env.filters['to_utc_timestamp'] = to_utc_timestamp
  340. Example usage:
  341. >>> to_utc_timestamp(datetime(2024, 4, 28, 8, 55, 58))
  342. 1714294558.0
  343. >>> to_utc_timestamp("Sun, 28 Apr 2024 08:55:58 GMT")
  344. 1714294558.0
  345. >>> to_utc_timestamp(None)
  346. """
  347. if value is None:
  348. return None
  349. if isinstance(value, str):
  350. # Parse string datetime in the format 'Tue, 13 Dec 2022 14:06:23 GMT'
  351. try:
  352. value = datetime.strptime(value, "%a, %d %b %Y %H:%M:%S %Z")
  353. except ValueError:
  354. # Return None or raise an error if the string is in an unexpected format
  355. return None
  356. if isinstance(value, datetime):
  357. if value.tzinfo is None:
  358. # If naive, assume UTC
  359. value = pd.Timestamp(value).tz_localize("utc")
  360. else:
  361. # Convert to UTC if already timezone-aware
  362. value = pd.Timestamp(value).tz_convert("utc")
  363. # Return Unix timestamp (seconds since epoch)
  364. return value.timestamp()