/
transactional.py
144 lines (124 loc) · 5.77 KB
/
transactional.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
These, and only these, functions should help you with treating your own code
in the context of one database transaction. Which makes our lives easier.
"""
import sys
from datetime import datetime
from typing import Optional
import pytz
import click
from flask import current_app
from flask_sqlalchemy import SQLAlchemy
from flexmeasures.data.config import db
from flexmeasures.utils.error_utils import get_err_source_info
from flexmeasures.utils.coding_utils import optional_arg_decorator
from flexmeasures.data.models.task_runs import LatestTaskRun
def as_transaction(db_function):
"""Decorator for handling any function which contains SQLAlchemy commands as one database transaction (ACID).
Calls db operation function and when it is done, commits the db session.
Rolls back the session if anything goes wrong.
If useful, the first argument can be the db (SQLAlchemy) object and the rest of the args
are sent through to the function. If this happened, the session is closed at the end.
"""
def wrap(*args, **kwargs):
close_session = False
# negotiate which db object to use
db_obj_passed = len(args) > 0 and isinstance(args[0], SQLAlchemy)
if db_obj_passed:
the_db = args[0]
close_session = True
else:
the_db = db
# run actual function, handle any exceptions and re-raise
try:
db_function(*args, **kwargs)
the_db.session.commit()
except Exception as e:
current_app.logger.error(
"[%s] Encountered Problem: %s" % (db_function.__name__, str(e))
)
the_db.session.rollback()
raise
finally:
if close_session:
the_db.session.close()
return wrap
def after_request_exception_rollback_session(exception):
"""
Central place to handle transactions finally.
So - usually your view code should not have to deal with
rolling back.
Our policy *is* that we don't auto-commit (we used to do that here).
Some more reading is e.g. here https://github.com/pallets/flask-sqlalchemy/issues/216
Register this on your app via the teardown_request setup method.
We roll back the session if there was any error (which only has an effect if
the session has not yet been committed).
Flask-SQLAlchemy is closing the scoped sessions automatically."""
if exception is not None:
db.session.rollback()
return
class PartialTaskCompletionException(Exception):
"""By raising this Exception in a task, no rollback will happen even if not everything was successful
and the data which was generated will get committed. The task status will still be False, so the non-successful
parts can be inspected."""
pass
@optional_arg_decorator
def task_with_status_report(task_function, task_name: Optional[str] = None):
"""Decorator for tasks which should report their runtime and status in the db (as LatestTaskRun entries).
Tasks decorated with this endpoint should also leave committing or rolling back the session to this
decorator (for the reasons that it is nice to centralise that but also practically, this decorator
still needs to add to the session).
If the task wants to commit partial results, and at the same time report that some things did not run well,
it can raise a PartialTaskCompletionException and we recommend to use save-points (db.session.being_nested) to
do partial rollbacks (see https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html#using-savepoint)."""
task_name_to_report = (
task_name # store this closure var somewhere else before we might assign to it
)
if task_name_to_report is None:
task_name_to_report = task_function.__name__
def wrap(*args, **kwargs):
status: bool = True
partial: bool = False
try:
task_function(*args, **kwargs)
click.echo("[FLEXMEASURES] Task %s ran fine." % task_name_to_report)
except Exception as e:
exc_info = sys.exc_info()
last_traceback = exc_info[2]
click.echo(
'[FLEXMEASURES] Task %s encountered a problem: "%s". More details: %s'
% (task_name_to_report, str(e), get_err_source_info(last_traceback))
)
status = False
if e.__class__ == PartialTaskCompletionException:
partial = True
finally:
# make sure we roll back if there is no reason to commit
if not (status is True or partial is True):
db.session.rollback()
# now save the status of the task
db.session.begin_nested() # any failure here does not invalidate any task results we might commit
try:
task_run = LatestTaskRun.query.filter(
LatestTaskRun.name == task_name_to_report
).one_or_none()
if task_run is None:
task_run = LatestTaskRun(name=task_name_to_report)
db.session.add(task_run)
task_run.datetime = datetime.utcnow().replace(tzinfo=pytz.utc)
task_run.status = status
click.echo(
"[FLEXMEASURES] Reported task %s status as %s"
% (task_name_to_report, status)
)
db.session.commit()
except Exception as e:
click.echo(
"[FLEXMEASURES] Could not report the running of task %s. Encountered the following problem: [%s]."
" The task might have run fine." % (task_name_to_report, str(e))
)
db.session.rollback()
# now the final commit
db.session.commit()
db.session.remove()
return wrap