/
users.py
212 lines (173 loc) · 7.37 KB
/
users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
from typing import Dict, List, Union, Optional
import random
import string
from flask import current_app
from flask_security import current_user, SQLAlchemySessionUserDatastore
from flask_security.recoverable import update_password
from email_validator import (
validate_email,
validate_email_deliverability,
EmailNotValidError,
EmailUndeliverableError,
)
from flask_security.utils import hash_password
from werkzeug.exceptions import NotFound
from flexmeasures.data import db
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.user import User, Role, Account
class InvalidFlexMeasuresUser(Exception):
    """Signals that a user could not be created from the information given."""
def get_user(id: str) -> User:
    """
    Look up a single user by id.

    The id is converted with int() before querying.
    Raises NotFound if no user has this id.
    """
    match: Optional[User] = User.query.filter_by(id=int(id)).one_or_none()
    if match is not None:
        return match
    raise NotFound
def get_users(
    account_name: Optional[str] = None,
    role_name: Optional[str] = None,
    account_role_name: Optional[str] = None,
    only_active: bool = True,
) -> List[User]:
    """Return a list of User objects.

    All filters are optional and are combined (a user has to pass each one given):

    - account_name: only users who belong to the account with this name.
      Raises NotFound if there is no account with this name.
    - role_name: only users who have the (user) role with this name.
      If no role with this name exists, nobody can have it and the result is empty.
    - account_role_name: only users whose account has the account role with this name.
    - only_active: set to False if you also want non-active users.
    """
    user_query = User.query

    if account_name is not None:
        account = Account.query.filter(Account.name == account_name).one_or_none()
        if not account:
            raise NotFound(f"There is no account named {account_name}!")
        user_query = user_query.filter(User.account == account)

    if only_active:
        user_query = user_query.filter(User.active.is_(True))

    if role_name is not None:
        role = Role.query.filter(Role.name == role_name).one_or_none()
        if role is None:
            # An unknown role cannot be held by any user. Skipping the filter here
            # would hand back users who do not have the requested role.
            return []
        user_query = user_query.filter(User.flexmeasures_roles.contains(role))

    users = user_query.all()

    if account_role_name is not None:
        # This filter runs in Python on the fetched users, via each user's account.
        users = [u for u in users if u.account.has_role(account_role_name)]
    return users
def find_user_by_email(
    user_email: str, keep_in_session: bool = True
) -> Optional[User]:
    """
    Look up a user by email address, using the Flask-Security user datastore.

    Returns the result of the datastore lookup, which is None if no user matches.
    If keep_in_session is False, a found user is expunged from the database session
    before being returned.
    """
    user_datastore = SQLAlchemySessionUserDatastore(db.session, User, Role)
    user = user_datastore.find_user(email=user_email)
    # Only expunge an actual user object: the session cannot expunge None.
    if user is not None and not keep_in_session:
        # we might need this object persistent across requests
        db.session.expunge(user)
    return user
def _check_user_email(email: str, check_deliverability: bool) -> None:
    """Raise InvalidFlexMeasuresUser if the email address is invalid or (optionally) undeliverable."""
    try:
        email_info = validate_email(email, check_deliverability=False)
        # The deliverability check needs a network connection. During testing, we skip it
        # because it takes a bit of time and without internet connection it fails.
        if check_deliverability and not current_app.testing:
            try:
                validate_email_deliverability(
                    email_info.domain, email_info["domain_i18n"]
                )
            except EmailUndeliverableError as eue:
                raise InvalidFlexMeasuresUser(
                    "The email address %s does not seem to be deliverable: %s"
                    % (email, str(eue))
                ) from eue
    except EmailNotValidError as enve:
        raise InvalidFlexMeasuresUser(
            "%s is not a valid email address: %s" % (email, str(enve))
        ) from enve


def _find_or_create_role(
    user_datastore: SQLAlchemySessionUserDatastore,
    user_role: Union[Dict[str, str], str],
) -> Role:
    """Return the role described by user_role (a name or a dict with a "name" key), creating it if needed."""
    role_fields = user_role if isinstance(user_role, dict) else {"name": user_role}
    role = user_datastore.find_role(role_fields["name"])
    if role is None:
        role = user_datastore.create_role(**role_fields)
    return role


