-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
362 lines (318 loc) · 13 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
from flask import Flask, request
from sqlalchemy import func, and_
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.exc import SQLAlchemyError
from flask_jwt_extended import create_access_token,get_jwt_identity,jwt_required,JWTManager
from werkzeug.security import safe_str_cmp
import pandas as pd
from datetime import datetime
from db import Demand, Supply, Raw, Matches, UserLog, Auth, Contact, Locations, Volunteer
from db import get_session
from sqlalchemy.sql.expression import desc, nulls_last
from credentials import JWT_KEY
# Flask application object plus the JWT settings used by flask_jwt_extended.
app = Flask(__name__)
app.config.update(
    {
        "JWT_SECRET_KEY": JWT_KEY,
        # tokens are looked for in request headers and in the query string
        "JWT_TOKEN_LOCATION": ["headers", "query_string"],
        # NOTE(review): False presumably switches token expiry off -- confirm
        # against the flask_jwt_extended configuration docs
        "JWT_ACCESS_TOKEN_EXPIRES": False,
        "JWT_REFRESH_TOKEN_EXPIRES": False,
        "APPLICATION_ROOT": "/core",
    }
)
jwt = JWTManager(app)
def obj_to_dict(obj):
    """Return the public instance attributes of ``obj`` as a plain dict.

    Every attribute whose name starts with an underscore is treated as
    internal and left out of the result.
    """
    return {
        name: value
        for name, value in vars(obj).items()
        if not name.startswith("_")
    }
def search(table):
    """Query ``table`` using the current request's query-string parameters.

    Every query parameter is applied as an equality filter on ``table``
    (via ``filter_by``), except for these special parameters:

    * ``verified_after`` -- keep rows with ``last_verified_on`` >= this datetime
    * ``after`` -- keep rows with ``last_updated`` >= this datetime
    * ``before`` -- keep rows with ``last_updated`` < this datetime
    * ``limit`` -- maximum number of rows to return (digits only)

    Returns:
        A list with one dict per matching row, or an error message string
        when a special parameter is invalid or the query fails.
    """
    args = request.args.to_dict()

    def _parse_datetime(value):
        # ``pd.to_datetime(None)`` returns ``None``; a blank value may come
        # back as ``NaT``. Both mean "no filter requested".
        parsed = pd.to_datetime(value)
        return None if pd.isna(parsed) else parsed

    # special query parameters that are not table columns
    verified_after = args.pop("verified_after", None)
    after = args.pop("after", None)
    before = args.pop("before", None)
    try:
        verified_after = _parse_datetime(verified_after)
        after = _parse_datetime(after)
        before = _parse_datetime(before)
    except Exception:
        # any parsing failure is reported as bad input; unlike a bare
        # ``except`` this lets SystemExit/KeyboardInterrupt propagate
        return "Invalid datetime format"

    limit = args.pop("limit", "")
    if limit and not limit.isdigit():
        return "limit must be an integer"

    try:
        with get_session() as session:
            # remaining parameters are plain column equality filters
            query = session.query(table).filter_by(**args)
            if after is not None:
                query = query.filter(table.last_updated >= after)
            if before is not None:
                query = query.filter(table.last_updated < before)
            if verified_after is not None:
                query = query.filter(table.last_verified_on >= verified_after)
            # default ordering for Supply: verified, last_verified_on,
            # last_updated -- each descending with NULLs last
            if table is Supply:
                query = query.order_by(
                    nulls_last(desc(Supply.verified)),
                    nulls_last(desc(Supply.last_verified_on)),
                    nulls_last(desc(Supply.last_updated)),
                )
            # NOTE: limit has to be applied after ordering
            if limit:
                query = query.limit(int(limit))
            return [obj_to_dict(row) for row in query.all()]
    except (SQLAlchemyError, ValueError) as e:
        return str(e)
