# utils.py
from __future__ import annotations
from flask import current_app
from timely_beliefs import BeliefsDataFrame, BeliefsSeries
from flexmeasures.data import db
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.data.services.time_series import drop_unchanged_beliefs
def save_to_session(objects: list[db.Model], overwrite: bool = False):
    """Persist ORM objects to the current database session.

    :param objects:   model instances to save
    :param overwrite: if True, merge each object individually (slower, but
                      updates pre-existing rows); if False, use an efficient
                      bulk save that assumes the objects are new
    """
    if overwrite:
        # Merge one by one, so existing rows with matching identities are updated.
        for obj in objects:
            db.session.merge(obj)
    else:
        db.session.bulk_save_objects(objects)
def get_data_source(
    data_source_name: str,
    data_source_model: str | None = None,
    data_source_version: str | None = None,
    data_source_type: str = "script",
) -> DataSource:
    """Make sure we have a data source. Create one if it doesn't exist, and add to session.

    Meant for scripts that may run for the first time.

    :param data_source_name:    name identifying the data source
    :param data_source_model:   optional model name attributed to the source
    :param data_source_version: optional version string of the source's model
    :param data_source_type:    source category (defaults to "script")
    :returns: the existing matching DataSource, or a newly created one
              (added to the session and flushed, so its id is populated)
    """
    data_source = DataSource.query.filter_by(
        name=data_source_name,
        model=data_source_model,
        version=data_source_version,
        type=data_source_type,
    ).one_or_none()
    if data_source is None:
        data_source = DataSource(
            name=data_source_name,
            model=data_source_model,
            version=data_source_version,
            type=data_source_type,
        )
        db.session.add(data_source)
        db.session.flush()  # populate the primary key attributes (like id) without committing the transaction
        current_app.logger.info(
            # Use the f-string !r conversion instead of calling __repr__() directly;
            # the logged message is identical.
            f'Session updated with new {data_source_type} data source "{data_source!r}".'
        )
    return data_source
def save_to_db(
    data: BeliefsDataFrame | BeliefsSeries | list[BeliefsDataFrame | BeliefsSeries],
    bulk_save_objects: bool = False,
    save_changed_beliefs_only: bool = True,
) -> str:
    """Save the timed beliefs to the database.

    Note: This function does not commit. It does, however, flush the session. Best to keep transactions short.

    We make the distinction between updating beliefs and replacing beliefs.

    # Updating beliefs

    An updated belief is a belief from the same source as some already saved belief, and about the same event,
    but with a later belief time. If it has a different event value, then it represents a changed belief.
    Note that it is possible to explicitly record unchanged beliefs (i.e. updated beliefs with a later belief time,
    but with the same event value), by setting save_changed_beliefs_only to False.

    # Replacing beliefs

    A replaced belief is a belief from the same source as some already saved belief,
    and about the same event and with the same belief time, but with a different event value.
    Replacing beliefs is not allowed, because messing with the history corrupts data lineage.
    Corrections should instead be recorded as updated beliefs.
    Servers in 'play' mode are exempt from this rule, to facilitate replaying simulations.

    :param data: BeliefsDataFrame (or a list thereof) to be saved
    :param bulk_save_objects: if True, objects are bulk saved with session.bulk_save_objects(),
                              which is quite fast but has several caveats, see:
                              https://docs.sqlalchemy.org/orm/persistence_techniques.html#bulk-operations-caveats
    :param save_changed_beliefs_only: if True, unchanged beliefs are skipped (updated beliefs are only stored if they represent changed beliefs)
                                      if False, all updated beliefs are stored
    :returns: status string, one of the following:
              - 'success': all beliefs were saved
              - 'success_with_unchanged_beliefs_skipped': not all beliefs represented a state change
              - 'success_but_nothing_new': no beliefs represented a state change
    """
    # Normalize the input to a list of belief frames/series
    frames = data if isinstance(data, list) else [data]

    status = "success"
    num_beliefs_saved = 0
    for beliefs in frames:
        if beliefs.empty:
            continue  # nothing to save for this item

        # Promote a series to a single-column frame if needed
        if isinstance(beliefs, BeliefsSeries):
            beliefs = beliefs.rename("event_value").to_frame()

        if save_changed_beliefs_only:
            # Drop beliefs that haven't changed; if any were dropped, record that in the status
            original_length = len(beliefs)
            beliefs = drop_unchanged_beliefs(beliefs)
            if len(beliefs) < original_length:
                status = "success_with_unchanged_beliefs_skipped"

        # Work around bug in which groupby still introduces an index level, even though we asked it not to
        if None in beliefs.index.names:
            beliefs.index = beliefs.index.droplevel(None)

        if beliefs.empty:
            continue  # no state changes among the beliefs

        current_app.logger.info("SAVING TO DB...")
        TimedBelief.add_to_session(
            session=db.session,
            beliefs_data_frame=beliefs,
            bulk_save_objects=bulk_save_objects,
            allow_overwrite=current_app.config.get(
                "FLEXMEASURES_ALLOW_DATA_OVERWRITE", False
            ),
        )
        num_beliefs_saved += len(beliefs)

        # Flush to bring up potential unique violations (due to attempting to replace beliefs)
        db.session.flush()

    if num_beliefs_saved == 0:
        status = "success_but_nothing_new"
    return status