# crawlers.py
# Module level metadata: file name, author, license, version,
# maintainer contact details, release status and module group
__filename__ = "crawlers.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "Core"
import os
import time
from utils import data_dir
from utils import save_json
from utils import user_agent_domain
from utils import remove_eol
from blocking import get_mil_domains_list
from blocking import update_blocked_cache
from blocking import is_blocked_domain
# Lowercase substrings which are always blocked. blocked_user_agent
# rejects any request whose lowercased User-Agent contains one of these
default_user_agent_blocks = [
    'fedilist',
    'ncsc scan',
]
def update_known_crawlers(ua_str: str,
                          base_dir: str, known_crawlers: {},
                          last_known_crawler: int) -> int:
    """Records a hit from the given User-Agent within the dictionary
    of known crawlers accessing nodeinfo or the masto API.
    When 30 or more seconds have elapsed since last_known_crawler,
    any crawlers not seen for 30 days are removed and the dictionary
    is saved as knownCrawlers.json within the directory returned by
    data_dir(base_dir).
    Returns the current time in seconds, or None if no User-Agent
    string was given
    """
    if not ua_str:
        return None

    now = int(time.time())
    observation = known_crawlers.get(ua_str)
    if observation:
        # seen before, so update the existing entry in place
        observation['hits'] += 1
        observation['lastseen'] = now
    else:
        known_crawlers[ua_str] = {
            "lastseen": now,
            "hits": 1
        }

    # too soon after last_known_crawler to prune and save again
    if now - last_known_crawler < 30:
        return now

    # remove any old observations
    max_age_secs = 60 * 60 * 24 * 30
    expired = [uagent for uagent, item in known_crawlers.items()
               if now - item['lastseen'] >= max_age_secs]
    for uagent in expired:
        del known_crawlers[uagent]

    # save the list of crawlers
    save_json(known_crawlers, data_dir(base_dir) + '/knownCrawlers.json')
    return now
def load_known_web_bots(base_dir: str) -> []:
    """Returns the list of known web bots read from knownBots.txt,
    which has one bot per line. Blank lines and duplicates are omitted.
    An empty list is returned if the file does not exist, is empty
    or could not be read
    """
    bots_path = data_dir(base_dir) + '/knownBots.txt'
    if not os.path.isfile(bots_path):
        return []

    file_text = None
    try:
        with open(bots_path, 'r', encoding='utf-8') as fp_bots:
            file_text = fp_bots.read()
    except OSError:
        print('EX: unable to load web bots from ' +
              bots_path)
    if not file_text:
        return []

    # dictionary keys keep their insertion order, so duplicates are
    # dropped while each bot stays at the position of its first occurrence
    unique_bots = {}
    for line in file_text.split('\n'):
        if not line:
            continue
        bot_name = remove_eol(line).strip()
        if bot_name:
            unique_bots[bot_name] = True
    return list(unique_bots)
def _save_known_web_bots(base_dir: str, known_bots: []) -> bool:
    """Writes the given list of known web bots to knownBots.txt,
    with one bot per line.
    Returns True if the file was written, or False if it could
    not be saved
    """
    bots_path = data_dir(base_dir) + '/knownBots.txt'
    file_text = ''.join(bot.strip() + '\n' for bot in known_bots)
    try:
        with open(bots_path, 'w+', encoding='utf-8') as fp_bots:
            fp_bots.write(file_text)
    except OSError:
        print("EX: unable to save known web bots to " +
              bots_path)
        return False
    return True
def _is_crawler_user_agent(agent_str_lower: str) -> bool:
    """Returns True if the given lowercased User-Agent contains a
    substring associated with web crawlers and does not contain
    any of the exempting substrings
    """
    # a User-Agent containing any of these is never treated as a
    # crawler. This does not depend on which bot string matched, so
    # it is tested once rather than once per matching bot string
    exemptions = ('://bot', '://robot', '://spider', 'pixelfedbot/')
    for exempt_str in exemptions:
        if exempt_str in agent_str_lower:
            return False

    bot_strings = (
        'bot/', 'bot-', '/bot', '_bot', 'bot_', 'bot;', ' bot ',
        '/robot', 'gptbot', '-ai/', ' ai/', '-ai ',
        ' ai ', 'spider/', 'spider.ht', '/spider.', '-spider',
        'externalhit/', 'chatgpt', 'google', 'anthropic',
        'facebook', 'slurp', 'crawler', 'crawling', ' crawl ',
        'gigablast', 'archive.org', 'httrack',
        'spider-', ' spider ', 'findlink', 'ips-agent',
        'woriobot', 'mlbot', 'webbot', 'webcrawl',
        'voilabot', 'rank/', 'ezooms', 'heritrix', 'indeedbot',
        'woobot', 'infobot', 'viewbot', 'swimgbot', 'eright',
        'apercite', 'bot (', 'summify', 'ccbot', 'linkfind',
        'linkanalyze', 'analyzer', 'wotbox', 'ichiro',
        'drupact', 'searchengine', 'coccoc',
        'explorer/', 'explorer;', 'crystalsemantics',
        'scraper/', ' scraper ', ' scrape ', 'scraping')
    return any(bot_str in agent_str_lower for bot_str in bot_strings)