def location_search():
    """Search the Locations table using the request's query-string parameters.

    ``name`` is matched case-insensitively with ``ilike``, ``limit`` caps the
    number of rows, and every other parameter is an equality filter.

    Returns:
        A list with one dict per matching row, or an error message string.
    """
    params = request.args.to_dict()
    row_cap = params.pop("limit", "")
    name_pattern = params.pop("name", "")
    if row_cap and not row_cap.isdigit():
        return "limit must be an integer"

    try:
        with get_session() as session:
            query = session.query(Locations).filter_by(**params)
            if name_pattern:
                query = query.filter(Locations.name.ilike(name_pattern))
            if row_cap:
                query = query.limit(row_cap)
            return [obj_to_dict(row) for row in query.all()]
    except (SQLAlchemyError, ValueError) as err:
        return str(err)
def insert_or_update(table):
    """Insert or update rows of ``table`` from the request's JSON body.

    The body is either one JSON object or a list of JSON objects. An object
    carrying a truthy ``id`` updates the record with that id; an object
    without one is inserted as a new record. A ``jwt`` key, if present, is
    dropped before the data reaches ``insert``/``update``.

    Returns:
        For a single object, whatever ``insert``/``update`` returned (a list
        of dicts or an error string). For a list, a list holding one such
        result per element. An error message string when the body is missing
        or is neither an object nor a list.
    """

    def _apply(item):
        # Route one JSON object to update() or insert() depending on ``id``.
        if not isinstance(item, dict):
            return "Each record must be a JSON object"
        item.pop("jwt", None)
        identifier = item.pop("id", None)
        if identifier:
            # user is attempting to update a record
            return update(table, item, identifier)
        # user is attempting to insert a record
        return insert(table, item)

    payload = request.get_json()
    if isinstance(payload, list):
        return [_apply(item) for item in payload]
    if isinstance(payload, dict):
        return _apply(payload)
    return "Request body must be a JSON object or a list of JSON objects"
def insert(table, data):
    """Insert a new ``table`` record built from ``data`` and log who did it.

    Args:
        table: mapped class to instantiate.
        data: column values for the new record. The dict is modified in
            place: ``source`` (and, for telegram, ``tg_user_id`` and
            ``tg_user_handle``) are removed, and ``contact`` /
            ``group_handle`` may be added.

    Returns:
        A one-element list holding the inserted record as a dict, or an
        error message string.
    """
    source = str(data.pop("source", "") or "").lower()
    contact_lookup = {}
    if source == "telegram":
        tg_user_id = data.pop("tg_user_id", None)
        tg_user_handle = data.pop("tg_user_handle", None)
        if tg_user_id:
            contact_lookup["tg_user_id"] = tg_user_id
        if tg_user_handle:
            contact_lookup["user_handle"] = tg_user_handle
        # NOTE(review): ``source`` itself is not part of the lookup; confirm
        # whether the contact query should also filter on it.

    try:
        with get_session() as session:
            user = session.query(Auth).filter_by(id=get_jwt_identity()).first()
            if not user:
                return "Authentication failed"

            if contact_lookup:
                contact = session.query(Contact).filter_by(**contact_lookup).first()
                if not contact:
                    return f"Contact not found: {contact_lookup}"
                data.update(contact=contact)

            if table == Demand:
                data["group_handle"] = "-1001367739196"

            record = table(**data)
            session.add(record)
            # flush so the record's primary key is available for the log row
            session.flush()

            # user log row: {"<table>_id": id of the new record} plus who/when.
            # A freshly inserted record cannot already have a log entry, so a
            # new row is always added (never a bulk update of UserLog).
            log = {
                "username": user.username,
                "last_updated": datetime.now(),
                "{}_id".format(table.__name__.lower()): record.id,
            }
            session.add(UserLog(**log))

            # record and log entry are persisted together
            session.commit()
            session.refresh(record)
            return [obj_to_dict(record)]
    except (SQLAlchemyError, ValueError, TypeError) as e:
        # TypeError covers keyword arguments the mapped class does not accept
        return str(e)
