task_runs.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from datetime import datetime, timezone
  2. from sqlalchemy import select
  3. from flexmeasures.data import db
  4. class LatestTaskRun(db.Model):
  5. """ "
  6. Log the (latest) running of a task.
  7. This is intended to be used for live monitoring. For a full analysis,
  8. there are log files.
  9. """
  10. name = db.Column(db.String(80), primary_key=True)
  11. datetime = db.Column(db.DateTime(timezone=True), default=datetime.now(timezone.utc))
  12. status = db.Column(db.Boolean, default=True)
  13. def __repr__(self):
  14. return "<TaskRun [%s] at %s (status: %s)>" % (
  15. self.name,
  16. self.datetime,
  17. {True: "ok", False: "err"}[self.status],
  18. )
  19. @staticmethod
  20. def record_run(task_name: str, status: bool):
  21. """
  22. Record the latest task run (overwriting previous ones).
  23. If the row is not yet in the table, create it first.
  24. Does not commit.
  25. """
  26. task_run = db.session.execute(
  27. select(LatestTaskRun).filter(LatestTaskRun.name == task_name)
  28. ).scalar_one_or_none()
  29. if task_run is None:
  30. task_run = LatestTaskRun(name=task_name)
  31. db.session.add(task_run)
  32. task_run.datetime = datetime.now(timezone.utc)
  33. task_run.status = status