def _blocked_military_domain(agent_domain: str) -> bool:
    """Returns True if the given User-Agent domain matches an entry
    within the list returned by get_mil_domains_list
    """
    for domain_str in get_mil_domains_list():
        if '.' not in domain_str:
            # an entry without a dot is matched as a top level domain
            if agent_domain.endswith('.' + domain_str):
                print('BLOCK: Blocked military tld user agent: ' +
                      agent_domain)
                return True
        elif agent_domain.endswith(domain_str):
            print('BLOCK: Blocked military user agent: ' +
                  agent_domain)
            return True
    return False


def blocked_user_agent(calling_domain: str, agent_str: str,
                       news_instance: bool, debug: bool,
                       user_agents_blocked: [],
                       blocked_cache_last_updated,
                       base_dir: str,
                       blocked_cache: [],
                       block_federated: [],
                       blocked_cache_update_secs: int,
                       crawlers_allowed: [],
                       known_bots: [], path: str,
                       block_military: {}) -> (bool,):
    """Should a GET or POST be blocked based upon its user agent?
    Returns a tuple containing whether the request is blocked,
    followed by blocked_cache_last_updated, which is the value passed
    in unless update_blocked_cache was called, in which case it is
    the value which that function returned.
    The checks are made in this order:
    1. a missing User-Agent is blocked
    2. a User-Agent containing any of default_user_agent_blocks
       is blocked
    3. a crawler is added to known_bots, which is then saved, and is
       blocked unless this is a news instance or the User-Agent
       contains one of crawlers_allowed
    4. a User-Agent containing any of user_agents_blocked is blocked
    5. a User-Agent without a domain is allowed
    6. if the User-Agent domain does not start with calling_domain
       then it is checked with is_blocked_domain
    7. if path contains /users/nickname and block_military has a true
       value for that nickname then military domains are blocked
    """
    if not agent_str:
        # no User-Agent header is present
        return True, blocked_cache_last_updated

    agent_str_lower = agent_str.lower()
    for ua_block in default_user_agent_blocks:
        if ua_block in agent_str_lower:
            print('BLOCK: Blocked User agent 1: ' + ua_block)
            return True, blocked_cache_last_updated

    # is this a web crawler? If so then block it by default
    # unless this is a news instance or if it is in the allowed list
    if _is_crawler_user_agent(agent_str_lower):
        if agent_str_lower not in known_bots:
            known_bots.append(agent_str_lower)
            known_bots.sort()
            _save_known_web_bots(base_dir, known_bots)
        # if this is a news instance then we want it
        # to be indexed by search engines
        if news_instance:
            return False, blocked_cache_last_updated
        # is this crawler allowed?
        for crawler in crawlers_allowed:
            if crawler.lower() in agent_str_lower:
                return False, blocked_cache_last_updated
        print('BLOCK: Blocked Crawler: ' + agent_str)
        return True, blocked_cache_last_updated

    # get domain name from User-Agent
    agent_domain = user_agent_domain(agent_str, debug)

    # is the User-Agent type blocked? eg. "Mastodon"
    # Note that this match is case sensitive
    if user_agents_blocked:
        if any(agent_name in agent_str
               for agent_name in user_agents_blocked):
            return True, blocked_cache_last_updated

    if not agent_domain:
        return False, blocked_cache_last_updated

    # is the User-Agent domain blocked
    blocked_ua = False
    if not agent_domain.startswith(calling_domain):
        blocked_cache_last_updated = \
            update_blocked_cache(base_dir, blocked_cache,
                                 blocked_cache_last_updated,
                                 blocked_cache_update_secs)
        blocked_ua = \
            is_blocked_domain(base_dir, agent_domain,
                              blocked_cache, block_federated)
        if blocked_ua:
            print('BLOCK: Blocked User agent 2: ' + agent_domain)

    # optionally block military domains on a per account basis
    if not blocked_ua and block_military:
        if '/users/' in path:
            # which account is this?
            nickname = path.split('/users/')[1].split('/')[0]
            # does this account block military domains?
            if block_military.get(nickname):
                if _blocked_military_domain(agent_domain):
                    blocked_ua = True

    return blocked_ua, blocked_cache_last_updated