/
bitchan_daemon.py
4672 lines (4157 loc) · 219 KB
/
bitchan_daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import base64
import datetime
import fileinput
import grp
import hashlib
import html
import json
import logging
import os
import pwd
import random
import shutil
import sqlite3
import subprocess
import sys
import threading
import time
from binascii import hexlify
from collections import OrderedDict
from logging import handlers
from threading import Thread
import bleach
import gnupg
import ntplib
import qbittorrentapi
from Pyro5.api import expose
from Pyro5.api import serve
from daemonize import Daemonize
from sqlalchemy import and_
from sqlalchemy import func
from sqlalchemy import or_
from stem import Signal
from stem.control import Controller
import config
from database.models import AddressBook
from database.models import AdminMessageStore
from database.models import Captcha
from database.models import Chan
from database.models import Command
from database.models import DeletedMessages
from database.models import EndpointCount
from database.models import Games
from database.models import GlobalSettings
from database.models import Identity
from database.models import Messages
from database.models import PostCards
from database.models import PostDeletePasswordHashes
from database.models import SessionInfo
from database.models import Threads
from database.models import UploadProgress
from database.models import UploadTorrents
from database.utils import session_scope
from utils.database import get_db_table_daemon
from utils.download import allow_download
from utils.download import check_banned_file_hashes
from utils.download import generate_hash
from utils.download import validate_file
from utils.encryption_decrypt import decrypt_safe_size
from utils.files import LF
from utils.files import data_file_multiple_insert
from utils.files import delete_file
from utils.files import human_readable_size
from utils.game import initialize_game
from utils.gateway import api
from utils.gateway import chan_auto_clears_and_message_too_old
from utils.gateway import get_msg_address_from
from utils.gateway import log_age_and_expiration
from utils.gateway import timeout
from utils.general import get_random_alphanumeric_string
from utils.general import process_passphrase
from utils.general import set_clear_time_to_future
from utils.general import version_checker
from utils.message_admin_command import send_commands
from utils.parse_message import decrypt_and_process_attachments
from utils.parse_message import parse_message
from utils.parse_message import process_admin
from utils.posts import delete_post
from utils.replacements import replace_dict_keys_with_values
from utils.replacements import replace_lt_gt
from utils.shared import add_mod_log_entry
from utils.shared import diff_list_added_removed
from utils.shared import get_access
from utils.shared import get_msg_expires_time
from utils.shared import get_post_id
from utils.shared import regenerate_card_popup_post_html
from utils.tor import enable_custom_address
from utils.tor import enable_random_address
class BitChan:
def __init__(self):
    """Initialize daemon state: caches, flags, and the per-task timer schedule.

    No I/O happens here beyond clock reads; all network/DB work is driven
    later by run()/run_periodic() comparing these timers against time.time().
    """
    self.logger = logging.getLogger('bitchan.daemon')
    self.running = True  # main loop sentinel; run() exits when False
    self.view_counter = {}  # {category: {endpoint: hit count}}, flushed hourly by hourly_run()
    self.reset_view_counter()
    self._non_bitchan_message_ids = []  # message IDs to discard (loaded from settings)
    self._all_chans = {}
    self._address_book_dict = {}
    self._identity_dict = {}
    self._subscription_dict = {}
    self._refresh = True  # force an immediate address refresh on first pass
    self._refresh_identities = False
    self._refresh_address_book = True
    # periodically-updated dictionaries
    self.chans_boards_info = {}
    self.maintenance_mode = False
    self.last_post_ts = 0
    self.message_threads = {}  # in-flight message processing threads
    self.utc_offset = None  # NTP-derived offset; None until first sync
    self.time_last = 0  # last observed wall clock, used to detect clock jumps
    self.is_restarting_bitmessage = False
    self.list_stats = []  # last logged [posts, boards, lists, identities, address book] counts
    self.first_run = True
    self.update_ntp = False
    self.update_post_numbers = False
    # Bitmessage
    self.api_status = None
    self.bm_onion_address = None
    self.bm_connected = False
    self.bm_connected_timer = None
    self.bm_sync_complete = False
    self.bm_pending_download = True
    self.bm_pending_download_timer = None
    self.bm_number_messages_processed_last = 0
    # Timers: each holds the next epoch at which its task runs.
    # Tasks initialized to `now` run on the first run_periodic() pass;
    # the offsets below stagger heavier tasks to avoid a startup stampede.
    now = time.time()
    self.timer_check_bm_alive = now
    self.timer_time_server = now
    self.timer_board_info = now
    self.timer_check_downloads = now
    self.timer_addresses = now
    self.timer_queue_messages = now
    self.timer_stats = now
    self.timer_clear_inventory = now
    self.timer_message_threads = now
    self.timer_clear_uploads = now
    self.timer_unread_mail = now
    self.timer_non_bitchan_message_ids = now
    self.timer_safe_send = now
    self.timer_update_post_numbers = now
    self.timer_new_tor_identity = now + random.randint(10800, 28800)  # 3-8 hours, jittered
    self.timer_check_onion_address = now
    self.timer_check_torrents = now
    self.timer_remove_old_torrents = now
    self.timer_delete_and_vacuum = now + (60 * 60)  # 1 hour
    self.timer_check_locked_threads = now + (60 * 20)  # 20 minutes
    self.timer_delete_msgs = now + (60 * 10)  # 10 minutes
    self.timer_delete_captchas = now + (60 * 10)  # 10 minutes
    self.timer_get_msg_expires_time = now + (60 * 10)  # 10 minutes
    self.timer_remove_deleted_msgs = now + (60 * 10)  # 10 minutes
    self.timer_send_lists = now + (60 * 5)  # 5 minutes
    self.timer_send_commands = now + (60 * 5)  # 5 minutes
    self.timer_clear_session_info = now + (60 * 5)  # 5 minutes
    self.timer_wipe = now + 120  # 2 minutes
    self.timer_sync = now + (60 * 2)  # 2 minutes
    self.timer_game = now + (60 * 1)  # 1 minutes
    # Start timer at next top of the hour
    t = datetime.datetime.now()
    epoch_next_hour = t.replace(second=0, microsecond=0, minute=0, hour=t.hour) + datetime.timedelta(hours=1)
    # NOTE: strftime('%s') is a platform-specific (glibc) extension — TODO confirm portability
    self.timer_top_of_hour = int(epoch_next_hour.strftime('%s'))
    # Net disable settings
    self.allow_net_file_size_check = False
    self.allow_net_ntp = False