def update(table, data, identifier):
    """Update the ``table`` record whose id is ``identifier`` and log who did it.

    Args:
        table: mapped class to update.
        data: column values to write; a ``jwt`` key, if present, is removed.
        identifier: id of the record to update.

    Returns:
        A one-element list holding the updated record as a dict, or an
        error message string.
    """
    try:
        with get_session() as session:
            user = session.query(Auth).filter_by(id=get_jwt_identity()).first()
            if not user:
                return "Authentication failed"

            data.pop("jwt", None)
            query = session.query(table).filter_by(id=identifier)
            record = query.first()
            if record is None:
                return "Could not find record"

            # user log row is keyed by username and "<table>_id"
            fk = {"{}_id".format(table.__name__.lower()): record.id}
            now = datetime.now()
            existing = (
                session.query(UserLog)
                .filter_by(username=user.username, **fk)
                .first()
            )
            if existing:
                # touch only this user's entry for this record
                existing.last_updated = now
            else:
                session.add(
                    UserLog(username=user.username, last_updated=now, **fk)
                )

            # write the new values; log entry and update commit together
            query.update(data)
            session.commit()

            # fetch newly updated record
            session.refresh(record)
            return [obj_to_dict(record)]
    except (SQLAlchemyError, ValueError, TypeError) as e:
        return str(e)
def find_matches():
    """Collect not-yet-sent supply matches for the requesting contact's demands.

    Reads ``source``, ``tg_user_id`` and ``tg_user_handle`` from the query
    string. Contacts are only looked up (or created) for the ``telegram``
    source.

    Returns:
        * a list of ``{demand summary: [supply summaries]}`` dicts when new
          matches exist,
        * an empty list when the contact had to be created just now,
        * otherwise a message string (treated as an error by the caller).
    """
    args = request.args.to_dict()
    source = args.get("source", "").lower()

    # only telegram contacts can be resolved; other sources have no contact
    if source != "telegram":
        return "Contact not found"

    tg_user_id = args.get("tg_user_id", "")
    tg_user_handle = args.get("tg_user_handle", "")
    if not (tg_user_id or tg_user_handle):
        return "Error: Not enough contact information"

    with get_session() as session:
        contact = None
        # try to find telegram contact using tg_user_id
        if tg_user_id:
            contact = (
                session.query(Contact)
                .filter_by(tg_user_id=tg_user_id, source="telegram")
                .first()
            )
        # try to find telegram contact using handle
        if (not contact) and tg_user_handle:
            contact = (
                session.query(Contact)
                .filter_by(user_handle=tg_user_handle, source="telegram")
                .first()
            )
        # still nothing: register the contact; a new contact has no demands
        if not contact:
            contact = Contact(
                source="telegram",
                user_handle=tg_user_handle,
                tg_user_id=tg_user_id,
                bot_activated=True,
            )
            session.add(contact)
            session.commit()
            return []

        demands = (
            session.query(Demand)
            .filter_by(contact=contact)
            .order_by(nulls_last(desc(Demand.datetime)))
            .limit(100)
            .all()
        )
        if not demands:
            return "You have not submitted any requests. Please click on /find to submit a request"

        new_matches = []
        saw_invalid_demand = False
        for demand in demands:
            # database-side function; presumably records fresh matches for
            # this demand -- TODO confirm against the schema
            session.query(func.match_demand_to_new_supply(demand.id)).all()
            matches = (
                session.query(Matches)
                .filter_by(demand=demand, sent=False)
                .order_by(desc(Matches.created_on))
                .limit(10)
                .all()
            )
            if not matches:
                continue

            supply_ids = [match.supply_id for match in matches]
            supplies = (
                session.query(Supply)
                .filter(Supply.id.in_(supply_ids), Supply.phone.isnot(None))
                .all()
            )
            if not supplies:
                # nothing usable for this demand; keep results already
                # collected for other demands instead of bailing out
                continue

            key = ", ".join(
                filter(
                    None,
                    [demand.resource, demand.category, demand.location_raw, demand.phone],
                )
            )
            if not key:
                saw_invalid_demand = True
                continue

            value = [
                ": ".join(filter(None, [supply.title, supply.phone]))
                for supply in supplies
            ]
            new_matches.append({key: value})
            # mark as delivered so the same matches are not returned again
            for match in matches:
                match.sent = True

        # persist the ``sent`` flags (and anything the matching function wrote)
        session.commit()

        if new_matches:
            return new_matches
        if saw_invalid_demand:
            return "Invalid request detected"
        return "No new results found"
