123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- """
- These, and only these, functions should help you with treating your own code
- in the context of one database transaction. Which makes our lives easier.
- """
- from __future__ import annotations
- import sys
- import click
- from flask import current_app
- from flask_sqlalchemy import SQLAlchemy
- from flexmeasures.data import db
- from flexmeasures.utils.error_utils import get_err_source_info
- from flexmeasures.utils.coding_utils import optional_arg_decorator
- from flexmeasures.data.models.task_runs import LatestTaskRun
- def as_transaction(db_function):
- """Decorator for handling any function which contains SQLAlchemy commands as one database transaction (ACID).
- Calls db operation function and when it is done, commits the db session.
- Rolls back the session if anything goes wrong.
- If useful, the first argument can be the db (SQLAlchemy) object and the rest of the args
- are sent through to the function. If this happened, the session is closed at the end.
- """
- def wrap(*args, **kwargs):
- close_session = False
- # negotiate which db object to use
- db_obj_passed = len(args) > 0 and isinstance(args[0], SQLAlchemy)
- if db_obj_passed:
- the_db = args[0]
- close_session = True
- else:
- the_db = db
- # run actual function, handle any exceptions and re-raise
- try:
- db_function(*args, **kwargs)
- the_db.session.commit()
- except Exception as e:
- current_app.logger.error(
- "[%s] Encountered Problem: %s" % (db_function.__name__, str(e))
- )
- the_db.session.rollback()
- raise
- finally:
- if close_session:
- the_db.session.close()
- return wrap
- def after_request_exception_rollback_session(exception):
- """
- Central place to handle transactions finally.
- So - usually your view code should not have to deal with
- rolling back.
- Our policy *is* that we don't auto-commit (we used to do that here).
- Some more reading is e.g. here https://github.com/pallets/flask-sqlalchemy/issues/216
- Register this on your app via the teardown_request setup method.
- We roll back the session if there was any error (which only has an effect if
- the session has not yet been committed).
- Flask-SQLAlchemy is closing the scoped sessions automatically."""
- if exception is not None:
- db.session.rollback()
- return
- class PartialTaskCompletionException(Exception):
- """By raising this Exception in a task, no rollback will happen even if not everything was successful
- and the data which was generated will get committed. The task status will still be False, so the non-successful
- parts can be inspected."""
- pass
- @optional_arg_decorator
- def task_with_status_report(task_function, task_name: str | None = None):
- """Decorator for tasks which should report their runtime and status in the db (as LatestTaskRun entries).
- Tasks decorated with this endpoint should also leave committing or rolling back the session to this
- decorator (for the reasons that it is nice to centralise that but also practically, this decorator
- still needs to add to the session).
- If the task wants to commit partial results, and at the same time report that some things did not run well,
- it can raise a PartialTaskCompletionException and we recommend to use save-points (db.session.being_nested) to
- do partial rollbacks (see https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html#using-savepoint).
- """
- task_name_to_report = (
- task_name # store this closure var somewhere else before we might assign to it
- )
- if task_name_to_report is None:
- task_name_to_report = task_function.__name__
- def wrap(*args, **kwargs):
- status: bool = True
- partial: bool = False
- try:
- task_function(*args, **kwargs)
- click.echo("[FLEXMEASURES] Task %s ran fine." % task_name_to_report)
- except Exception as e:
- exc_info = sys.exc_info()
- last_traceback = exc_info[2]
- click.echo(
- '[FLEXMEASURES] Task %s encountered a problem: "%s". More details: %s'
- % (task_name_to_report, str(e), get_err_source_info(last_traceback))
- )
- status = False
- if e.__class__ == PartialTaskCompletionException:
- partial = True
- finally:
- # make sure we roll back if there is no reason to commit
- if not (status is True or partial is True):
- db.session.rollback()
- # now save the status of the task
- db.session.begin_nested() # any failure here does not invalidate any task results we might commit
- try:
- LatestTaskRun.record_run(task_name_to_report, status)
- click.echo(
- "[FLEXMEASURES] Reported task %s status as %s"
- % (task_name_to_report, status)
- )
- db.session.commit()
- except Exception as e:
- click.echo(
- "[FLEXMEASURES] Could not report the running of task %s. Encountered the following problem: [%s]."
- " The task might have run fine." % (task_name_to_report, str(e))
- )
- db.session.rollback()
- # now the final commit
- db.session.commit()
- db.session.remove()
- return wrap
|