def refresh_settings(self):
    """Reload cached daemon settings from the GlobalSettings table.

    Refreshes the list of message IDs to discard and the two
    network-permission flags used by run_periodic().
    """
    with session_scope(config.DB_PATH) as new_session:
        settings = new_session.query(GlobalSettings).first()
        try:
            self._non_bitchan_message_ids = json.loads(settings.discard_message_ids)
        except Exception:
            # Bug fix: the fallback previously assigned the string "[]",
            # which is type-inconsistent with the json.loads() result
            # (membership tests would then match single characters).
            self._non_bitchan_message_ids = []
        self.allow_net_file_size_check = settings.allow_net_file_size_check
        self.allow_net_ntp = settings.allow_net_ntp
@staticmethod
def reset_all_auto_downloads():
    """Clear the in-progress flag on every auto-download that was running.

    Called once at startup so downloads interrupted by a previous shutdown
    can be re-initiated.
    """
    with session_scope(config.DB_PATH) as new_session:
        downloading = new_session.query(Messages).filter(
            Messages.start_download.is_(True),
            Messages.file_currently_downloading.is_(True))
        for each_msg in downloading.all():
            each_msg.file_currently_downloading = False
        new_session.commit()
def run(self):
    """Main daemon loop: initialize state, then drive run_periodic() forever.

    Runs until self.running is set False. Task iterations are skipped
    while Bitmessage is being restarted.
    """
    self.logger.info("Starting BitChan v{}".format(config.VERSION_BITCHAN))
    self.refresh_settings()
    self.reset_all_auto_downloads()
    time.sleep(3)  # give supporting services a moment before processing
    self.process_stored_messages()  # Process messages that were already processed and stored in the database
    self.logger.debug("Initialization complete. Starting daemon.")
    while self.running:
        if not self.is_restarting_bitmessage:
            try:
                self.run_periodic()
            except:
                # Never let one bad pass kill the daemon; back off briefly.
                self.logger.exception("Error executing run_periodic()")
                time.sleep(5)
        time.sleep(1)
    self.logger.info("Daemon shutting down")
