time_series.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. from __future__ import annotations
  2. from typing import Any
  3. from datetime import timedelta
  4. import inflect
  5. from flask import current_app
  6. import pandas as pd
  7. import timely_beliefs as tb
  8. from flexmeasures.data.queries.utils import simplify_index
# Inflect engine, e.g. for pluralizing words in messages.
# NOTE(review): not referenced anywhere in this chunk — presumably used by
# code elsewhere in the file; confirm before removing.
p = inflect.engine()
  10. def aggregate_values(bdf_dict: dict[Any, tb.BeliefsDataFrame]) -> tb.BeliefsDataFrame:
  11. # todo: test this function rigorously, e.g. with empty bdfs in bdf_dict
  12. # todo: consider 1 bdf with beliefs from source A, plus 1 bdf with beliefs from source B -> 1 bdf with sources A+B
  13. # todo: consider 1 bdf with beliefs from sources A and B, plus 1 bdf with beliefs from source C. -> 1 bdf with sources A+B and A+C
  14. # todo: consider 1 bdf with beliefs from sources A and B, plus 1 bdf with beliefs from source C and D. -> 1 bdf with sources A+B, A+C, B+C and B+D
  15. # Relevant issue: https://github.com/SeitaBV/timely-beliefs/issues/33
  16. # Nothing to aggregate
  17. if len(bdf_dict) == 1:
  18. return list(bdf_dict.values())[0]
  19. unique_source_ids: list[int] = []
  20. for bdf in bdf_dict.values():
  21. unique_source_ids.extend(bdf.lineage.sources)
  22. if not bdf.lineage.unique_beliefs_per_event_per_source:
  23. current_app.logger.warning(
  24. "Not implemented: only aggregation of deterministic uni-source beliefs (1 per event) is properly supported"
  25. )
  26. if bdf.lineage.number_of_sources > 1:
  27. current_app.logger.warning(
  28. "Not implemented: aggregating multi-source beliefs about the same sensor."
  29. )
  30. if len(set(unique_source_ids)) > 1:
  31. current_app.logger.warning(
  32. f"Not implemented: aggregating multi-source beliefs. Source {unique_source_ids[1:]} will be treated as if source {unique_source_ids[0]}"
  33. )
  34. data_as_bdf = tb.BeliefsDataFrame()
  35. for k, v in bdf_dict.items():
  36. if data_as_bdf.empty:
  37. data_as_bdf = v.copy()
  38. elif not v.empty:
  39. data_as_bdf["event_value"] = data_as_bdf["event_value"].add(
  40. simplify_index(v.copy())["event_value"],
  41. fill_value=0,
  42. level="event_start",
  43. ) # we only look at the event_start index level and sum up duplicates that level
  44. return data_as_bdf
  45. def drop_unchanged_beliefs(bdf: tb.BeliefsDataFrame) -> tb.BeliefsDataFrame:
  46. """Drop beliefs that are already stored in the database with an earlier belief time.
  47. Also drop beliefs that are already in the data with an earlier belief time.
  48. Quite useful function to prevent cluttering up your database with beliefs that remain unchanged over time.
  49. """
  50. if bdf.empty:
  51. return bdf
  52. # Save the oldest ex-post beliefs explicitly, even if they do not deviate from the most recent ex-ante beliefs
  53. ex_ante_bdf = bdf[bdf.belief_horizons > timedelta(0)]
  54. ex_post_bdf = bdf[bdf.belief_horizons <= timedelta(0)]
  55. if not ex_ante_bdf.empty and not ex_post_bdf.empty:
  56. # We treat each part separately to avoid that ex-post knowledge would be lost
  57. ex_ante_bdf = drop_unchanged_beliefs(ex_ante_bdf)
  58. ex_post_bdf = drop_unchanged_beliefs(ex_post_bdf)
  59. bdf = pd.concat([ex_ante_bdf, ex_post_bdf])
  60. return bdf
  61. # Remove unchanged beliefs from within the new data itself
  62. index_names = bdf.index.names
  63. bdf = (
  64. bdf.sort_index()
  65. .reset_index()
  66. .drop_duplicates(
  67. ["event_start", "source", "cumulative_probability", "event_value"],
  68. keep="first",
  69. )
  70. .set_index(index_names)
  71. )
  72. # Remove unchanged beliefs with respect to what is already stored in the database
  73. return (
  74. bdf.convert_index_from_belief_horizon_to_time()
  75. .groupby(level=["belief_time", "source"], group_keys=False, as_index=False)
  76. .apply(_drop_unchanged_beliefs_compared_to_db)
  77. )
def _drop_unchanged_beliefs_compared_to_db(
    bdf: tb.BeliefsDataFrame,
) -> tb.BeliefsDataFrame:
    """Drop beliefs that are already stored in the database with an earlier belief time.

    Assumes a BeliefsDataFrame with a unique belief time and unique source,
    and either all ex-ante beliefs or all ex-post beliefs.

    It is preferable to call the public function drop_unchanged_beliefs instead.
    """
    # The frame is assumed to be all-ex-ante or all-ex-post, so the first
    # horizon decides which side of the knowledge time to search in the db.
    if bdf.belief_horizons[0] > timedelta(0):
        # Look up only ex-ante beliefs (horizon > 0)
        kwargs = dict(horizons_at_least=timedelta(0))
    else:
        # Look up only ex-post beliefs (horizon <= 0)
        kwargs = dict(horizons_at_most=timedelta(0))
    previous_most_recent_beliefs_in_db = bdf.sensor.search_beliefs(
        event_starts_after=bdf.event_starts[0],
        event_ends_before=bdf.event_ends[-1],
        beliefs_before=bdf.lineage.belief_times[0],  # unique belief time
        source=bdf.lineage.sources[0],  # unique source
        most_recent_beliefs_only=True,
        **kwargs,
    )
    # Re-index both the new data (a) and the stored data (b) on the fields that
    # define an "unchanged" belief, then drop new rows that match stored rows.
    compare_fields = ["event_start", "source", "cumulative_probability", "event_value"]
    a = bdf.reset_index().set_index(compare_fields)
    b = previous_most_recent_beliefs_in_db.reset_index().set_index(compare_fields)
    bdf = a.drop(
        b.index,
        errors="ignore",  # a new belief may have no stored counterpart at all
        axis=0,
    )
    # Keep whole probabilistic beliefs, not just the parts that changed:
    # retain every original row (d, from a) whose (event_start, source) pair
    # still has at least one changed row left (c).
    c = bdf.reset_index().set_index(["event_start", "source"])
    d = a.reset_index().set_index(["event_start", "source"])
    bdf = d[d.index.isin(c.index)]
    # Restore the standard BeliefsDataFrame index levels before returning.
    bdf = bdf.reset_index().set_index(
        ["event_start", "belief_time", "source", "cumulative_probability"]
    )
    return bdf