/
app_utils.py
175 lines (147 loc) · 5.78 KB
/
app_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
from typing import Union, Tuple, List, Optional
import os
import sys
import click
from flask import Flask, current_app, redirect
from flask.cli import FlaskGroup, with_appcontext
from flask_security import current_user
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.rq import RqIntegration
from pkg_resources import get_distribution
from flexmeasures.app import create as create_app
@click.group(cls=FlaskGroup, create_app=create_app)
@with_appcontext
def flexmeasures_cli():
    """
    Management scripts for the FlexMeasures platform.
    """
    # NOTE: the docstring above doubles as the help text click shows for this
    # command group, so keep it short and user-facing.
    #
    # The group itself does nothing; it only serves as the root that commands
    # attach to. Because it is wrapped in @with_appcontext, the app setup is
    # initialised only once, which is crucial for e.g. Sentry.
def init_sentry(app: Flask):
    """
    Configure Sentry error reporting for the given app.

    The app is needed to read the Sentry DSN from its configuration, and also
    to send some additional meta information (release, environment, and the
    "mode" and "platform-name" tags).

    If the SENTRY_DSN setting is missing or empty, this is logged and
    Sentry is left uninitialised.

    :param app: the Flask app whose config, logger and debug flag are used.
    """
    sentry_dsn = app.config.get("SENTRY_DSN")
    if not sentry_dsn:
        app.logger.info(
            "[FLEXMEASURES] No SENTRY_DSN setting found, so initialising Sentry cannot happen ..."
        )
        return

    app.logger.info("[FLEXMEASURES] Initialising Sentry ...")

    # Extra keyword arguments for sentry_sdk.init. A missing (or None) setting
    # simply means "no extras" rather than a KeyError/TypeError at startup.
    extra_sentry_config = app.config.get("FLEXMEASURES_SENTRY_CONFIG") or {}

    sentry_sdk.init(
        dsn=sentry_dsn,
        integrations=[FlaskIntegration(), RqIntegration()],
        debug=app.debug,
        release=f"flexmeasures@{get_distribution('flexmeasures').version}",
        send_default_pii=True,  # user data (current user id, email address, username) is attached to the event.
        # `app.env` no longer exists as of Flask 2.3; pass None in that case,
        # which leaves the environment name to sentry_sdk's own default.
        environment=getattr(app, "env", None),
        **extra_sentry_config,
    )
    sentry_sdk.set_tag("mode", app.config.get("FLEXMEASURES_MODE"))
    sentry_sdk.set_tag("platform-name", app.config.get("FLEXMEASURES_PLATFORM_NAME"))
def set_secret_key(app, filename="secret_key"):
    """Set the SECRET_KEY or exit.

    Sources are tried in this order, and the first one found wins:

    1. A SECRET_KEY that is already in the app's config (left untouched).
    2. The environment variable SECRET_KEY.
    3. The raw bytes of the file `filename` in the app's instance directory.

    If nothing is found, we log instructions for creating a secret
    and then exit the process with status code 2.

    :param app: the app whose config, instance_path and logger are used.
    :param filename: name of the secret key file inside the instance directory.
    """
    if app.config.get("SECRET_KEY", None) is not None:
        return

    secret_key = os.environ.get("SECRET_KEY", None)
    if secret_key is not None:
        app.config["SECRET_KEY"] = secret_key
        return

    filename = os.path.join(app.instance_path, filename)
    try:
        # Use a context manager so the file handle is closed deterministically
        # instead of being left to the garbage collector.
        with open(filename, "rb") as secret_key_file:
            app.config["SECRET_KEY"] = secret_key_file.read()
    except IOError:
        app.logger.error(
            """
Error: No secret key set.
You can add the SECRET_KEY setting to your conf file (this example works only on Unix):
echo "SECRET_KEY=\\"`head -c 24 /dev/urandom`\\"" >> your-flexmeasures.cfg
OR you can add an env var:
export SECRET_KEY=xxxxxxxxxxxxxxx
(on windows, use "set" instead of "export")
OR you can create a secret key file (this example works only on Unix):
mkdir -p %s
head -c 24 /dev/urandom > %s
You can also use Python to create a good secret:
python -c "import secrets; print(secrets.token_urlsafe())"
"""
            % (os.path.dirname(filename), filename)
        )
        sys.exit(2)
