transactional.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. """
  2. These, and only these, functions should help you with treating your own code
  3. in the context of one database transaction. Which makes our lives easier.
  4. """
  5. from __future__ import annotations
  6. import sys
  7. import click
  8. from flask import current_app
  9. from flask_sqlalchemy import SQLAlchemy
  10. from flexmeasures.data import db
  11. from flexmeasures.utils.error_utils import get_err_source_info
  12. from flexmeasures.utils.coding_utils import optional_arg_decorator
  13. from flexmeasures.data.models.task_runs import LatestTaskRun
  14. def as_transaction(db_function):
  15. """Decorator for handling any function which contains SQLAlchemy commands as one database transaction (ACID).
  16. Calls db operation function and when it is done, commits the db session.
  17. Rolls back the session if anything goes wrong.
  18. If useful, the first argument can be the db (SQLAlchemy) object and the rest of the args
  19. are sent through to the function. If this happened, the session is closed at the end.
  20. """
  21. def wrap(*args, **kwargs):
  22. close_session = False
  23. # negotiate which db object to use
  24. db_obj_passed = len(args) > 0 and isinstance(args[0], SQLAlchemy)
  25. if db_obj_passed:
  26. the_db = args[0]
  27. close_session = True
  28. else:
  29. the_db = db
  30. # run actual function, handle any exceptions and re-raise
  31. try:
  32. db_function(*args, **kwargs)
  33. the_db.session.commit()
  34. except Exception as e:
  35. current_app.logger.error(
  36. "[%s] Encountered Problem: %s" % (db_function.__name__, str(e))
  37. )
  38. the_db.session.rollback()
  39. raise
  40. finally:
  41. if close_session:
  42. the_db.session.close()
  43. return wrap
  44. def after_request_exception_rollback_session(exception):
  45. """
  46. Central place to handle transactions finally.
  47. So - usually your view code should not have to deal with
  48. rolling back.
  49. Our policy *is* that we don't auto-commit (we used to do that here).
  50. Some more reading is e.g. here https://github.com/pallets/flask-sqlalchemy/issues/216
  51. Register this on your app via the teardown_request setup method.
  52. We roll back the session if there was any error (which only has an effect if
  53. the session has not yet been committed).
  54. Flask-SQLAlchemy is closing the scoped sessions automatically."""
  55. if exception is not None:
  56. db.session.rollback()
  57. return
  58. class PartialTaskCompletionException(Exception):
  59. """By raising this Exception in a task, no rollback will happen even if not everything was successful
  60. and the data which was generated will get committed. The task status will still be False, so the non-successful
  61. parts can be inspected."""
  62. pass
  63. @optional_arg_decorator
  64. def task_with_status_report(task_function, task_name: str | None = None):
  65. """Decorator for tasks which should report their runtime and status in the db (as LatestTaskRun entries).
  66. Tasks decorated with this endpoint should also leave committing or rolling back the session to this
  67. decorator (for the reasons that it is nice to centralise that but also practically, this decorator
  68. still needs to add to the session).
  69. If the task wants to commit partial results, and at the same time report that some things did not run well,
  70. it can raise a PartialTaskCompletionException and we recommend to use save-points (db.session.being_nested) to
  71. do partial rollbacks (see https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html#using-savepoint).
  72. """
  73. task_name_to_report = (
  74. task_name # store this closure var somewhere else before we might assign to it
  75. )
  76. if task_name_to_report is None:
  77. task_name_to_report = task_function.__name__
  78. def wrap(*args, **kwargs):
  79. status: bool = True
  80. partial: bool = False
  81. try:
  82. task_function(*args, **kwargs)
  83. click.echo("[FLEXMEASURES] Task %s ran fine." % task_name_to_report)
  84. except Exception as e:
  85. exc_info = sys.exc_info()
  86. last_traceback = exc_info[2]
  87. click.echo(
  88. '[FLEXMEASURES] Task %s encountered a problem: "%s". More details: %s'
  89. % (task_name_to_report, str(e), get_err_source_info(last_traceback))
  90. )
  91. status = False
  92. if e.__class__ == PartialTaskCompletionException:
  93. partial = True
  94. finally:
  95. # make sure we roll back if there is no reason to commit
  96. if not (status is True or partial is True):
  97. db.session.rollback()
  98. # now save the status of the task
  99. db.session.begin_nested() # any failure here does not invalidate any task results we might commit
  100. try:
  101. LatestTaskRun.record_run(task_name_to_report, status)
  102. click.echo(
  103. "[FLEXMEASURES] Reported task %s status as %s"
  104. % (task_name_to_report, status)
  105. )
  106. db.session.commit()
  107. except Exception as e:
  108. click.echo(
  109. "[FLEXMEASURES] Could not report the running of task %s. Encountered the following problem: [%s]."
  110. " The task might have run fine." % (task_name_to_report, str(e))
  111. )
  112. db.session.rollback()
  113. # now the final commit
  114. db.session.commit()
  115. db.session.remove()
  116. return wrap