-
Notifications
You must be signed in to change notification settings - Fork 30
/
data_sources.py
69 lines (63 loc) · 2.27 KB
/
data_sources.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from __future__ import annotations
from flask import current_app
from flexmeasures import User
from flexmeasures.data import db
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.user import is_user
def get_or_create_source(
    source: User | str,
    source_type: str | None = None,
    model: str | None = None,
    version: str | None = None,
    attributes: dict | None = None,
    flush: bool = True,
) -> DataSource:
    """Return the data source matching the given description, creating it if needed.

    A user source is looked up by its user and always has type "user"
    (any ``source_type`` passed in is overridden). Any other source is
    looked up by name and the given ``source_type``. The ``model``,
    ``version`` and ``attributes`` criteria only narrow the lookup when
    they are not None; attributes are compared by their hash.

    If nothing matches, a new data source is added to the db session.

    :param source: a user, or the name of a non-user source
    :param source_type: type of the source (required to create a non-user source)
    :param model: optionally, the model of the source
    :param version: optionally, the version of the source
    :param attributes: optionally, the attributes of the source
    :param flush: whether to flush the session after adding a new source
    :raises TypeError: if source is neither a user nor a str, or if a new
        non-user source has to be created without a source type
    """
    source_is_user = is_user(source)
    # Reject unsupported source values before building any query.
    if not source_is_user and not isinstance(source, str):
        raise TypeError("source should be of type User or str")
    if source_is_user:
        source_type = "user"

    query = DataSource.query.filter(DataSource.type == source_type)
    if model is not None:
        query = query.filter(DataSource.model == model)
    if version is not None:
        query = query.filter(DataSource.version == version)
    if attributes is not None:
        query = query.filter(
            DataSource.attributes_hash == DataSource.hash_attributes(attributes)
        )
    if source_is_user:
        query = query.filter(DataSource.user == source)
    else:
        query = query.filter(DataSource.name == source)

    # NOTE(review): one_or_none() is expected to raise when several sources
    # match, e.g. when model/version are left out but several exist - confirm.
    existing_source = query.one_or_none()
    if existing_source is not None:
        return existing_source

    if source_is_user:
        # Pass the attributes along here too, so that the new source can be
        # found again by the attributes filter above instead of being
        # created anew on every call.
        # NOTE(review): assumes the DataSource constructor accepts attributes
        # together with a user, as it does together with a name - confirm.
        new_source = DataSource(
            user=source,
            model=model,
            version=version,
            attributes=attributes,
        )
    else:
        if source_type is None:
            raise TypeError("Please specify a source type")
        new_source = DataSource(
            name=source,
            model=model,
            version=version,
            type=source_type,
            attributes=attributes,
        )
    current_app.logger.info(f"Setting up {new_source} as new data source...")
    db.session.add(new_source)
    if flush:
        # assigns id so that we can reference the new object in the current db session
        db.session.flush()
    return new_source
def get_source_or_none(
    source: int | str, source_type: str | None = None
) -> DataSource | None:
    """
    Look up a data source by its id.

    :param source: id of the source (converted with ``int()`` before the lookup)
    :param source_type: optionally, only accept a source of this type
    :returns: the matching data source, or None if there is no match
    """
    candidates = (
        DataSource.query
        if source_type is None
        else DataSource.query.filter(DataSource.type == source_type)
    )
    return candidates.filter(DataSource.id == int(source)).one_or_none()