/
users.py
141 lines (114 loc) · 4.47 KB
/
users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from functools import wraps
from flask import current_app, abort
from marshmallow import fields
from sqlalchemy.exc import IntegrityError
from webargs.flaskparser import use_args
from flask_security import current_user
from flask_security.recoverable import send_reset_password_instructions
from flask_json import as_json
from flexmeasures.data.models.user import User as UserModel
from flexmeasures.data.schemas.users import UserSchema
from flexmeasures.data.services.users import (
get_users,
set_random_password,
remove_cookie_and_token_access,
)
from flexmeasures.data.auth_setup import unauthorized_handler
from flexmeasures.api.common.responses import required_info_missing
from flexmeasures.data.config import db
"""
API endpoints to manage users.
Neither POST (to create) nor DELETE is accessible via the API; both are available as CLI functions instead.
"""
# Module-level schema instance for a single user (used with dump() by the endpoints in this module)
user_schema = UserSchema()
# Same schema configured with many=True, for dumping a collection of users
users_schema = UserSchema(many=True)
@use_args({"include_inactive": fields.Bool(missing=False)}, location="query")
@as_json
def get(args):
    """Return the serialized list of users with status 200.

    The parsed query arguments carry the boolean "include_inactive" flag
    (False when not given); inactive users are only listed if it is set.
    """
    only_active = not args["include_inactive"]
    return users_schema.dump(get_users(only_active=only_active)), 200
def load_user(admins_only: bool = False):
    """Decorator factory which loads a user by the ID expected in the path.

    The decorated endpoint receives the loaded user object as its first
    (and only) positional argument, in place of the raw ID.

    Responds with 400 if that is not possible due to wrong parameters
    (ID missing, more than one positional parameter, or ID not an integer).
    Raises 404 if the user is not found.
    Responds as unauthorized (403) if access is not allowed:
    only the user themselves or admins can access a user object.
    The admins_only parameter can be used if not even the user themselves
    should be allowed.

    Example:

        @app.route('/user/<id>')
        @load_user()
        def get_user(user):
            return user_schema.dump(user), 200

    The route must specify one parameter ― id. It may be handed over
    positionally or (as Flask does for URL variables) as keyword `id`.
    """

    def wrapper(fn):
        @wraps(fn)
        @as_json
        def decorated_endpoint(*args, **kwargs):
            path_params = list(args)
            # Flask calls view functions with URL variables as keyword
            # arguments, so accept `id` from there if none came positionally.
            if not path_params and "id" in kwargs:
                path_params.append(kwargs.pop("id"))
            if not path_params:
                current_app.logger.warning("Request missing id.")
                return required_info_missing(["id"])
            if len(path_params) > 1:
                return (
                    dict(
                        status="UNEXPECTED_PARAMS",
                        message="Only expected one parameter (id).",
                    ),
                    400,
                )
            try:
                user_id = int(path_params[0])
            except (TypeError, ValueError):
                # TypeError covers values like None, which int() rejects
                # differently than unparseable strings.
                current_app.logger.warning("Cannot parse ID argument from request.")
                return required_info_missing(["id"], "Cannot parse ID arg as int.")

            user = UserModel.query.filter_by(id=user_id).one_or_none()
            if user is None:
                # abort() raises the HTTP exception itself
                abort(404, f"User {user_id} not found")

            # Admins may access every user; everybody else only themselves,
            # and not even that if admins_only is set.
            if not current_user.has_role("admin"):
                if admins_only or user != current_user:
                    return unauthorized_handler(None, [])

            # Replace the raw ID by the loaded user object
            return fn(user, **kwargs)

        return decorated_endpoint

    return wrapper
@load_user()
@as_json
def fetch_one(user: UserModel):
    """Return the serialized form of the user resolved by load_user, with status 200."""
    serialized_user = user_schema.dump(user)
    return serialized_user, 200
@load_user()
@use_args(UserSchema(partial=True))
@as_json
def patch(db_user: UserModel, user_data: dict):
    """Update a user given its identifier.

    Only the fields email, username, active, timezone and flexmeasures_roles
    are taken over from user_data; any other keys are ignored.
    Users editing themselves may not change the sensitive fields
    (active, flexmeasures_roles); such a request is answered as unauthorized
    and nothing is changed.
    Deactivating a user also removes their cookie and token access.

    Returns the serialized user with status 200, or a 400 response
    if the database reports an integrity violation on commit.
    """
    allowed_fields = ("email", "username", "active", "timezone", "flexmeasures_roles")
    sensitive_fields = ("active", "flexmeasures_roles")
    updates = {k: v for k, v in user_data.items() if k in allowed_fields}

    # Don't allow users who edit themselves to edit sensitive fields.
    # Checked up front, so a rejected request leaves db_user untouched.
    if current_user.id == db_user.id and any(k in sensitive_fields for k in updates):
        return unauthorized_handler(None, [])

    for field_name, value in updates.items():
        setattr(db_user, field_name, value)
        if field_name == "active" and value is False:
            remove_cookie_and_token_access(db_user)

    db.session.add(db_user)
    try:
        db.session.commit()
    except IntegrityError as ie:
        # A failed commit leaves the session unusable until rolled back
        db.session.rollback()
        return dict(message="Duplicate user already exists", detail=ie._message()), 400
    return user_schema.dump(db_user), 200
@load_user()
@as_json
def reset_password(user):
    """
    Replace the user's password by a random one and remove their cookie and token access.
    Then send the user instructions for resetting the password.

    Non-admins can only do this for themselves; otherwise the request is
    answered as unauthorized.
    """
    acting_on_self = current_user.id == user.id
    if not (acting_on_self or current_user.has_role("admin")):
        # Non-admins must not reset the passwords of other users
        return unauthorized_handler(None, [])

    set_random_password(user)
    remove_cookie_and_token_access(user)
    send_reset_password_instructions(user)

    # The commit comes last, so nothing is persisted if sending the instructions raised
    db.session.commit()