utils.py

  1. """
  2. Utils around the data models and db sessions
  3. """
  4. from __future__ import annotations
  5. from flask import current_app
  6. from timely_beliefs import BeliefsDataFrame, BeliefsSeries
  7. from sqlalchemy import select
  8. from flexmeasures.data import db
  9. from flexmeasures.data.models.data_sources import DataSource
  10. from flexmeasures.data.models.time_series import TimedBelief
  11. from flexmeasures.data.services.time_series import drop_unchanged_beliefs
  12. def save_to_session(objects: list[db.Model], overwrite: bool = False):
  13. """Utility function to save to database, either efficiently with a bulk save, or inefficiently with a merge save."""
  14. if not overwrite:
  15. db.session.bulk_save_objects(objects)
  16. else:
  17. for o in objects:
  18. db.session.merge(o)
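

# Illustrative usage sketch (added for clarity, not part of the original module):
# persisting freshly created ORM objects from a script. "new_annotations" is a
# hypothetical list of mapped model instances; committing is left to the caller.
#
#   save_to_session(new_annotations)                  # fast bulk insert of new rows
#   save_to_session(new_annotations, overwrite=True)  # merge, e.g. to update existing rows
#   db.session.commit()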


def get_data_source(
    data_source_name: str,
    data_source_model: str | None = None,
    data_source_version: str | None = None,
    data_source_type: str = "script",
) -> DataSource:
    """Make sure we have a data source. Create one if it doesn't exist, and add to session.

    Meant for scripts that may run for the first time.
    """
    data_source = db.session.execute(
        select(DataSource).filter_by(
            name=data_source_name,
            model=data_source_model,
            version=data_source_version,
            type=data_source_type,
        )
    ).scalar_one_or_none()
    if data_source is None:
        data_source = DataSource(
            name=data_source_name,
            model=data_source_model,
            version=data_source_version,
            type=data_source_type,
        )
        db.session.add(data_source)
        db.session.flush()  # populate the primary key attributes (like id) without committing the transaction
        current_app.logger.info(
            f'Session updated with new {data_source_type} data source "{data_source.__repr__()}".'
        )
    return data_source
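

# Illustrative usage sketch (added for clarity, not part of the original module):
# a script ensures its data source exists before attributing beliefs to it. The name
# and version below are made-up example values.
#
#   source = get_data_source(
#       data_source_name="weather-forecast-script",
#       data_source_version="0.1",
#   )
#   # source.id is available after the flush, but nothing has been committed yet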


def save_to_db(
    data: BeliefsDataFrame | BeliefsSeries | list[BeliefsDataFrame | BeliefsSeries],
    bulk_save_objects: bool = True,
    save_changed_beliefs_only: bool = True,
) -> str:
    """Save the timed beliefs to the database.

    Note: This function does not commit. It does, however, flush the session. Best to keep transactions short.

    We make the distinction between updating beliefs and replacing beliefs.

    # Updating beliefs

    An updated belief is a belief from the same source as some already saved belief, and about the same event,
    but with a later belief time. If it has a different event value, then it represents a changed belief.
    Note that it is possible to explicitly record unchanged beliefs (i.e. updated beliefs with a later belief time,
    but with the same event value), by setting save_changed_beliefs_only to False.

    # Replacing beliefs

    A replaced belief is a belief from the same source as some already saved belief,
    and about the same event and with the same belief time, but with a different event value.
    Replacing beliefs is not allowed, because messing with the history corrupts data lineage.
    Corrections should instead be recorded as updated beliefs.
    Servers in 'play' mode are exempt from this rule, to facilitate replaying simulations.

    :param data: BeliefsDataFrame (or a list thereof) to be saved
    :param bulk_save_objects: if True, objects are bulk saved with session.bulk_save_objects(),
                              which is quite fast but has several caveats, see:
                              https://docs.sqlalchemy.org/orm/persistence_techniques.html#bulk-operations-caveats
    :param save_changed_beliefs_only: if True, unchanged beliefs are skipped (updated beliefs are only stored if they represent changed beliefs)
                                      if False, all updated beliefs are stored
    :returns: status string, one of the following:
              - 'success': all beliefs were saved
              - 'success_with_unchanged_beliefs_skipped': not all beliefs represented a state change
              - 'success_but_nothing_new': no beliefs represented a state change
    """
    # Convert to list
    if not isinstance(data, list):
        timed_values_list = [data]
    else:
        timed_values_list = data

    status = "success"
    values_saved = 0
    for timed_values in timed_values_list:
        if timed_values.empty:
            # Nothing to save
            continue

        # Convert series to frame if needed
        if isinstance(timed_values, BeliefsSeries):
            timed_values = timed_values.rename("event_value").to_frame()

        len_before = len(timed_values)
        if save_changed_beliefs_only:
            # Drop beliefs that haven't changed
            timed_values = drop_unchanged_beliefs(timed_values)
            len_after = len(timed_values)
            if len_after < len_before:
                status = "success_with_unchanged_beliefs_skipped"

            # Work around bug in which groupby still introduces an index level, even though we asked it not to
            if None in timed_values.index.names:
                timed_values.index = timed_values.index.droplevel(None)

            if timed_values.empty:
                # No state changes among the beliefs
                continue

        current_app.logger.info("SAVING TO DB...")
        TimedBelief.add_to_session(
            session=db.session,
            beliefs_data_frame=timed_values,
            bulk_save_objects=bulk_save_objects,
            allow_overwrite=current_app.config.get(
                "FLEXMEASURES_ALLOW_DATA_OVERWRITE", False
            ),
        )
        values_saved += len(timed_values)

    # Flush to bring up potential unique violations (due to attempting to replace beliefs)
    db.session.flush()

    if values_saved == 0:
        status = "success_but_nothing_new"
    return status
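

# Illustrative usage sketch (added for clarity, not part of the original module):
# "bdf" stands in for a timely_beliefs.BeliefsDataFrame built elsewhere. The returned
# status strings match the ones documented in the docstring above.
#
#   status = save_to_db(bdf)
#   if status == "success_but_nothing_new":
#       current_app.logger.info("No new beliefs to record.")
#   db.session.commit()  # save_to_db only flushes; committing is up to the caller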