Skip to content

Commit

Permalink
chg: [importers] improve abstract class and logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Terrtia committed Jun 2, 2023
1 parent 550ca3a commit 7c77995
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 54 deletions.
2 changes: 1 addition & 1 deletion bin/DB_KVROCKS_MIGRATION.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def domain_migration():
domain = Domains.Domain(dom)
domain.update_daterange(first_seen)
domain.update_daterange(last_check)
domain._set_ports(ports) # TODO ############################################################################
# domain._set_ports(ports)
if last_origin:
domain.set_last_origin(last_origin)
for language in languages:
Expand Down
32 changes: 12 additions & 20 deletions bin/importer/FileImporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,29 @@

logging.config.dictConfig(ail_logger.get_config(name='modules'))

# TODO Clean queue one object destruct

class FileImporter(AbstractImporter):
def __init__(self, feeder='file_import'):
super().__init__()
super().__init__(queue=True)
self.logger = logging.getLogger(f'{self.__class__.__name__}')

self.feeder_name = feeder # TODO sanityze feeder name

# Setup the I/O queues
self.queue = AILQueue('FileImporter', 'manual')

def importer(self, path):
if os.path.isfile(path):
with open(path, 'rb') as f:
content = f.read()
mimetype = ail_files.get_mimetype(content)
if ail_files.is_text(mimetype):
if content:
mimetype = ail_files.get_mimetype(content)
item_id = ail_files.create_item_id(self.feeder_name, path)
content = ail_files.create_gzipped_b64(content)
if content:
message = f'dir_import {item_id} {content}'
self.logger.info(message)
self.queue.send_message(message)
elif mimetype == 'application/gzip':
item_id = ail_files.create_item_id(self.feeder_name, path)
content = ail_files.create_b64(content)
if content:
message = f'dir_import {item_id} {content}'
self.logger.info(message)
self.queue.send_message(message)
gzipped = False
if mimetype == 'application/gzip':
gzipped = True
elif not ail_files.is_text(mimetype):
return None

message = self.create_message(item_id, content, gzipped=gzipped, source='dir_import')
if message:
self.add_message_to_queue(message)

class DirImporter(AbstractImporter):
def __init__(self):
Expand Down
14 changes: 3 additions & 11 deletions bin/importer/PystemonImporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
# https://github.com/cvandeplas/pystemon/blob/master/pystemon.yaml#L52
#

import base64
import os
import gzip
import sys
import redis

Expand All @@ -32,10 +30,6 @@ def __init__(self, pystemon_dir, host='localhost', port=6379, db=10):
self.r_pystemon = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
self.dir_pystemon = pystemon_dir

# # TODO: add exception
def encode_and_compress_data(self, content):
return base64.b64encode(gzip.compress(content)).decode()

def importer(self):
item_id = self.r_pystemon.lpop("pastes")
print(item_id)
Expand All @@ -53,9 +47,8 @@ def importer(self):
if not content:
return None

b64_gzipped_content = self.encode_and_compress_data(content)
print(item_id, b64_gzipped_content)
return f'{item_id} {b64_gzipped_content}'
return self.create_message(item_id, content, source='pystemon')

except IOError as e:
print(f'Error: {full_item_path}, IOError')
return None
Expand All @@ -81,8 +74,7 @@ def get_message(self):
return self.importer.importer()

def compute(self, message):
relay_message = f'pystemon {message}'
self.add_message_to_queue(relay_message)
self.add_message_to_queue(message)


if __name__ == '__main__':
Expand Down
70 changes: 65 additions & 5 deletions bin/importer/abstract_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,41 @@
Import Content
"""
import base64
import gzip
import logging
import logging.config
import os
import sys

from abc import ABC, abstractmethod


# sys.path.append(os.environ['AIL_BIN'])
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
# from ConfigLoader import ConfigLoader
from lib import ail_logger
from lib.ail_queues import AILQueue

class AbstractImporter(ABC):
def __init__(self):
logging.config.dictConfig(ail_logger.get_config(name='modules'))

# TODO Clean queue one object destruct

class AbstractImporter(ABC): # TODO ail queues
def __init__(self, queue=False):
"""
Init Module
importer_name: str; set the importer name if different from the instance ClassName
AIL Importer
:param queue: Allow to push messages to other modules
"""
# Module name if provided else instance className
self.name = self._name()
self.logger = logging.getLogger(f'{self.__class__.__name__}')

# Setup the I/O queues for one shot importers
if queue:
self.queue = AILQueue(self.name, 'importer_manual')

@abstractmethod
def importer(self, *args, **kwargs):
Expand All @@ -39,4 +54,49 @@ def _name(self):
"""
return self.__class__.__name__

def add_message_to_queue(self, message, queue_name=None):
"""
Add message to queue
:param message: message to send in queue
:param queue_name: queue or module name
ex: add_message_to_queue(item_id, 'Mail')
"""
if message:
self.queue.send_message(message, queue_name)

@staticmethod
def b64(content):
if isinstance(content, str):
content = content.encode()
return base64.b64encode(content).decode()

@staticmethod
def create_gzip(content):
if isinstance(content, str):
content = content.encode()
return gzip.compress(content)

def b64_gzip(self, content):
try:
gziped = self.create_gzip(content)
return self.b64(gziped)
except Exception as e:
self.logger.warning(e)
return ''

def create_message(self, obj_id, content, b64=False, gzipped=False, source=None):
if not gzipped:
content = self.b64_gzip(content)
elif not b64:
content = self.b64(gzipped)
if not content:
return None
if isinstance(content, bytes):
content = content.decode()
if not source:
source = self.name
self.logger.info(f'{source} {obj_id}')
# self.logger.debug(f'{source} {obj_id} {content}')
return f'{source} {obj_id} {content}'

14 changes: 0 additions & 14 deletions bin/lib/ail_files.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*

import base64
import datetime
import gzip
import logging.config
import magic
import os
Expand Down Expand Up @@ -181,15 +179,3 @@ def create_item_id(feeder_name, path):
item_id = os.path.join(feeder_name, date, basename)
# TODO check if already exists
return item_id

def create_b64(b_content):
return base64.standard_b64encode(b_content).decode()

def create_gzipped_b64(b_content):
try:
gzipencoded = gzip.compress(b_content)
gzip64encoded = create_b64(gzipencoded)
return gzip64encoded
except Exception as e:
logger.warning(e)
return ''
5 changes: 2 additions & 3 deletions bin/modules/abstract_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ class AbstractModule(ABC):

def __init__(self, module_name=None, queue=True):
"""
Init Module
AIL Module,
module_name: str; set the module name if different from the instance ClassName
queue_name: str; set the queue name if different from the instance ClassName
logger_channel: str; set the logger channel name, 'Script' by default
:param queue: Allow to push messages to other modules
"""
self.logger = logging.getLogger(f'{self.__class__.__name__}')

Expand Down

0 comments on commit 7c77995

Please sign in to comment.