def root_dispatcher():
    """
    Redirect the root URL to the view that fits the current user.

    The target is the first applicable entry of the FLEXMEASURES_ROOT_VIEW
    setting. If there is none, or it is empty or just "/", we fall back
    to "/dashboard". A leading slash is added when the entry lacks one.
    """
    fallback_view = "/dashboard"
    candidates = current_app.config.get("FLEXMEASURES_ROOT_VIEW", [])
    target = find_first_applicable_config_entry(candidates, "FLEXMEASURES_ROOT_VIEW")

    if target in (None, "", "/"):
        target = fallback_view
    elif not target.startswith("/"):
        target = f"/{target}"

    current_app.logger.info(f"Redirecting root view to {target} ...")
    return redirect(target)
def find_first_applicable_config_entry(
    configs: Optional[Union[str, list]],
    setting_name: str,
    app: Optional[Flask] = None,
) -> Optional[str]:
    """
    Return the first config entry that applies, or None if there is none.

    Each item in `configs` is handed to parse_config_entry_by_account_roles,
    in order, and the first result that is not None is returned.

    :param configs: a list of entries, a single string (treated as a list with
                    one entry), or None (treated as no entries at all).
    :param setting_name: name of the setting the entries come from;
                         passed on for use in log messages.
    :param app: app to pass on for logging; defaults to the current app.
    """
    if app is None:
        app = current_app
    if configs is None:
        # A setting explicitly set to None means there is nothing to look through.
        return None
    if isinstance(configs, str):
        configs = [configs]
    for config in configs:
        entry = parse_config_entry_by_account_roles(config, setting_name, app)
        if entry is not None:
            return entry
    return None
def parse_config_entry_by_account_roles(
    config: Union[str, Tuple[str, List[str]]],
    setting_name: str,
    app: Optional[Flask] = None,
) -> Optional[str]:
    """
    Parse a config entry (which can be a string, e.g. "dashboard" or a tuple, e.g. ("dashboard", ["MDC"])).

    A string is returned as is. For a tuple, the first item (a string) is returned only if
    at least one of the role names listed in the second item is the name of an account role
    of the current user's account. Otherwise None is returned.

    Malformed entries (not a string or two-part tuple, a first item that is not a string,
    a second item that is not a list) are logged as a warning and yield None.

    :param config: the entry to parse.
    :param setting_name: name of the setting the entry comes from; only used in log messages.
    :param app: app used for logging; defaults to the current app.
    """
    if app is None:
        app = current_app

    if isinstance(config, str):
        return config

    if not (isinstance(config, tuple) and len(config) == 2):
        app.logger.warning(
            f"Setting '{config}' in {setting_name} is neither a string nor two-part tuple. Ignoring ..."
        )
        return None

    entry, account_role_names = config
    if not isinstance(entry, str):
        app.logger.warning(
            f"View name setting '{entry}' in {setting_name} is not a string. Ignoring ..."
        )
        return None
    if not isinstance(account_role_names, list):
        app.logger.warning(
            f"Role names setting '{account_role_names}' in {setting_name} is not a list. Ignoring ..."
        )
        return None

    # No `account` attribute (e.g. AnonymousUser) or no account set: nothing can match.
    account = getattr(current_user, "account", None)
    if account is None:
        return None
    if not account_role_names:
        # Nothing to match against, so there is no need to look up the account's roles.
        return None

    # Collect the account's role names once, instead of on every loop iteration.
    # A list (not a set) keeps plain equality semantics for whatever is in the config.
    current_role_names = [role.name for role in account.account_roles]
    if any(role_name in current_role_names for role_name in account_role_names):
        return entry
    return None