-
Notifications
You must be signed in to change notification settings - Fork 2
/
db.py
285 lines (260 loc) · 7.89 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
""" Methods for interacting with the puzzbost REST api and SQL database """
import aiohttp
import discord
import logging
import pymysql
import re
from config import config
from discord_info import is_puzzle_channel
from munch import munchify
class REST:
    """Thin async client for the REST endpoint configured in ``config.puzzledb``."""

    @staticmethod
    async def post(path, data=None):
        """POST ``data`` as JSON to ``rest_url + path`` and log the outcome.

        Args:
            path: Path appended verbatim to ``config.puzzledb["rest_url"]``.
            data: Payload handed to aiohttp's ``json=`` argument (may be None).

        Returns:
            The aiohttp response object for the request.

        NOTE(review): the response is returned from inside both ``async with``
        blocks, so the response and its session have been exited by the time
        the caller sees it. ``status`` was already read for the log line;
        whether the body is still readable afterwards should be confirmed
        before any caller relies on it.
        """
        target = config.puzzledb["rest_url"] + path
        async with aiohttp.ClientSession() as session, session.post(
            target, json=data
        ) as response:
            message = "POST to {} ; Data = {} ; Response status = {}".format(
                path, data, response.status
            )
            logging.info(message)
            return response
class SQL:
    """Static helpers for querying and updating the puzzle SQL database.

    Every method goes through one pymysql connection that is created lazily
    from ``config.puzzledb`` and cached on the class. Rows are returned as
    dicts because the connection uses ``DictCursor``.
    """

    # Class variable to store connection once established
    connection = None

    @staticmethod
    def _get_db_connection():
        """Return the shared connection, creating it on first use.

        A cached connection is pinged with ``reconnect=True`` before being
        handed out. A new connection is opened with ``autocommit=True`` and
        ``DictCursor`` using the credentials in ``config.puzzledb``.
        """
        if SQL.connection:
            SQL.connection.ping(reconnect=True)
            return SQL.connection
        logging.info("[SQL] No connection found, creating new connection")
        creds = config.puzzledb
        connection = pymysql.connect(
            host=creds["host"],
            port=creds["port"],
            user=creds["user"].lower(),
            password=creds["passwd"],
            db=creds["db"],
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=True,
        )
        SQL.connection = connection
        return connection

    @staticmethod
    def _require_statement(query, keyword):
        """Raise AssertionError unless the first word of ``query`` is ``keyword``.

        Raised explicitly (rather than with an ``assert`` statement) so the
        guard is still enforced when Python runs with ``-O``; the exception
        type and message are the same as the assert would have produced.
        A blank query is rejected the same way instead of raising IndexError.
        """
        words = query.split(None, 1)
        if not words or words[0].upper() != keyword:
            raise AssertionError("{} only!".format(keyword))

    @staticmethod
    def select_one(query: str, args=None):
        """Run a SELECT and return the first row, or None if there is none.

        Args:
            query: SQL text; must start with SELECT.
            args: Parameters for the query placeholders, passed to
                ``cursor.execute``.

        Raises:
            AssertionError: if ``query`` is not a SELECT statement.
        """
        SQL._require_statement(query, "SELECT")
        connection = SQL._get_db_connection()
        with connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()

    @staticmethod
    def select_all(query: str, args=None):
        """Run a SELECT and return every resulting row.

        Args:
            query: SQL text; must start with SELECT.
            args: Parameters for the query placeholders, passed to
                ``cursor.execute``.

        Raises:
            AssertionError: if ``query`` is not a SELECT statement.
        """
        SQL._require_statement(query, "SELECT")
        connection = SQL._get_db_connection()
        with connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchall()

    @staticmethod
    def update(ctx, query: str, args=None):
        """Run an UPDATE statement and commit it.

        Args:
            ctx: Object whose ``command`` attribute is included in the log
                lines (presumably the discord command context - confirm).
            query: SQL text; must start with UPDATE.
            args: Parameters for the query placeholders.

        Raises:
            AssertionError: if ``query`` is not an UPDATE statement.
        """
        SQL._require_statement(query, "UPDATE")
        connection = SQL._get_db_connection()
        with connection.cursor() as cursor:
            cursor.execute(query, args)
            logging.info("{0.command}: Committing row".format(ctx))
            # The connection is opened with autocommit=True; the explicit
            # commit is kept so the write never depends on that setting.
            connection.commit()
            logging.info("{0.command}: Committed row successfully!".format(ctx))

    @staticmethod
    def get_hunt_config():
        """Return the ``config`` table as a munchified key -> val mapping.

        Falls back to ``config.hunt_config`` when the table yields no rows.
        """
        rows = SQL.select_all("SELECT `key`, val FROM config")
        # Read the columns by name rather than relying on the order of
        # row.values().
        hunt_config = munchify({row["key"]: row["val"] for row in rows})
        return hunt_config or config.hunt_config

    @staticmethod
    def get_puzzle_for_channel(channel):
        """Return the puzzle row whose chat channel is ``channel``, or None."""
        rows = SQL.get_puzzles_for_channels([channel])
        return rows.get(channel.id)

    @staticmethod
    def get_puzzles_for_channels(channels):
        """Return puzzle rows for the given channels, keyed by channel id.

        Args:
            channels: Iterable of objects with an ``id`` attribute.

        Returns:
            Dict mapping ``int(channel_id)`` to the puzzle row; channels with
            no matching puzzle are absent. An empty input yields ``{}``
            without querying the database.
        """
        channel_ids = tuple(c.id for c in channels)
        if not channel_ids:
            # "IN ()" is not valid SQL, and nothing could match anyway.
            return {}
        rows = SQL.select_all(
            """
            SELECT
                id,
                name,
                roundname AS round_name,
                puzzle_uri,
                drive_id,
                drive_uri,
                chat_channel_id AS channel_id,
                chat_channel_link,
                status,
                answer,
                xyzloc,
                comments
            FROM puzzle_view
            WHERE chat_channel_id IN ({})
            """.format(
                ",".join(["%s"] * len(channel_ids))
            ),
            channel_ids,
        )
        return {int(row["channel_id"]): row for row in rows}

    @staticmethod
    def get_puzzle_for_channel_fuzzy(ctx, channel_or_query):
        """Find a puzzle from a channel, a name fragment, or a regex.

        Args:
            ctx: Object whose ``channel`` is used when no query is given.
            channel_or_query: Falsy to use ``ctx.channel``, a
                ``discord.TextChannel`` to look up directly, or a string
                matched against puzzle names.

        Returns:
            The first matching puzzle row, or None. With no query, None is
            also returned when ``ctx.channel`` is not a puzzle channel.
        """
        if not channel_or_query:
            if not is_puzzle_channel(ctx.channel):
                return None
            return SQL.get_puzzle_for_channel(ctx.channel)

        if isinstance(channel_or_query, discord.TextChannel):
            channel = channel_or_query
            return SQL.get_puzzle_for_channel(channel)

        query = channel_or_query
        # The raw text is tried as a case-insensitive regex first; if it does
        # not compile, fall back to a pattern that only matches an empty name
        # so that only the substring test below can succeed.
        try:
            regex = re.compile(query, re.IGNORECASE)
        except Exception:
            regex = re.compile(r"^$")
        # The substring test uses the query with spaces removed, lowercased.
        query = query.replace(" ", "").lower()

        def puzzle_matches(name):
            if not name:
                return False
            if query in name.lower():
                return True
            return regex.search(name) is not None

        puzzles = SQL.select_all(
            """
            SELECT
                id,
                name,
                roundname AS round_name,
                puzzle_uri,
                drive_uri,
                chat_channel_id AS channel_id,
                status,
                answer,
                xyzloc,
                comments,
                cursolvers
            FROM puzzle_view
            """,
        )
        return next(
            (puzzle for puzzle in puzzles if puzzle_matches(puzzle["name"])),
            None,
        )

    @staticmethod
    def get_solved_round_names():
        """Return names of rounds whose ``round_uri`` ends in ``#solved``.

        The round named "mistakes" is excluded.
        """
        rows = SQL.select_all(
            """
            SELECT
                id,
                name
            FROM round_view
            WHERE round_uri LIKE '%%#solved'
            AND name <> "mistakes"
            """,
        )
        return [row["name"] for row in rows]

    @staticmethod
    def get_meta_ids():
        """Return the ``meta_id`` of every round except "mistakes"."""
        rows = SQL.select_all(
            """
            SELECT
                id,
                meta_id
            FROM round_view
            WHERE name <> "mistakes"
            """,
        )
        return [row["meta_id"] for row in rows]

    @staticmethod
    def get_all_puzzles():
        """Return all puzzle rows outside the "mistakes" round that are not "[hidden]"."""
        return SQL.select_all(
            """
            SELECT
                id,
                name,
                roundname AS round_name,
                puzzle_uri,
                drive_uri,
                chat_channel_id AS channel_id,
                status,
                answer,
                xyzloc,
                comments
            FROM puzzle_view
            WHERE
                roundname <> "mistakes"
                AND status <> "[hidden]"
            """,
        )

    @staticmethod
    def get_hipri_puzzles():
        """Return puzzles whose status is Critical, Needs eyes, or WTF.

        Rows from the "mistakes" round are excluded; results are ordered by
        status, then id.
        """
        return SQL.select_all(
            """
            SELECT
                id,
                name,
                roundname AS round_name,
                chat_channel_id AS channel_id,
                status,
                xyzloc,
                comments
            FROM puzzle_view
            WHERE roundname <> "mistakes"
            AND status <> "[hidden]"
            AND status IN ("Critical", "Needs eyes", "WTF")
            ORDER BY status, id
            """,
        )

    @staticmethod
    def get_puzzles_at_table(table):
        """Return puzzles whose ``xyzloc`` matches ``table.name`` via SQL LIKE.

        Args:
            table: Object with a ``name`` attribute. The name is used as the
                LIKE pattern as-is, so any ``%`` or ``_`` in it acts as a
                wildcard.
        """
        return SQL.select_all(
            """
            SELECT
                id,
                name,
                status,
                chat_channel_id AS channel_id
            FROM puzzle_view
            WHERE xyzloc LIKE %s
            """,
            (table.name,),
        )

    @staticmethod
    def get_solver_from_member(member):
        """Return the solver row (id, name) whose ``chat_uid`` is ``str(member.id)``.

        Returns None when no solver matches.
        """
        row = SQL.select_one(
            """
            SELECT
                id,
                name
            FROM solver_view
            WHERE chat_uid = %s
            LIMIT 1
            """,
            (str(member.id),),
        )
        return row or None

    @staticmethod
    def get_all_solvers():
        """Return every solver row, ordered by name."""
        return SQL.select_all(
            """
            SELECT
                id AS solver_id,
                name,
                fullname,
                chat_uid AS discord_id,
                chat_name AS discord_name
            FROM solver_view
            ORDER BY name
            """,
        )

    @staticmethod
    def get_solver_ids_since(time):
        """Return distinct solver ids with an ``activity`` row newer than ``time``.

        Args:
            time: Value compared against the ``activity.time`` column.
        """
        rows = SQL.select_all(
            """
            SELECT
                DISTINCT solver_id
            FROM activity
            WHERE time > %s
            """,
            # Always pass a 1-tuple, like the other parameterized queries, so
            # a tuple-like value is never spread across placeholders.
            (time,),
        )
        return [row["solver_id"] for row in rows]