def run_periodic(self):
    """One scheduler pass: run every periodic task whose timer has elapsed.

    Each section compares its `self.timer_*` deadline against `now`,
    performs its work inside a try/except so one failing task cannot
    block the others, then pushes its deadline into the future.
    Blocks (busy-waits) while maintenance mode is enabled.
    """
    now = time.time()
    #
    # Check settings
    #
    with session_scope(config.DB_PATH) as new_session:
        settings = new_session.query(GlobalSettings).first()
        if settings.maintenance_mode:
            self.logger.info("Maintenance mode enabled. Pausing daemon operation.")
            self.maintenance_mode = True
    #
    # Maintenance Mode
    #
    # Busy-wait (1 s poll) until the flag is cleared in the database.
    while self.maintenance_mode:
        with session_scope(config.DB_PATH) as new_session:
            settings = new_session.query(GlobalSettings).first()
            if not settings.maintenance_mode:
                self.logger.info("Maintenance mode disabled. Resuming daemon operation.")
                self.maintenance_mode = False
        time.sleep(1)
    #
    # Update the time
    #
    if self.allow_net_ntp:
        self.update_ntp = False
        # A jump of >10 minutes between passes suggests the clock changed.
        if abs(self.time_last - now) > 600:
            self.logger.info("Time changed? Update NTP.")
            self.update_ntp = True
        self.time_last = now
        if self.timer_time_server < now or self.update_ntp:
            # Advance the deadline in randomized 4-7 hour steps until it
            # lies in the future (handles long gaps between passes).
            while self.timer_time_server < now:
                self.timer_time_server += (60 * 6 * random.randint(40, 70))
            ntp = Thread(target=self.update_utc_offset)
            ntp.daemon = True
            ntp.start()
    #
    # Check Bitmessage onion address
    #
    if self.timer_check_onion_address < now:
        try:
            self.logger.debug("Run check_onion_address()")
            self.check_onion_address()
            self.logger.debug("End check_onion_address()")
        except:
            self.logger.exception("Could not complete check_onion_address()")
        self.timer_check_onion_address = time.time() + 60 * 60 * 12  # 12 hours
    #
    # Update Chans Board Info
    #
    if self.timer_board_info < now or self.update_ntp:
        try:
            self.logger.debug("Run generate_chans_board_info()")
            self.generate_chans_board_info()
            self.logger.debug("End generate_chans_board_info()")
        except:
            self.logger.exception("Could not complete generate_chans_board_info()")
        self.timer_board_info = time.time() + config.REFRESH_BOARD_INFO
    #
    # Check if downloads initiated
    #
    if self.timer_check_downloads < now or self.update_ntp:
        try:
            self.logger.debug("Run check_downloads()")
            self.check_downloads()
            self.logger.debug("End check_downloads()")
        except:
            self.logger.exception("Could not complete check_downloads()")
        self.timer_check_downloads = time.time() + config.REFRESH_CHECK_DOWNLOAD
    #
    # Check if BM sync is complete
    #
    if self.timer_sync < now or self.update_ntp:
        try:
            self.logger.debug("Run check_sync()")
            # Serialize sync checks across processes with a file lock.
            lf = LF()
            if lf.lock_acquire("/var/lock/bm_sync_check.lock", to=60):
                try:
                    self.check_sync_locked()
                except Exception as err:
                    self.logger.error("Error check_sync(): {}".format(err))
                finally:
                    lf.lock_release("/var/lock/bm_sync_check.lock")
            self.logger.debug("End check_sync()")
        except:
            self.logger.exception("Could not complete check_sync()")
        self.timer_sync = time.time() + config.REFRESH_CHECK_SYNC
    #
    # Check if BM is alive
    #
    if not self.is_restarting_bitmessage and self.timer_check_bm_alive < time.time():
        try:
            self.logger.debug("Run bitmessage_monitor()")
            self.bitmessage_monitor()
            self.logger.debug("End bitmessage_monitor()")
        except:
            self.logger.exception("Could not complete bitmessage_monitor()")
        self.timer_check_bm_alive = time.time() + config.API_CHECK_FREQ
    #
    # New tor Identity
    #
    if self.timer_new_tor_identity < now:
        try:
            self.logger.debug("Run new_tor_identity()")
            self.new_tor_identity()
            self.logger.debug("End new_tor_identity()")
        except:
            self.logger.exception("Could not complete new_tor_identity()")
        # Jittered 3-8 hour interval, matching the __init__ schedule.
        self.timer_new_tor_identity = time.time() + random.randint(10800, 28800)
    #
    # Update addresses periodically
    #
    if self.timer_addresses < now or self._refresh:
        self._refresh = False
        try:
            self.logger.debug("Run update_identities()")
            self.update_identities()
            # self.update_subscriptions()  # Currently not used
            self.logger.debug("Run update_address_book()")
            self.update_address_book()
            self.logger.debug("Run update_chans()")
            self.update_chans()
        except Exception:
            self.logger.exception("Updating addresses")
        self.timer_addresses = time.time() + config.REFRESH_ADDRESSES
    #
    # Update messages periodically
    #
    if self.timer_queue_messages < now:
        try:
            self.logger.debug("Run queue_new_messages()")
            self.queue_new_messages()
            self.logger.debug("End queue_new_messages()")
        except Exception:
            self.logger.exception("Updating messages")
        self.timer_queue_messages = time.time() + config.REFRESH_MSGS
    #
    # Update stats
    #
    if self.timer_stats < now:
        try:
            # NOTE(review): this re-runs queue_new_messages() inside the
            # stats section — looks like a copy/paste leftover from the
            # section above; confirm whether it is intentional.
            self.logger.debug("Run queue_new_messages()")
            self.queue_new_messages()
            self.logger.debug("End queue_new_messages()")
            with session_scope(config.DB_PATH) as new_session:
                post_count = new_session.query(Messages).count()
                board_count = new_session.query(Chan).filter(Chan.type == "board").count()
                list_count = new_session.query(Chan).filter(Chan.type == "list").count()
                list_stats = [
                    post_count,
                    board_count,
                    list_count,
                    len(self._identity_dict),
                    len(self._address_book_dict)
                ]
                # Only log the summary line when a count has changed.
                if self.list_stats != list_stats:
                    msg = str(post_count)
                    msg += " post, " if post_count == 1 else " posts, "
                    msg += str(board_count)
                    msg += " board, " if board_count == 1 else " boards, "
                    msg += str(list_count)
                    msg += " list, " if list_count == 1 else " lists, "
                    msg += str(len(self._identity_dict))
                    msg += " identity, " if len(self._identity_dict) == 1 else " identities, "
                    msg += str(len(self._address_book_dict))
                    if len(self._address_book_dict) == 1:
                        msg += " address book entry"
                    else:
                        msg += " address book entries"
                    self.logger.info(msg)
                    self.list_stats = list_stats
        except Exception:
            self.logger.exception("Updating stats")
        self.timer_stats = time.time() + config.REFRESH_STATS
    #
    # Update message thread queue
    #
    if self.timer_message_threads < now:
        try:
            self.logger.debug("Run check_message_threads()")
            self.check_message_threads()
            self.logger.debug("End check_message_threads()")
        except:
            self.logger.exception("Could not complete check_message_threads()")
        self.timer_message_threads = time.time() + config.REFRESH_THREAD_QUEUE
    #
    # Clear upload progress table
    #
    if self.timer_clear_uploads < now:
        self.logger.debug("Run clear upload progress table")
        try:
            with session_scope(config.DB_PATH) as new_session:
                if self.first_run:
                    # On first pass after startup, clear everything —
                    # any recorded progress is stale.
                    upl = new_session.query(UploadProgress).all()
                else:
                    upl = new_session.query(UploadProgress).filter(and_(
                        UploadProgress.progress_percent == 100,
                        UploadProgress.uploading.is_(False))).all()
                for each_upl in upl:
                    new_session.delete(each_upl)
        except:
            self.logger.exception("Could not complete clearing upload progress table")
        self.timer_clear_uploads = time.time() + config.REFRESH_CLEAR_PROGRESS
    #
    # Clear inventory 10 minutes after last board/list join
    #
    if self.timer_clear_inventory < now:
        with session_scope(config.DB_PATH) as new_session:
            settings = new_session.query(GlobalSettings).first()
            if settings and settings.clear_inventory:
                if not self.message_threads and self.bm_sync_complete:
                    # Only clear inventory if no messages are being processed and sync is complete
                    self.logger.debug("Run clear_bm_inventory()")
                    self.is_pow_sending()
                    if self.timer_safe_send < now:  # Ensure BM isn't restarted while sending
                        settings.clear_inventory = False
                        new_session.commit()
                        try:
                            self.clear_bm_inventory()
                            self.bm_sync_complete = False
                            self.update_post_numbers = True
                        except:
                            self.logger.exception("Could not complete clear_bm_inventory()")
                else:
                    self.timer_clear_inventory += 60  # Wait additional 60 seconds if messages are being processed
    #
    # Get message expires time if not currently set
    #
    if self.timer_get_msg_expires_time < now:
        try:
            self.logger.debug("Run get_message_expires_times()")
            self.get_message_expires_times()
            self.logger.debug("End get_message_expires_times()")
        except:
            self.logger.exception("Could not complete get_message_expires_times()")
        self.timer_get_msg_expires_time = time.time() + config.REFRESH_EXPIRES_TIME
    #
    # Delete messages from sent box
    #
    if self.timer_delete_msgs < now:
        try:
            self.logger.debug("Run delete_msgs()")
            self.delete_msgs()
            self.logger.debug("End delete_msgs()")
        except:
            self.logger.exception("Could not complete delete_msgs()")
        self.timer_delete_msgs = time.time() + config.REFRESH_DELETE_SENT
    #
    # Delete entries in deleted message database 1 day after they expire
    #
    if self.timer_remove_deleted_msgs < now:
        self.logger.debug("Run delete entries in deleted message database")
        try:
            self.logger.info("Checking for expired message entries")
            with session_scope(config.DB_PATH) as new_session:
                # NOTE(review): section comment says 1 day but the window
                # is 5 days — confirm which is intended.
                expired = time.time() - (24 * 60 * 60 * 5)  # 5 days in the past (expired 5 days ago)
                for each_msg in new_session.query(DeletedMessages).all():
                    if each_msg.expires_time and expired and each_msg.expires_time < expired:
                        self.logger.info("DeletedMessages table: delete: {}, {}".format(
                            each_msg.expires_time, each_msg.message_id))
                        new_session.delete(each_msg)
                        new_session.commit()
        except:
            self.logger.exception("remove_deleted_msgs")
        self.timer_remove_deleted_msgs = time.time() + config.REFRESH_REMOVE_DEL
    #
    # Check lists that may be expiring and resend
    #
    if self.timer_send_lists < now and self.bm_sync_complete:
        try:
            self.logger.info("Run send_lists()")
            self.send_lists()
            self.logger.info("End send_lists()")
        except:
            self.logger.exception("Could not complete send_lists()")
        self.timer_send_lists = time.time() + config.REFRESH_CHECK_LISTS
    #
    # Check commands that may be expiring and resend
    #
    if self.timer_send_commands < now and self.bm_sync_complete:
        try:
            self.logger.debug("Run send_commands()")
            send_commands()
            self.logger.debug("End send_commands()")
        except:
            self.logger.exception("Could not complete send_commands()")
        self.timer_send_commands = time.time() + config.REFRESH_CHECK_CMDS
    #
    # Get unread mail counts
    #
    if self.timer_unread_mail < now:
        try:
            self.logger.debug("Run check_unread_mail()")
            self.check_unread_mail()
            self.logger.debug("End check_unread_mail()")
        except:
            self.logger.exception("Could not complete check_unread_mail()")
        self.timer_unread_mail = time.time() + config.REFRESH_UNREAD_COUNT
    #
    # Rule: Automatically Wipe Board/List
    #
    if self.timer_wipe < now:
        try:
            with session_scope(config.DB_PATH) as new_session:
                for each_chan in new_session.query(Chan).all():
                    if not each_chan.rules:
                        continue
                    try:
                        rules = json.loads(each_chan.rules)
                        if ("automatic_wipe" in rules and
                                rules["automatic_wipe"]["wipe_epoch"] < now):
                            self.clear_list_board_contents(each_chan.address)
                            # Advance wipe_epoch by whole intervals until
                            # it lies in the future.
                            while rules["automatic_wipe"]["wipe_epoch"] < now:
                                rules["automatic_wipe"]["wipe_epoch"] += rules["automatic_wipe"]["interval_seconds"]
                            each_chan.rules = json.dumps(rules)
                            new_session.commit()
                    except Exception as err:
                        self.logger.error("Error clearing board/list: {}".format(err))
        except:
            # NOTE(review): copy/paste defect — this is the wipe section but
            # the message names check_unread_mail(); fix the log text.
            self.logger.exception("Could not complete check_unread_mail()")
        self.timer_wipe = time.time() + config.REFRESH_WIPE
    #
    # Update post numbers
    #
    if (self.timer_update_post_numbers < now and
            self.update_post_numbers and
            self.bm_sync_complete):
        try:
            self.logger.debug("Run generate_post_numbers()")
            self.update_post_numbers = False
            self.generate_post_numbers()
            self.logger.debug("End generate_post_numbers()")
        except:
            self.logger.exception("Could not complete generate_post_numbers()")
        self.timer_update_post_numbers = time.time() + (60 * 5)  # 5 minutes
    #
    # Delete and Vacuum
    #
    if self.timer_delete_and_vacuum < now:
        self.timer_delete_and_vacuum = time.time() + (60 * 60 * 6)  # 6 hours
        # Check for expire times before deleting and vacuuming
        try:
            self.logger.debug("Run get_message_expires_times()")
            self.get_message_expires_times()
            self.logger.debug("End get_message_expires_times()")
        except:
            self.logger.exception("Could not complete get_message_expires_times()")
        self.timer_get_msg_expires_time = time.time() + config.REFRESH_EXPIRES_TIME
        try:
            self.logger.debug("Run delete_and_vacuum()")
            self.delete_and_vacuum()
            self.logger.debug("End delete_and_vacuum()")
        except:
            self.logger.exception("Could not complete delete_and_vacuum()")
        # Set any expire times still None to 0
        self.logger.debug("Setting message expire times to 0")
        try:
            with session_scope(config.DB_PATH) as new_session:
                msg_inbox = new_session.query(Messages).filter(
                    Messages.expires_time.is_(None)).all()
                for each_msg in msg_inbox:
                    self.logger.info("{}: Setting message expire time to 0".format(
                        each_msg.message_id[-config.ID_LENGTH:].upper()))
                    each_msg.expires_time = 0
                msg_deleted = new_session.query(DeletedMessages).filter(
                    DeletedMessages.expires_time.is_(None)).all()
                for each_msg in msg_deleted:
                    self.logger.info("{}: Setting deleted message expire time to 0".format(
                        each_msg.message_id[-config.ID_LENGTH:].upper()))
                    each_msg.expires_time = 0
                new_session.commit()
        except:
            self.logger.exception("Could not set message expire times to 0")
    #
    # Clear flask session info
    #
    if self.timer_clear_session_info < now:
        try:
            self.logger.debug("Run clear_session_info()")
            self.clear_session_info()
            self.logger.debug("End clear_session_info()")
        except:
            self.logger.exception("Could not complete clear_session_info()")
        finally:
            self.timer_clear_session_info = time.time() + (60 * 60 * 12)  # 12 hours
    #
    # Check that no posts exist past lock on locked threads
    #
    if self.timer_check_locked_threads < now:
        with session_scope(config.DB_PATH) as new_session:
            admin_store = new_session.query(AdminMessageStore).count()
            # Defer the scan until sync is done and no admin messages are queued.
            if self.bm_sync_complete and not admin_store:
                try:
                    self.logger.debug("Run scan_locked_threads()")
                    self.scan_locked_threads()
                    self.logger.debug("End scan_locked_threads()")
                except:
                    self.logger.exception("Could not complete scan_locked_threads()")
                self.timer_check_locked_threads = time.time() + (60 * 60)
            else:
                # Not ready yet: retry in a minute instead of an hour.
                self.timer_check_locked_threads = time.time() + 60
    #
    # Periodically delete stale captchas
    #
    if self.timer_delete_captchas < now:
        try:
            self.logger.debug("Run delete_old_captchas()")
            self.delete_old_captchas()
            self.logger.debug("End delete_old_captchas()")
        except:
            self.logger.exception("Could not complete delete_old_captchas()")
        finally:
            self.timer_delete_captchas = time.time() + (60 * 60 * 3)  # 3 Hours
    #
    # Periodically check games
    #
    if self.timer_game < now:
        try:
            self.logger.debug("Run check_games()")
            self.check_games()
            self.logger.debug("End check_games()")
        except:
            self.logger.exception("Could not complete check_games()")
        finally:
            self.timer_game = time.time() + 20  # 20 seconds
    #
    # Periodically check every hour (at the top of the hour)
    #
    if self.timer_top_of_hour < now:
        try:
            self.logger.debug("Run hourly_run()")
            self.hourly_run(self.timer_top_of_hour)
            self.logger.debug("End hourly_run()")
        except:
            self.logger.exception("Could not complete hourly_run()")
        finally:
            # NOTE(review): adds 3 hours but the comment says 1 hour, and
            # after the first fire the deadline is no longer aligned to the
            # top of the hour (unlike the __init__ initialization) — confirm
            # the intended interval/alignment.
            self.timer_top_of_hour = time.time() + (60 * 60 * 3)  # 1 hour
    #
    # Check for completed torrents (and process their downloaded data)
    #
    if self.timer_check_torrents < now:
        try:
            self.logger.debug("Run check_torrents()")
            self.check_torrents()
            self.logger.debug("End check_torrents()")
        except:
            self.logger.exception("Could not complete check_torrents()")
        finally:
            # Step in 10 s increments until the deadline is in the future.
            while self.timer_check_torrents < now:
                self.timer_check_torrents += 10  # 10 seconds
    #
    # Remove torrents/data more than 28 days old
    #
    if self.timer_remove_old_torrents < now:
        try:
            self.logger.debug("Run remove_old_torrents()")
            self.remove_old_torrents()
            self.logger.debug("End remove_old_torrents()")
        except:
            self.logger.exception("Could not complete remove_old_torrents()")
        finally:
            self.timer_remove_old_torrents = time.time() + (60 * 60)  # 60 minutes
    self.first_run = False