def generate_response(results, status_code=400):
    """Turn a handler result into a response-friendly value.

    * a list is a success and is wrapped as ``{"data": results}``
    * a string is an error message; double quotes are swapped for single
      quotes and it is returned with ``status_code``
    * anything else is reported as invalid server data with status 503
    """
    if isinstance(results, list):
        return {"data": results}
    if isinstance(results, str):
        message = results.replace('"', "'")
        return {"error": message}, status_code
    return {"error": "Invalid data received from server", "data": results}, 503
@jwt.user_identity_loader
def user_identity_lookup(user):
    """Return the ``id`` of ``user``; registered as the JWT identity loader."""
    identity = user.id
    return identity
@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
    """Return the ``Auth`` row whose id equals the token's ``sub`` value, or None."""
    subject = jwt_data["sub"]
    with get_session() as db:
        matching = db.query(Auth).filter_by(id=subject)
        return matching.one_or_none()
@app.post("/login")
def login():
    """Exchange a username/password pair for a JWT access token.

    Credentials are read from the query string (``username``, ``password``).

    Returns:
        ``({"access_token": token}, 200)`` on success, otherwise
        ``({"error": message}, 401)``.
    """
    # ``werkzeug.security.safe_str_cmp`` is deprecated/removed in newer
    # Werkzeug releases; ``hmac.compare_digest`` is the stdlib equivalent.
    # Imported locally so this function does not rely on the module header.
    import hmac

    # NOTE(review): credentials arrive in the URL query string and the stored
    # password is compared as-is (no hashing visible here) -- worth revisiting.
    args = request.args.to_dict()
    username = args.get("username")
    password = args.get("password")
    if not (username and password):
        return {"error": "Please provide both Username and Password"}, 401

    with get_session() as session:
        user = session.query(Auth).filter_by(username=username).first()
        # a missing user or a missing stored password both fail authentication
        stored_password = user.password if user else None
        if stored_password and hmac.compare_digest(
            stored_password.encode("utf-8"), password.encode("utf-8")
        ):
            access_token = create_access_token(identity=user)
            return {"access_token": access_token}, 200

    return {"error": "Authentication failed"}, 401
@app.get("/requests")
def get_demand():
    """GET /requests: search the Demand table with the query-string filters."""
    return generate_response(search(Demand))
@app.get("/supply")
def get_supply():
    """GET /supply: search the Supply table with the query-string filters."""
    return generate_response(search(Supply))
@app.get("/location")
def get_locations():
    """GET /location: search the Locations table via ``location_search``."""
    return generate_response(location_search())
@app.get("/matches")
def get_matches():
    """GET /matches: return the outcome of ``find_matches`` as a response."""
    return generate_response(find_matches())
@app.get("/rawdata")
def get_raw():
    """GET /rawdata: search the Raw table with the query-string filters."""
    return generate_response(search(Raw))
@app.post("/requests")
@jwt_required()
def post_demand():
    """POST /requests (JWT required): insert or update Demand records."""
    return generate_response(insert_or_update(Demand))
@app.post("/supply")
@jwt_required()
def post_supply():
    """POST /supply (JWT required): insert or update Supply records."""
    return generate_response(insert_or_update(Supply))
@app.post("/volunteer")
@jwt_required()
def post_volunteer():
    """POST /volunteer (JWT required): insert or update Volunteer records."""
    return generate_response(insert_or_update(Volunteer))
@app.post("/rawdata")
@jwt_required()
def post_raw():
    """POST /rawdata (JWT required): insert or update Raw records."""
    return generate_response(insert_or_update(Raw))
# Script entry point: start the Flask server with debug mode switched on.
if __name__ == "__main__":
    run_options = {"debug": True}
    app.run(**run_options)