-
Notifications
You must be signed in to change notification settings - Fork 282
/
Mixer.py
executable file
·224 lines (177 loc) · 9.12 KB
/
Mixer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The Mixer Module
================
This module is consuming the Redis-list created by the ZMQ_Feed_Q Module.
This module take all the feeds provided in the config.
Depending on the configuration, this module will process the feed as follow:
operation_mode 1: "Avoid any duplicate from any sources"
- The module maintain a list of content for each paste
- If the content is new, process it
- Else, do not process it but keep track for statistics on duplicate
operation_mode 2: "Keep duplicate coming from different sources"
- The module maintain a list of name given to the paste by the feeder
- If the name has not yet been seen, process it
- Elseif, the saved content associated with the paste is not the same, process it
- Else, do not process it but keep track for statistics on duplicate
operation_mode 3: "Don't look if duplicated content"
- SImply do not bother to check if it is a duplicate
- Simply do not bother to check if it is a duplicate
Note that the hash of the content is defined as the sha1(gzip64encoded).
Every data coming from a named feed can be sent to a pre-processing module before going to the global module.
The mapping can be done via the variable FEED_QUEUE_MAPPING
"""
import os
import sys
import base64
import hashlib
import time
from pubsublogger import publisher
import redis
from Helper import Process
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
# CONFIG #
refresh_time = 30
FEED_QUEUE_MAPPING = { "feeder2": "preProcess1" } # Map a feeder name to a pre-processing module
if __name__ == '__main__':
publisher.port = 6380
publisher.channel = 'Script'
config_section = 'Mixer'
p = Process(config_section)
config_loader = ConfigLoader.ConfigLoader()
# REDIS #
server = config_loader.get_redis_conn("Redis_Mixer_Cache")
server_cache = config_loader.get_redis_conn("Redis_Log_submit")
# LOGGING #
publisher.info("Feed Script started to receive & publish.")
# OTHER CONFIG #
operation_mode = config_loader.get_config_int("Module_Mixer", "operation_mode")
ttl_key = config_loader.get_config_int("Module_Mixer", "ttl_duplicate")
default_unnamed_feed_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name")
PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/'
config_loader = None
# STATS #
processed_paste = 0
processed_paste_per_feeder = {}
duplicated_paste_per_feeder = {}
time_1 = time.time()
print('Operation mode ' + str(operation_mode))
while True:
message = p.get_from_set()
if message is not None:
splitted = message.split()
if len(splitted) == 2:
complete_paste, gzip64encoded = splitted
try:
#feeder_name = ( complete_paste.replace("archive/","") ).split("/")[0]
feeder_name, paste_name = complete_paste.split('>>')
feeder_name.replace(" ","")
if 'import_dir' in feeder_name:
feeder_name = feeder_name.split('/')[1]
except ValueError as e:
feeder_name = default_unnamed_feed_name
paste_name = complete_paste
# remove absolute path
paste_name = paste_name.replace(PASTES_FOLDER, '', 1)
# Processed paste
processed_paste += 1
try:
processed_paste_per_feeder[feeder_name] += 1
except KeyError as e:
# new feeder
processed_paste_per_feeder[feeder_name] = 1
duplicated_paste_per_feeder[feeder_name] = 0
relay_message = "{0} {1}".format(paste_name, gzip64encoded)
#relay_message = b" ".join( [paste_name, gzip64encoded] )
digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
# Avoid any duplicate coming from any sources
if operation_mode == 1:
if server.exists(digest): # Content already exists
#STATS
duplicated_paste_per_feeder[feeder_name] += 1
else: # New content
# populate Global OR populate another set based on the feeder_name
if feeder_name in FEED_QUEUE_MAPPING:
p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
server.sadd(digest, feeder_name)
server.expire(digest, ttl_key)
# Keep duplicate coming from different sources
elif operation_mode == 2:
# Filter to avoid duplicate
content = server.get('HASH_'+paste_name)
if content is None:
# New content
# Store in redis for filtering
server.set('HASH_'+paste_name, digest)
server.sadd(paste_name, feeder_name)
server.expire(paste_name, ttl_key)
server.expire('HASH_'+paste_name, ttl_key)
# populate Global OR populate another set based on the feeder_name
if feeder_name in FEED_QUEUE_MAPPING:
p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else:
if digest != content:
# Same paste name but different content
#STATS
duplicated_paste_per_feeder[feeder_name] += 1
server.sadd(paste_name, feeder_name)
server.expire(paste_name, ttl_key)
# populate Global OR populate another set based on the feeder_name
if feeder_name in FEED_QUEUE_MAPPING:
p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else:
# Already processed
# Keep track of processed pastes
#STATS
duplicated_paste_per_feeder[feeder_name] += 1
continue
else:
# populate Global OR populate another set based on the feeder_name
if feeder_name in FEED_QUEUE_MAPPING:
p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else:
# TODO Store the name of the empty paste inside a Redis-list.
print("Empty Paste: not processed")
publisher.debug("Empty Paste: {0} not processed".format(message))
else:
print("Empty Queues: Waiting...")
if int(time.time() - time_1) > refresh_time:
# update internal feeder
list_feeder = server_cache.hkeys("mixer_cache:list_feeder")
if list_feeder:
for feeder in list_feeder:
count = int(server_cache.hget("mixer_cache:list_feeder", feeder))
if count is None:
count = 0
processed_paste_per_feeder[feeder] = processed_paste_per_feeder.get(feeder, 0) + count
processed_paste = processed_paste + count
print(processed_paste_per_feeder)
to_print = 'Mixer; ; ; ;mixer_all All_feeders Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time)
print(to_print)
publisher.info(to_print)
processed_paste = 0
for feeder, count in processed_paste_per_feeder.items():
to_print = 'Mixer; ; ; ;mixer_{0} {0} Processed {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
print(to_print)
publisher.info(to_print)
processed_paste_per_feeder[feeder] = 0
for feeder, count in duplicated_paste_per_feeder.items():
to_print = 'Mixer; ; ; ;mixer_{0} {0} Duplicated {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
print(to_print)
publisher.info(to_print)
duplicated_paste_per_feeder[feeder] = 0
time_1 = time.time()
# delete internal feeder list
server_cache.delete("mixer_cache:list_feeder")
time.sleep(0.5)
continue