Merge pull request #13 from black-cape/feature/postgres-workflow
Add Postgres for Workflow tracking
capeken committed Apr 21, 2022
2 parents a85ff5f + 16b8406 commit 78f7019
Showing 13 changed files with 1,120 additions and 398 deletions.
8 changes: 4 additions & 4 deletions etl/__main__.py
@@ -1,12 +1,12 @@
"""Entry point for ETL worker"""
from typing import AsyncIterable, Dict, List
import logging.config
from typing import AsyncIterable, Dict, List

from etl.config import settings
from etl.event_processor import GeneralEventProcessor, EtlConfigEventProcessor
from etl.event_processor import EtlConfigEventProcessor, GeneralEventProcessor
from etl.messaging.kafka_producer import KafkaMessageProducer
from etl.object_store.minio import MinioObjectStore
from etl.tasking.faust import FaustTaskSink, FaustAppConfig
from etl.tasking.faust import FaustAppConfig, FaustTaskSink

logging.config.fileConfig(settings.logging_conf_file)

@@ -29,7 +29,7 @@ async def etl_config_file_evt(evts: AsyncIterable[Dict]) -> None:
# Faust Agent definition to process ETL source data files
async def general_file_evt(evts: AsyncIterable[Dict]) -> None:
async for evt in evts:
etl_source_data_event_processor.process(evt)
await etl_source_data_event_processor.process(evt)


# configure two separate Faust Apps for the single Faust worker in FaustTaskSink
8 changes: 4 additions & 4 deletions etl/config.py
@@ -10,16 +10,16 @@ class Settings(BaseSettings):
"""Settings derived from command line, environment variables, .env file or defaults """

# Consumer group, also used as FAUST APP ID
# we want this to operate in pub/sub mode so all cast iron worker instance gets a same copy of config, hence
# this should be overwritten to be different per worker
# we want this to operate in pub/sub mode so every Cast Iron worker instance gets the same copy of the
# config, hence this should be overridden to be different per worker
consumer_grp_etl_config = 'etl-config-grp'
consumer_grp_etl_source_file = 'etl-source-file-grp'

database_host: str = 'localhost'
database_host: str = 'postgres'
database_password: str = '12345678' # Default for local debugging
database_port: int = 5432
database_user: str = 'castiron'
database_table: str = 'castiron'
database_db: str = 'castiron'

kafka_broker: str = 'localhost:9092'
kafka_topic_castiron_etl_config = 'castiron_etl_config'
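For reference, these database_* settings compose into the asyncpg-style DSN that PGDatabase builds in etl/database/database.py. A minimal sketch of that composition (note that database_port is declared here but not interpolated into the DSN, so Postgres's default 5432 applies):

from etl.config import settings

# DSN as assembled by PGDatabase; with the defaults above this is
# postgres://castiron:12345678@postgres/castiron
dsn = (
    f'postgres://{settings.database_user}:{settings.database_password}'
    f'@{settings.database_host}/{settings.database_db}'
)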
Empty file added etl/database/__init__.py
110 changes: 110 additions & 0 deletions etl/database/database.py
@@ -0,0 +1,110 @@
"""Contains the Minio implementation of the object store backend interface"""
import json
from datetime import datetime
from typing import Any, Dict, Optional

from asyncpg import PostgresError

from asyncpg_utils.databases import PoolDatabase
from asyncpg_utils.managers import TableManager

from etl.config import settings
from etl.database.interfaces import DatabaseStore, FileObject
from etl.util import get_logger

LOGGER = get_logger(__name__)


class PGDatabase(DatabaseStore):
"""Implements the DatabaseStore interface using Minio as the backend service"""
def __init__(self):
self._database = PoolDatabase(f'postgres://{settings.database_user}:{settings.database_password}@{settings.database_host}/{settings.database_db}')
self._table_manager = TableManager(self._database, 'files', pk_field='id', hooks=None)

async def create_table(self) -> bool:
""" Check for and create database table """
LOGGER.info('Creating DB table...')
try:
await self._database.init_pool()
conn = await self._database.get_connection()
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS files (
id uuid PRIMARY KEY,
bucket_name text,
file_name text,
status text,
processing_status text,
original_filename text,
event_name text,
source_ip text,
size int,
etag text,
content_type text,
create_datetime timestamp with time zone,
update_datetime timestamp with time zone,
classification jsonb,
metadata jsonb
);
"""
)
await conn.close()
return True
except (OSError, PostgresError) as db_error:
LOGGER.info('Database not active. Exception: %s', db_error)
return False

async def insert_file(self, filedata: FileObject):
""" Track a new file from Minio"""
LOGGER.info("Inserting file into DB...")
await self._database.insert('files', dict(filedata))

async def move_file(self, rowid: str, new_name: str):
""" Track the moving of a file in Minio"""
rec_data = {}
rec_data['file_name'] = new_name
rec_data['update_datetime'] = datetime.now()
await self._table_manager.update(rowid, rec_data)

async def delete_file(self, rowid: str):
""" Track the deleting of a file in Minio"""
await self._table_manager.delete(rowid)

async def list_files(self, metadata: Optional[Dict]):
""" List all tracked files by provided filter """
return await self._table_manager.list(filters=metadata)

async def retrieve_file_metadata(self, rowid: str):
""" Retrieve a row based on ID """
return await self._table_manager.detail(rowid)

async def update_status(self, rowid: str, new_status: str, new_filename: str):
""" Update the file status/state """
rec_data = {}
rec_data['status'] = new_status
rec_data['file_name'] = new_filename
rec_data['update_datetime'] = datetime.now()
await self._table_manager.update(rowid, rec_data)

def parse_notification(self, evt_data: Any):
""" Parse a Minio notification to create a DB row """
LOGGER.info(evt_data)
bucket_name, file_name = evt_data['Key'].split('/', 1)
metadata = evt_data['Records'][0]['s3']['object'].get('userMetadata', {})
db_evt = {
'id': metadata.get('X-Amz-Meta-Id', None),
'bucket_name': bucket_name,
'file_name': file_name,
'status': 'Queued',
'processing_status': None,
'original_filename': metadata.get('X-Amz-Meta-Originalfilename', None),
'event_name': evt_data['EventName'],
'source_ip': evt_data['Records'][0]['requestParameters']['sourceIPAddress'],
'size': evt_data['Records'][0]['s3']['object']['size'],
'etag': evt_data['Records'][0]['s3']['object']['eTag'],
'content_type': evt_data['Records'][0]['s3']['object']['contentType'],
'create_datetime': datetime.now(),
'classification': metadata.get('X-Amz-Meta-Classification', None),
'metadata': json.dumps(metadata)
}
return db_evt
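For context, parse_notification only touches a handful of fields in the Minio bucket notification. A trimmed, hypothetical payload built from the accesses above (values are illustrative, and PGDatabase is assumed not to open any connection at construction time, since the pool is only initialized in create_table):

example_evt = {
    'EventName': 's3:ObjectCreated:Put',
    'Key': 'castiron-etl/inbox/report.csv',
    'Records': [{
        'requestParameters': {'sourceIPAddress': '10.0.0.5'},
        's3': {'object': {
            'size': 2048,
            'eTag': 'd41d8cd98f00b204e9800998ecf8427e',
            'contentType': 'text/csv',
            'userMetadata': {
                'X-Amz-Meta-Id': '8a6c2f2e-55a1-4c61-9d8e-3f1b2c4d5e6f',
                'X-Amz-Meta-Originalfilename': 'report.csv',
            },
        }},
    }],
}

row = PGDatabase().parse_notification(example_evt)
# row['bucket_name'] == 'castiron-etl', row['file_name'] == 'inbox/report.csv',
# row['status'] == 'Queued', row['id'] == the X-Amz-Meta-Id value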
73 changes: 73 additions & 0 deletions etl/database/interfaces.py
@@ -0,0 +1,73 @@
"""Describes interface for sending messages to a message broker"""
import abc
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class FileObject:
"""Represents an implementation-neutral file event"""
id: str
bucket_name: str
file_name: str
status: str
processing_status: str
original_filename: str
event_name: str
source_ip: str
size: int
etag: str
content_type: str
create_datetime: datetime
update_datetime: datetime
classification: str
metadata: str


class DatabaseStore(abc.ABC):
"""Interface for message producer backend"""

async def insert_file(self, filedata: FileObject) -> None:
"""Insert a file record
:param file: Dict containing record
"""
raise NotImplementedError

async def move_file(self, rowid: str, new_name: str) -> None:
"""Rename a file record
:param id: The id
:param newName: New path value
"""
raise NotImplementedError

async def update_status(self, rowid: str, new_status: str, new_filename: str) -> None:
"""Rename a file record
:param id: The id
:param newStatus: New status value
"""
raise NotImplementedError

async def delete_file(self, rowid: str) -> None:
"""Delete a record
:param id: The id
"""
raise NotImplementedError

async def list_files(self, metadata: Optional[Dict]) -> List[Dict]:
"""Retrieve records based metadata criteria
:param metadata: Dict containing query restrictions
"""
raise NotImplementedError

async def retrieve_file_metadata(self, rowid: str) -> Dict:
"""Retrieve a row based on ID
:param id: The id
"""
raise NotImplementedError

def parse_notification(self, evt_data: Any) -> Dict:
"""Parse the event into a DB row/dict
:param evt_data: The event data from S3/Minio
"""
raise NotImplementedError
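Because these methods simply raise NotImplementedError, any backend can implement the contract. Purely as an illustration (not part of this commit), a dict-backed stand-in that could be used in unit tests:

from dataclasses import asdict
from datetime import datetime
from typing import Any, Dict, List, Optional

from etl.database.interfaces import DatabaseStore, FileObject


class InMemoryDatabaseStore(DatabaseStore):
    """Keeps file records in a plain dict keyed by row id."""

    def __init__(self) -> None:
        self._rows: Dict[str, Dict] = {}

    async def insert_file(self, filedata: FileObject) -> None:
        # The event processor passes plain dicts, so accept both forms.
        row = filedata if isinstance(filedata, dict) else asdict(filedata)
        self._rows[row['id']] = row

    async def move_file(self, rowid: str, new_name: str) -> None:
        self._rows[rowid].update(file_name=new_name, update_datetime=datetime.now())

    async def update_status(self, rowid: str, new_status: str, new_filename: str) -> None:
        self._rows[rowid].update(
            status=new_status, file_name=new_filename, update_datetime=datetime.now())

    async def delete_file(self, rowid: str) -> None:
        self._rows.pop(rowid, None)

    async def list_files(self, metadata: Optional[Dict]) -> List[Dict]:
        filters = metadata or {}
        return [row for row in self._rows.values()
                if all(row.get(key) == value for key, value in filters.items())]

    async def retrieve_file_metadata(self, rowid: str) -> Dict:
        return self._rows[rowid]

    def parse_notification(self, evt_data: Any) -> Dict:
        # Tests can pass pre-built rows straight through.
        return dict(evt_data)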
70 changes: 56 additions & 14 deletions etl/event_processor.py
@@ -5,17 +5,20 @@
import subprocess
import tempfile
import traceback
from pathlib import Path
from typing import Dict, Optional
from uuid import uuid4

from etl.config import settings
from etl.database.database import PGDatabase
from etl.file_processor_config import (FileProcessorConfig,
load_python_processor, try_loads)
from etl.messaging.interfaces import MessageProducer
from etl.object_store.interfaces import EventType, ObjectStore
from etl.object_store.object_id import ObjectId
from etl.path_helpers import (filename, get_archive_path, get_error_path,
get_inbox_path, get_processing_path,
processor_matches, parent, rename)
get_inbox_path, get_processing_path, parent,
processor_matches, rename)
from etl.pizza_tracker import PizzaTracker
from etl.util import create_rest_client, get_logger, short_uuid

@@ -24,9 +24,9 @@
file_suffix_to_ignore = ['.toml', '.keep', ERROR_LOG_SUFFIX]


# As per https://stackoverflow.com/questions/19924104/python-multiprocessing-handling-child-errors-in-parent/33599967#33599967
# needs to bubble exception up to parent
# As per the StackOverflow recipe, exceptions need to bubble up to the parent process
class ProcessWithExceptionBubbling(mp.Process):
""" Class the handles multiprocessing with exception bubbling """
def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = mp.Pipe()
@@ -43,6 +46,7 @@ def run(self):

@property
def exception(self):
""" Exception bubbling """
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
@@ -51,7 +55,7 @@ def exception(self):
class EtlConfigEventProcessor:
"""A service that processes individual object events"""
# cached dict of processor configs; needs to be accessed outside of this class
processors: Dict[ObjectId, FileProcessorConfig] = dict()
processors: Dict[ObjectId, FileProcessorConfig] = {}

def __init__(self, object_store: ObjectStore):
self._object_store = object_store
@@ -85,7 +89,7 @@ def _toml_put(self, toml_object_id: ObjectId) -> bool:
if cfg.enabled:
# Register processor
EtlConfigEventProcessor.processors[toml_object_id] = cfg
LOGGER.info(f'number of processor configs: {len(EtlConfigEventProcessor.processors)}')
LOGGER.info('number of processor configs: %s', len(EtlConfigEventProcessor.processors))
for processor_key in EtlConfigEventProcessor.processors.keys():
LOGGER.info(processor_key)

@@ -115,9 +119,17 @@ def __init__(self, object_store: ObjectStore, message_producer: MessageProducer)
self._object_store = object_store
self._message_producer = message_producer
self._rest_client = create_rest_client()
self._database = PGDatabase()
self._database_active = False

def process(self, evt_data: Dict) -> None:
async def process(self, evt_data: Dict) -> None:
"""Object event process entry point"""

# Check for database connection
if not self._database_active:
self._database_active = await self._database.create_table()
db_evt = {}

evt = self._object_store.parse_notification(evt_data)
# this processor receives TOML config events as well as regular file uploads; there doesn't seem to be a way
# to filter bucket notifications to exclude by file path
@@ -127,9 +139,34 @@ def process(self, evt_data: Dict) -> None:
if evt.event_type == EventType.Delete:
pass
elif evt.event_type == EventType.Put:
self._file_put(evt.object_id)
if evt.original_filename:
if not evt.event_status:
if self._database_active:
db_evt = self._database.parse_notification(evt_data)
try:
await self._database.insert_file(db_evt)
except Exception as e:
LOGGER.error('Database error. Unable to process/track file. Exception: %s', e)
await self._file_put(evt.object_id, db_evt.get('id', None))
else:
# New object. "Rename" object.
obj_path = Path(evt.object_id.path)
dirpath = obj_path.parent
filename = obj_path.name
obj_uuid = str(uuid4())
new_path = f'{dirpath}/{obj_uuid}-{filename}'
dest_object_id = ObjectId(evt.object_id.namespace, f'{new_path}')

metadata = evt_data['Records'][0]['s3']['object'].get('userMetadata', {})
metadata['originalFilename'] = filename
metadata['id'] = obj_uuid

def _file_put(self, object_id: ObjectId) -> bool:
self._object_store.move_object(evt.object_id, dest_object_id, metadata)




async def _file_put(self, object_id: ObjectId, uuid: str) -> bool:
"""Handle possible data file puts.
:return: True if successful.
"""
@@ -138,7 +175,6 @@ def _file_put(self, object_id: ObjectId) -> bool:
LOGGER.info(f'number of ETL processing configs now {len(EtlConfigEventProcessor.processors)}')

for config_object_id, processor in EtlConfigEventProcessor.processors.items():

if (
parent(object_id) != get_inbox_path(config_object_id, processor) or
not processor_matches(
@@ -162,13 +198,15 @@ def _file_put(self, object_id: ObjectId) -> bool:
self._message_producer.job_created(job_id, filename(object_id), filename(config_object_id), 'castiron')

# mv to processing
self._object_store.move_object(object_id, processing_file)
metadata = self._object_store.retrieve_object_metadata(object_id)
metadata['status'] = 'Processing'
self._object_store.move_object(object_id, processing_file, metadata)
await self._database.update_status(uuid, 'Processing', processing_file.path)

with tempfile.TemporaryDirectory() as work_dir:
# Download to local temp working directory
local_data_file = os.path.join(work_dir, filename(object_id))
self._object_store.download_object(processing_file, local_data_file)
metadata = self._object_store.retrieve_object_metadata(processing_file)

with open(os.path.join(work_dir, 'out.txt'), 'w') as out, \
PizzaTracker(self._message_producer, work_dir, job_id) as pizza_tracker:
@@ -255,11 +293,15 @@ def _file_put(self, object_id: ObjectId) -> bool:
if success:
# Success. mv to archive
if archive_file:
self._object_store.move_object(processing_file, archive_file)
metadata['status'] = 'Success'
self._object_store.move_object(processing_file, archive_file, metadata)
await self._database.update_status(uuid, 'Success', archive_file.path)
self._message_producer.job_evt_status(job_id, 'success')
else:
# Failure. mv to failed
self._object_store.move_object(processing_file, error_file)
metadata['status'] = 'Failed'
self._object_store.move_object(processing_file, error_file, metadata)
await self._database.update_status(uuid, 'Failed', error_file.path)
self._message_producer.job_evt_status(job_id, 'failure')

# Optionally save error log to failed, use same metadata as original file
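With each state transition (Queued, Processing, Success, Failed) now persisted to the files table, workflow state can be inspected directly in Postgres. A hypothetical helper (not part of this PR) sketching such a status query, using the connection settings from etl/config.py:

import asyncio

import asyncpg

from etl.config import settings


async def show_statuses() -> None:
    # Summarize tracked files by workflow status.
    conn = await asyncpg.connect(
        user=settings.database_user,
        password=settings.database_password,
        host=settings.database_host,
        port=settings.database_port,
        database=settings.database_db,
    )
    rows = await conn.fetch('SELECT status, count(*) FROM files GROUP BY status')
    for row in rows:
        print(row['status'], row['count'])
    await conn.close()


asyncio.run(show_statuses())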