def create_user(  # noqa: C901
    password: Optional[str] = None,
    user_roles: Optional[
        Union[Dict[str, str], List[Dict[str, str]], str, List[str]]
    ] = None,
    check_email_deliverability: bool = True,
    account_name: Optional[str] = None,
    **kwargs,
) -> User:
    """
    Convenience wrapper to create a new User object.

    It hashes the password.

    In addition to the user, this function can create

    - new Role objects (if user roles do not already exist)
    - an Account object (if it does not exist yet)
    - a new DataSource object that corresponds to the user

    Parameters:

    - password: the plain-text password (required, must not be empty).
    - user_roles: one role or a list of roles; each role is either a role name
      or a dict of role fields which contains at least "name".
    - check_email_deliverability: also check that the email domain is deliverable
      (this check is always skipped when the app is in testing mode).
    - account_name: name of the account the user belongs to (required).
    - kwargs: must contain "email" and may contain "username" (which otherwise
      defaults to the part of the email address before the "@").
      All remaining items are passed on to the datastore when creating the user.

    Raises InvalidFlexMeasuresUser if required input is missing, the email address
    does not validate, or the email address or username is already taken.

    Remember to commit the session after calling this function!
    """

    # Check necessary input explicitly before anything happens
    if password is None or password == "":
        raise InvalidFlexMeasuresUser("No password provided.")
    if "email" not in kwargs:
        raise InvalidFlexMeasuresUser("No email address provided.")
    email = kwargs.pop("email").strip()
    _check_user_email(email, check_email_deliverability)
    if "username" not in kwargs:
        username = email.split("@")[0]
    else:
        username = kwargs.pop("username").strip()

    # Check integrity explicitly before anything happens
    existing_user_by_email = User.query.filter_by(email=email).one_or_none()
    if existing_user_by_email is not None:
        raise InvalidFlexMeasuresUser("User with email %s already exists." % email)
    existing_user_by_username = User.query.filter_by(username=username).one_or_none()
    if existing_user_by_username is not None:
        raise InvalidFlexMeasuresUser(
            "User with username %s already exists." % username
        )

    # check if we can link/create an account
    if account_name is None:
        raise InvalidFlexMeasuresUser(
            "Cannot create user without knowing the name of the account which this user is associated with."
        )
    account = db.session.query(Account).filter_by(name=account_name).one_or_none()
    if account is None:
        print(f"Creating account {account_name} ...")
        account = Account(name=account_name)
        db.session.add(account)

    user_datastore = SQLAlchemySessionUserDatastore(db.session, User, Role)
    kwargs.update(password=hash_password(password), email=email, username=username)
    user = user_datastore.create_user(**kwargs)

    user.account = account

    # add roles to user (creating new roles if necessary)
    if user_roles:
        if not isinstance(user_roles, list):
            user_roles = [user_roles]  # type: ignore
        for user_role in user_roles:
            role = _find_or_create_role(user_datastore, user_role)
            user_datastore.add_role_to_user(user, role)

    # create data source
    db.session.add(DataSource(user=user))

    return user
def set_random_password(user: User) -> None:
    """
    Randomise a user's password.

    The new password is made of 24 lowercase ASCII letters. They are drawn with
    the `secrets` module, as the `random` module is not meant for security purposes.

    Remember to commit the session after calling this function!
    """
    import secrets  # standard library; imported locally to keep this function self-contained

    new_random_password = "".join(
        secrets.choice(string.ascii_lowercase) for _ in range(24)
    )
    update_password(user, new_random_password)
def remove_cookie_and_token_access(user: User):
    """
    Revoke the access which a user's current cookies and auth tokens give.

    Consider this if you suspect that their password, cookie or tokens
    are compromised. For a compromised password, you can also call `set_random_password`.

    Remember to commit the session after calling this function!
    """
    SQLAlchemySessionUserDatastore(db.session, User, Role).reset_user_access(user)
def delete_user(user: User):
    """
    Remove a user via the user datastore and the database session.

    According to the original author's note, this also removes the user's assets
    and power measurements (that is not visible in this function - presumably
    it follows from how the models are set up).

    Users cannot delete themselves: an Exception is raised if the given user
    is the currently logged-in user.

    Remember to commit the session after calling this function!
    """
    is_self = hasattr(current_user, "id") and user.id == current_user.id
    if is_self:
        raise Exception("You cannot delete yourself.")

    datastore = SQLAlchemySessionUserDatastore(db.session, User, Role)
    datastore.delete_user(user)
    db.session.delete(user)

    deletion_note = "Deleted %s." % user
    current_app.logger.info(deletion_note)