annotations.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. from __future__ import annotations
  2. from datetime import timedelta
  3. import pandas as pd
  4. from sqlalchemy import select
  5. from flexmeasures.data import db
  6. class Annotation(db.Model):
  7. """An annotation is a nominal value that applies to a specific time or time span.
  8. Examples of annotation types:
  9. - user annotation: annotation.type == "label" and annotation.source.type == "user"
  10. - unresolved alert: annotation.type == "alert"
  11. - resolved alert: annotation.type == "label" and annotation.source.type == "alerting script"
  12. - organisation holiday: annotation.type == "holiday" and annotation.source.type == "user"
  13. - public holiday: annotation.type == "holiday" and annotation.source.name == "workalendar"
  14. """
  15. id = db.Column(db.Integer, nullable=False, autoincrement=True, primary_key=True)
  16. start = db.Column(db.DateTime(timezone=True), nullable=False)
  17. end = db.Column(db.DateTime(timezone=True), nullable=False)
  18. belief_time = db.Column(db.DateTime(timezone=True), nullable=True)
  19. source_id = db.Column(db.Integer, db.ForeignKey("data_source.id"), nullable=False)
  20. source = db.relationship(
  21. "DataSource",
  22. foreign_keys=[source_id],
  23. backref=db.backref("annotations", lazy=True),
  24. )
  25. type = db.Column(
  26. db.Enum(
  27. "alert",
  28. "holiday",
  29. "label",
  30. "feedback",
  31. "warning",
  32. "error",
  33. name="annotation_type",
  34. ),
  35. nullable=False,
  36. )
  37. content = db.Column(db.String(1024), nullable=False)
  38. __table_args__ = (
  39. db.UniqueConstraint(
  40. "content",
  41. "start",
  42. "belief_time",
  43. "source_id",
  44. "type",
  45. name="annotation_content_key",
  46. ),
  47. )
  48. @property
  49. def duration(self) -> timedelta:
  50. return self.end - self.start
  51. @classmethod
  52. def add(
  53. cls,
  54. df: pd.DataFrame,
  55. annotation_type: str,
  56. expunge_session: bool = False,
  57. allow_overwrite: bool = False,
  58. bulk_save_objects: bool = False,
  59. commit_transaction: bool = False,
  60. ) -> list["Annotation"]:
  61. """Add a data frame describing annotations to the database and return the Annotation objects.
  62. :param df: Data frame describing annotations.
  63. Expects the following columns (or multi-index levels):
  64. - start
  65. - end or duration
  66. - content
  67. - belief_time
  68. - source
  69. :param annotation_type: One of the possible Enum values for annotation.type
  70. :param expunge_session: if True, all non-flushed instances are removed from the session before adding annotations.
  71. Expunging can resolve problems you might encounter with states of objects in your session.
  72. When using this option, you might want to flush newly-created objects which are not annotations
  73. (e.g. a sensor or data source object).
  74. :param allow_overwrite: if True, new objects are merged
  75. if False, objects are added to the session or bulk saved
  76. :param bulk_save_objects: if True, objects are bulk saved with session.bulk_save_objects(),
  77. which is quite fast but has several caveats, see:
  78. https://docs.sqlalchemy.org/orm/persistence_techniques.html#bulk-operations-caveats
  79. if False, objects are added to the session with session.add_all()
  80. :param commit_transaction: if True, the session is committed
  81. if False, you can still add other data to the session
  82. and commit it all within an atomic transaction
  83. """
  84. df = df.reset_index()
  85. starts = df["start"]
  86. if "end" in df.columns:
  87. ends = df["end"]
  88. elif "start" in df.columns and "duration" in df.columns:
  89. ends = df["start"] + df["duration"]
  90. else:
  91. raise ValueError(
  92. "Missing 'end' column cannot be derived from columns 'start' and 'duration'."
  93. )
  94. values = df["content"]
  95. belief_times = df["belief_time"]
  96. sources = df["source"]
  97. annotations = [
  98. cls(
  99. content=row[0],
  100. start=row[1],
  101. end=row[2],
  102. belief_time=row[3],
  103. source=row[4],
  104. type=annotation_type,
  105. )
  106. for row in zip(values, starts, ends, belief_times, sources)
  107. ]
  108. # Deal with the database session
  109. if expunge_session:
  110. db.session.expunge_all()
  111. if not allow_overwrite:
  112. if bulk_save_objects:
  113. db.session.bulk_save_objects(annotations)
  114. else:
  115. db.session.add_all(annotations)
  116. else:
  117. for annotation in annotations:
  118. db.session.merge(annotation)
  119. if commit_transaction:
  120. db.session.commit()
  121. return annotations
  122. def __repr__(self) -> str:
  123. return f"<Annotation {self.id}: {self.content} ({self.type}), start: {self.start} end: {self.end}, source: {self.source}>"
  124. class AccountAnnotationRelationship(db.Model):
  125. """Links annotations to accounts."""
  126. __tablename__ = "annotations_accounts"
  127. id = db.Column(db.Integer(), primary_key=True)
  128. account_id = db.Column(db.Integer, db.ForeignKey("account.id", ondelete="CASCADE"))
  129. annotation_id = db.Column(
  130. db.Integer, db.ForeignKey("annotation.id", ondelete="CASCADE")
  131. )
  132. __table_args__ = (
  133. db.UniqueConstraint(
  134. "annotation_id",
  135. "account_id",
  136. name="annotations_accounts_annotation_id_key",
  137. ),
  138. )
  139. class GenericAssetAnnotationRelationship(db.Model):
  140. """Links annotations to generic assets."""
  141. __tablename__ = "annotations_assets"
  142. id = db.Column(db.Integer(), primary_key=True)
  143. generic_asset_id = db.Column(
  144. db.Integer, db.ForeignKey("generic_asset.id", ondelete="CASCADE")
  145. )
  146. annotation_id = db.Column(
  147. db.Integer, db.ForeignKey("annotation.id", ondelete="CASCADE")
  148. )
  149. __table_args__ = (
  150. db.UniqueConstraint(
  151. "annotation_id",
  152. "generic_asset_id",
  153. name="annotations_assets_annotation_id_key",
  154. ),
  155. )
  156. class SensorAnnotationRelationship(db.Model):
  157. """Links annotations to sensors."""
  158. __tablename__ = "annotations_sensors"
  159. id = db.Column(db.Integer(), primary_key=True)
  160. sensor_id = db.Column(db.Integer, db.ForeignKey("sensor.id", ondelete="CASCADE"))
  161. annotation_id = db.Column(
  162. db.Integer, db.ForeignKey("annotation.id", ondelete="CASCADE")
  163. )
  164. __table_args__ = (
  165. db.UniqueConstraint(
  166. "annotation_id",
  167. "sensor_id",
  168. name="annotations_sensors_annotation_id_key",
  169. ),
  170. )
  171. def get_or_create_annotation(
  172. annotation: Annotation,
  173. ) -> Annotation:
  174. """Add annotation to db session if it doesn't exist in the session already.
  175. Return the old annotation object if it exists (and expunge the new one). Otherwise, return the new one.
  176. """
  177. with db.session.no_autoflush:
  178. existing_annotation = db.session.execute(
  179. select(Annotation).filter(
  180. Annotation.content == annotation.content,
  181. Annotation.start == annotation.start,
  182. Annotation.end == annotation.end,
  183. Annotation.source == annotation.source,
  184. Annotation.type == annotation.type,
  185. )
  186. ).scalar_one_or_none()
  187. if existing_annotation is None:
  188. db.session.add(annotation)
  189. return annotation
  190. if annotation in db.session:
  191. db.session.expunge(annotation)
  192. return existing_annotation
  193. def to_annotation_frame(annotations: list[Annotation]) -> pd.DataFrame:
  194. """Transform a list of annotations into a DataFrame.
  195. We don't use a BeliefsDataFrame here, because they are designed for quantitative data only.
  196. """
  197. return pd.DataFrame(
  198. [
  199. [a.start, a.end, a.belief_time, a.source, a.type, a.content]
  200. for a in annotations
  201. ],
  202. columns=["start", "end", "belief_time", "source", "type", "content"],
  203. )