def hourly_run(self, timestamp_hour):
    """Flush the in-memory view counters to EndpointCount rows, then reset.

    Aggregates self.view_counter by category (index pages, boards/lists,
    threads, new-post polls, RSS feeds) into per-hour request counts keyed
    by `timestamp_hour` (the epoch of the hour being recorded).
    """
    try:
        thread_hashes = {}
        new_posts = {}
        chan_addresses = {}
        rss_threads = {}
        rss_boards = {}
        with session_scope(config.DB_PATH) as new_session:
            for category, each_data in self.view_counter.items():
                for endpoint in each_data:
                    # Page-level endpoints get their own rows directly
                    if endpoint in ["index", "verify_wait", "verify_test", "verify_success"]:
                        count = EndpointCount()
                        count.timestamp_epoch = timestamp_hour
                        count.category = category
                        count.endpoint = endpoint
                        count.requests = self.view_counter[category][endpoint]
                        new_session.add(count)
                    # Everything else is first summed into per-key dicts
                    if category in ["boards", "lists"] and endpoint:
                        if endpoint not in chan_addresses:
                            chan_addresses[endpoint] = 0
                        chan_addresses[endpoint] += self.view_counter[category][endpoint]
                    if category == "threads" and endpoint:
                        if endpoint not in thread_hashes:
                            thread_hashes[endpoint] = 0
                        thread_hashes[endpoint] += self.view_counter[category][endpoint]
                    if category == "new_posts" and endpoint:
                        if endpoint not in new_posts:
                            new_posts[endpoint] = 0
                        new_posts[endpoint] += self.view_counter[category][endpoint]
                    if category == "rss_thread" and endpoint:
                        if endpoint not in rss_threads:
                            rss_threads[endpoint] = 0
                        rss_threads[endpoint] += self.view_counter[category][endpoint]
                    if category == "rss_board" and endpoint:
                        if endpoint not in rss_boards:
                            rss_boards[endpoint] = 0
                        rss_boards[endpoint] += self.view_counter[category][endpoint]
            # Add DB entry for board loads
            for each_address, views in chan_addresses.items():
                count = EndpointCount()
                count.timestamp_epoch = timestamp_hour
                count.chan_address = each_address
                count.category = "requests"
                count.requests = views
                new_session.add(count)
            # Add DB entry for thread loads
            for thread_hash, views in thread_hashes.items():
                count = EndpointCount()
                count.timestamp_epoch = timestamp_hour
                count.thread_hash = thread_hash
                count.category = "requests"
                count.requests = views
                new_session.add(count)
            new_session.commit()
            # Add DB entry for rss board loads (update row if it exists)
            for board_address, views in rss_boards.items():
                test_count = new_session.query(EndpointCount).filter(and_(
                    EndpointCount.timestamp_epoch == timestamp_hour,
                    EndpointCount.chan_address == board_address,
                    EndpointCount.category == "rss_board")).first()
                if test_count:
                    test_count.rss = views
                else:
                    count = EndpointCount()
                    count.timestamp_epoch = timestamp_hour
                    count.chan_address = board_address
                    count.category = "rss_board"
                    count.rss = views
                    new_session.add(count)
                new_session.commit()
            # Add DB entry for rss thread loads (update row if it exists)
            for thread_hash, views in rss_threads.items():
                test_count = new_session.query(EndpointCount).filter(and_(
                    EndpointCount.timestamp_epoch == timestamp_hour,
                    EndpointCount.thread_hash == thread_hash,
                    EndpointCount.category == "rss_thread")).first()
                if test_count:
                    test_count.rss = views
                else:
                    count = EndpointCount()
                    count.timestamp_epoch = timestamp_hour
                    count.thread_hash = thread_hash
                    count.category = "rss_thread"
                    count.rss = views
                    new_session.add(count)
                new_session.commit()
            # Add DB entry for thread updates (new-post polls)
            for thread_hash, views in new_posts.items():
                test_count = new_session.query(EndpointCount).filter(and_(
                    EndpointCount.timestamp_epoch == timestamp_hour,
                    EndpointCount.thread_hash == thread_hash,
                    EndpointCount.category == "requests")).first()
                if test_count:
                    test_count.new_posts = views
                else:
                    count = EndpointCount()
                    count.timestamp_epoch = timestamp_hour
                    count.thread_hash = thread_hash
                    count.category = "requests"
                    count.new_posts = views
                    new_session.add(count)
                new_session.commit()
        # Counters are only reset on success so a failed flush retries next hour
        self.reset_view_counter()
    except Exception:
        # Bug fix: previously logged the misleading message
        # "Could not import view_counter" via logger.error, discarding the
        # traceback. Log an accurate message with the full traceback.
        self.logger.exception("Could not record view_counter statistics")
def check_torrents(self):
    """Check torrents that need to be paused or processed after download completes"""
    skip_size_check = False
    with session_scope(config.DB_PATH) as new_session:
        torrents = new_session.query(UploadTorrents).filter(
            UploadTorrents.torrent_completed.is_(False)).all()
        for torrent in torrents:
            if not torrent.message_id:
                continue  # Don't initiate if message ID absent
            message = new_session.query(Messages).filter(
                Messages.message_id == torrent.message_id).first()
            if not message:
                continue  # Don't initiate if message absent
            # Check if torrent is present in torrent client
            # NOTE(review): a new qBittorrent client login is created for
            # every torrent in the loop — could be hoisted outside.
            conn_info = dict(host=config.QBITTORRENT_HOST, port=8080)
            qbt_client = qbittorrentapi.Client(**conn_info)
            qbt_client.auth_log_in()
            try:
                torrent_prop = qbt_client.torrents_info(
                    torrent_hashes=torrent.torrent_hash)[0]
            except:
                # torrents_info returned no entries (or errored): treat as absent
                torrent_prop = None
            if not torrent_prop:
                # Torrent not found in client, prevent processing this torrent again
                self.logger.error(
                    f"{message.message_id[-config.ID_LENGTH:].upper()}: Torrent {torrent.file_hash} "
                    f"with hash {torrent.torrent_hash} not found in client. Download Disabled.")
                message.file_do_not_download = True
                message.file_currently_downloading = False
                message.file_progress = "Torrent not found in client. Download disabled."
                message.regenerate_post_html = True
                new_session.commit()
                continue
            # Check if torrent should be prevented from starting
            if message.file_do_not_download and not message.start_download:
                continue  # Don't initiate if file prevented from being downloaded
            # Pause if total size is greater than max size permitted to be auto-downloaded
            elif not message.file_do_not_download and not message.start_download:
                settings = new_session.query(GlobalSettings).first()
                max_size_bytes = settings.max_download_size * 1024 * 1024
                if torrent_prop and torrent_prop.total_size > max_size_bytes:
                    # Torrent exists and size is too large, pause torrent
                    qbt_client.torrents_pause(torrent_hashes=torrent.torrent_hash)
                    msg = (
                        f"Attachments too large. "
                        f"Max download size allowed is {human_readable_size(max_size_bytes)}. "
                        f"Pausing torrent.")
                    self.logger.info(msg)
                    message.file_do_not_download = True
                    message.file_currently_downloading = False
                    message.file_progress = msg
                    message.regenerate_post_html = True
                    new_session.commit()
                    continue
            # User manually allowed download, so skip file size check
            elif message.start_download:
                skip_size_check = True
            #
            # Torrent exists and size is not too large, allow processing downloaded files
            #
            try:
                # Torrent is paused
                # NOTE(review): 'pausedDL' is listed twice — the second entry
                # is presumably meant to be another paused state (e.g.
                # 'pausedUP'); confirm against qBittorrent state names.
                if torrent_prop and torrent_prop.state in ['pausedDL', 'pausedDL']:
                    self.logger.info(
                        f"Torrent {torrent.file_hash} paused when it's allowed to be downloaded. Resuming.")
                    qbt_client.torrents_resume(torrent_hashes=torrent.torrent_hash)
                    time.sleep(1)
                # Torrent is seeding
                elif torrent_prop and torrent_prop.state in ['stalledUP', 'uploading']:
                    self.logger.info(
                        f"Torrent {torrent.file_hash} seeding when it hasn't been processed. Processing.")
                    # Check only the seeding directory for torrents that are completed and are seeding
                    path_downloaded_file = os.path.join('/i2p_qb/Downloads', torrent.file_hash)
                    path_downloaded_file_zip = os.path.join('/i2p_qb/Downloads', f"{torrent.file_hash}.zip")
                    if torrent.message_id and torrent.file_hash and os.path.isfile(path_downloaded_file):
                        self.logger.info(f"Found completed torrent data {path_downloaded_file}")
                        path_downloaded = path_downloaded_file
                    elif torrent.message_id and torrent.file_hash and os.path.isfile(path_downloaded_file_zip):
                        self.logger.info(f"Found completed torrent data {path_downloaded_file_zip}")
                        path_downloaded = path_downloaded_file_zip
                    else:
                        continue  # Couldn't find completed torrent data, move to next torrent file
                    self.logger.info("{}: Message with torrent data hash {} found, decrypting...".format(
                        message.message_id[-config.ID_LENGTH:].upper(), torrent.file_hash))
                    # Add missing parts back into encrypted file
                    path_enc_file = f'/tmp/{torrent.file_hash}'
                    try:
                        file_extracts_start_base64 = json.loads(message.file_extracts_start_base64)
                    except:
                        file_extracts_start_base64 = None
                    if file_extracts_start_base64:
                        # Re-insert the byte ranges that were stripped from the
                        # encrypted file before it was torrented.
                        self.logger.info(f"extracts: {file_extracts_start_base64}")
                        size_before = os.path.getsize(path_downloaded)
                        data_file_multiple_insert(
                            path_downloaded,
                            file_extracts_start_base64,
                            chunk=4096,
                            copy_file_path=path_enc_file)
                        self.logger.info("{}: File data insertion. Before: {}, After: {}".format(
                            message.message_id[-config.ID_LENGTH:].upper(),
                            size_before,
                            os.path.getsize(path_enc_file)))
                    else:
                        shutil.copy(path_downloaded, path_enc_file)
                    # Compare encrypted file hash to expected hash
                    if message.file_sha256_hash:
                        if not validate_file(path_enc_file, message.file_sha256_hash):
                            message.file_progress = "Attachment hash ({}) doesn't match expected hash ({}).".format(
                                generate_hash(path_enc_file), message.file_sha256_hash)
                            message.file_sha256_hashes_match = False
                            message.regenerate_post_html = True
                            new_session.commit()
                            # NOTE(review): returning here exits the whole
                            # method — remaining torrents are skipped and
                            # qbt_client.auth_log_out() never runs; a
                            # `continue` may be intended.
                            return
                        else:
                            message.file_sha256_hashes_match = True
                    # Hashes match, decrypt and extract attachments
                    status, media_info, message_steg = decrypt_and_process_attachments(
                        message.message_id,
                        message.file_enc_cipher,
                        message.file_enc_key_bytes,
                        message.file_enc_password,
                        path_enc_file,
                        skip_size_check=skip_size_check)
                    if not status:  # Success
                        message.file_download_successful = True
                        message.file_currently_downloading = False
                        message.file_progress = ""
                        message.media_info = json.dumps(media_info)
                        message.message_steg = json.dumps(message_steg)
                        torrent.torrent_completed = True  # No longer include this torrent in this search function
                    else:  # Failure
                        message.file_currently_downloading = False
                        message.file_download_successful = False
                        message.file_do_not_download = True
                    new_session.commit()
                    regenerate_card_popup_post_html(message_id=torrent.message_id)
                    check_banned_file_hashes(torrent.message_id, media_info)
                # Torrent is in some unpaused state but not completed downloading
                elif torrent_prop and torrent_prop.state in [
                        'checkingUP', 'allocating', 'downloading', 'queuedDL',
                        'stalledDL', 'checkingDL', 'checkingResumeData']:
                    pass
                # Torrent is in some unknown state. Record to logs to diagnose the issue.
                else:
                    self.logger.error(f"Torrent {torrent.file_hash} in unknown state '{torrent_prop.state}'")
            except:
                self.logger.exception(f"Checking torrent {torrent.file_hash} for completion")
            qbt_client.auth_log_out()
def remove_old_torrents(self):
"""Remove torrents and data if more than 28 days old"""
now = time.time()
with session_scope(config.DB_PATH) as new_session:
settings = new_session.query(GlobalSettings).first()