Merge pull request #15 from black-cape/deprecate-mimetype-based-matching
Deprecate mimetype based matching
huanchh2 committed Oct 13, 2023
2 parents 398a39b + e8485dd commit ec98ef2
Showing 11 changed files with 1,075 additions and 1,676 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,7 @@
# Ray Worker

### 2.0.0

- Removed mime-type-based matching and reverted to the initial Ray Worker implementation
- Mime-type-based routing should instead be handled in the `worker_run_method` configured for each processor
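Since the changelog points routing logic at `worker_run_method`, a hypothetical sketch of what such mime-type routing inside a worker's run method could look like (the handler names and routing table below are illustrative assumptions, not part of the Ray Worker codebase):

```python
import mimetypes

def handle_csv(path: str) -> str:
    return f"csv:{path}"

def handle_pdf(path: str) -> str:
    return f"pdf:{path}"

def handle_default(path: str) -> str:
    return f"other:{path}"

def worker_run_method(file_path: str) -> str:
    # Guess the mime type from the file extension and dispatch to a
    # handler; anything unrecognized falls through to a default handler.
    mime, _ = mimetypes.guess_type(file_path)
    if mime == "text/csv":
        return handle_csv(file_path)
    if mime == "application/pdf":
        return handle_pdf(file_path)
    return handle_default(file_path)
```

This keeps mime-type decisions inside the worker itself, so the matching layer only needs filename globs.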

3 changes: 3 additions & 0 deletions Makefile
@@ -18,6 +18,9 @@ help: ## This info
build: ## build the docker image
docker build -t cast-iron/ray-worker .

up: ## Run the service and its docker dependencies, using a cached build if available
docker compose up --detach

nag: sort lint type test ## Run all checks

lint: ## Run pylint over the main project files
9 changes: 0 additions & 9 deletions README.md
@@ -62,9 +62,6 @@ ray-cast-iron-worker | (StreamingTextPayloadWorker pid=241) in example text str
ray-cast-iron-worker | (StreamingTextPayloadWorker pid=241) got arg1 test and arg2 test2
```




## Streaming Video Workflow

coming soon
@@ -91,15 +88,9 @@ The configured example processor will simply output the file you just dropped
### Matching files to processors
The processor config `handled_file_glob` configures file extension pattern matching. The matchers should be provided as e.g. `_test.tsv|_updated.csv|.mp3` (no spaces).

The processor config `handled_mimetypes` specifies Tika mimetypes for a processor to match. Its value should be a comma-separated string of mimetypes, e.g. `application/pdf,application/vnd.openxmlformats-officedocument.wordprocessingml.document`
* Note: in order to enable Tika mimetype matching, the environment setting `ENABLE_TIKA` must be set to a truthy value. See the `Settings` section below for details about environment settings.

Files are matched to processors as follows: for each file, processor configurations are checked one at a time.
* The first processor found to match the file processes it; the rest are ignored.
* If two processors could each match a file, the order in which they are checked determines which one handles it.
* One or the other, or both, of `handled_mimetypes` and `handled_file_glob` can be specified for a processor.
* If both are specified, mimetype checking is tried first, then file extension glob if mimetype failed or returned False for that processor.
* Each processor will check both mimetype and file extension glob matching before moving on to the next processor.
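The first-match-wins selection described above can be sketched as follows (a minimal illustration assuming pipe-separated suffix patterns in the `handled_file_glob` style; the config dicts and function name are invented for the example):

```python
# Illustrative processor configs, checked in order; each lists suffix
# patterns in the pipe-separated `handled_file_glob` style (no spaces).
processors = [
    {"name": "tsv_proc", "handled_file_glob": "_test.tsv|_updated.csv"},
    {"name": "csv_proc", "handled_file_glob": ".csv|.CSV"},
]

def match_processor(filename: str):
    # The first processor whose pattern matches claims the file;
    # remaining processors are never consulted.
    for proc in processors:
        for suffix in proc["handled_file_glob"].split("|"):
            if filename.endswith(suffix):
                return proc["name"]
    return None
```

Here `report_updated.csv` would be claimed by `tsv_proc` even though `csv_proc`'s `.csv` pattern also matches, which is why processor ordering matters.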

## Technology

45 changes: 9 additions & 36 deletions docker-compose.yml
@@ -2,42 +2,22 @@ version: '3.8'

# included for testing against Ray-Worker only
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
healthcheck:
test: ["CMD", "nc", "-vz", "localhost", "2181"]
interval: 10s
timeout: 10s
retries: 3
restart: on-failure

# Message Queues
kafka:
image: wurstmeister/kafka
image: bitnami/kafka:3.4.1
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_CREATE_TOPICS: "greetings:1:1,castiron_etl_config:1:1,castiron_etl_source_file:4:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL_PLAINTEXT://kafka:9092,EXTERNAL_PLAINTEXT://localhost:9093"
depends_on: # should be started after
- zookeeper
healthcheck:
test: ["CMD", "nc", "-vz", "localhost", "9092"]
interval: 10s
timeout: 10s
retries: 3
restart: on-failure
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER

# Object Storage
minio:
image: minio/minio
image: minio/minio:RELEASE.2023-10-07T15-07-38Z
environment:
MINIO_ROOT_USER: castiron
MINIO_ROOT_PASSWORD: castiron
@@ -53,16 +33,12 @@ services:
ports:
- "9001:9001"
- "9000:9000"
depends_on:
kafka:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 10s
timeout: 20s
start_period: 10s
retries: 3
restart: on-failure

ray_cast_iron_worker:
build:
@@ -84,8 +60,5 @@ services:
volumes:
- ./etl:/app/etl
depends_on:
kafka:
condition: service_healthy
minio:
condition: service_healthy
restart: always
condition: service_healthy
2 changes: 0 additions & 2 deletions etl/config.py
@@ -47,8 +47,6 @@ class Settings(BaseSettings):
connection_params: Optional[Dict]
client_cert: Optional[str]
client_key: Optional[str]
enable_tika: bool = False
tika_host: str = 'UNSET'

# Ray configs
# [WS] Cast Iron will create (num_s3_workflow_workers * 2) + 2 actors + num_text_streaming_workers text
3 changes: 0 additions & 3 deletions etl/example/example_config.toml
@@ -8,9 +8,6 @@ enabled = true
# A unix-like filename glob pattern indicating which filenames this handler will process
handled_file_glob = ".csv|.CSV"

# A comma-separated list of mimetypes this processor will handle. An empty string disables Tika mimetype checking.
handled_mimetypes = "text/csv"

# Directory relative to this file to accept new files to process. Will be created if absent
inbox_dir = "01_inbox/"

1 change: 0 additions & 1 deletion etl/file_processor_config.py
@@ -72,7 +72,6 @@ class FileProcessorConfig(BaseModel):
"""An ETL processor configuration."""
enabled: bool
handled_file_glob: str
handled_mimetypes: Optional[str]
inbox_dir: str
processing_dir: str
archive_dir: Optional[str]
7 changes: 3 additions & 4 deletions etl/general_event_processor.py
@@ -10,7 +10,6 @@
import ray
from ray.exceptions import RayTaskError, TaskCancelledError, WorkerCrashedError

from etl.config import settings
from etl.database.database import ClickHouseDatabase
from etl.database.interfaces import (
FileObject, STATUS_CANCELED, STATUS_FAILED, STATUS_PROCESSING, STATUS_QUEUED, STATUS_SUCCESS
@@ -41,7 +40,6 @@ def __init__(self, toml_processor: TOMLProcessor, task_manager: TaskManager):
self._message_producer = KafkaMessageProducer()
self._object_store = MinioObjectStore()
self._toml_processor = toml_processor
self._rest_client = create_rest_client()
self._database = ClickHouseDatabase()
self._task_params = {}
self._task_manager = task_manager
@@ -138,8 +136,9 @@ async def _file_put(self, object_id: ObjectId, uuid: str):
for config_object_id, processor in processor_dict.items():
if (
parent(object_id) != get_inbox_path(config_object_id, processor) or not processor_matches(
object_id, config_object_id, processor, self._object_store, self._rest_client, settings.tika_host,
settings.enable_tika
object_id=object_id,
config_object_id=config_object_id,
cfg=processor
)
):
# File isn't in our inbox directory or filename doesn't match our glob pattern
49 changes: 1 addition & 48 deletions etl/path_helpers.py
@@ -125,58 +125,11 @@ def glob_matches(object_id: ObjectId, config_object_id: ObjectId, cfg: FileProce
LOGGER.error("Value error during glob matching: %s", value_error)
return False


def mimetype_matches(
object_id: ObjectId, config_object_id: ObjectId, cfg: FileProcessorConfig, object_store: ObjectStore,
rest_client: RestClient, tika_host: str
) -> bool:
"""Checks if the configured mimetype matches the inferred mimetype of the uploaded file"""
if object_id.namespace != config_object_id.namespace:
return False

try:
file_path = str(PurePosixPath(object_id.path).relative_to(parent(config_object_id).path))
file_name = file_path.split('/')[-1]
file_object = object_store.read_object(object_id)
mimetypes = cfg.handled_mimetypes.split(",") if cfg.handled_mimetypes else []

try:
response = rest_client.make_request(
f'{tika_host}/detect/stream',
method='put',
data=file_object,
headers={
'Content-Type': 'application/octet-stream',
'file-name': file_name
}
)
if not response or response.status_code != HTTPStatus.OK:
LOGGER.warning(f'Unexpected response from Tika service: {response.content}')
else:
detected_mimetype = response.text
LOGGER.info(f'Detected mimetype {detected_mimetype}')
# TODO maybe handle more complex mimetype lookups from mime.types file to provide mapping
# TODO from several mimetypes to single file extension types, allowing user to specify file type only.
for mimetype in mimetypes:
if detected_mimetype == mimetype:
return True
except Exception as e:
LOGGER.error('Failed to query Tika service for file mimetype', e)
except Exception as e:
LOGGER.error('Failed to read object from object store', e)

return False


def processor_matches(
object_id: ObjectId, config_object_id: ObjectId, cfg: FileProcessorConfig, object_store: ObjectStore,
rest_client: RestClient, tika_host: str, enable_tika: bool
object_id: ObjectId, config_object_id: ObjectId, cfg: FileProcessorConfig
) -> bool:
matches = False

if enable_tika:
matches = mimetype_matches(object_id, config_object_id, cfg, object_store, rest_client, tika_host)

if not matches:
matches = glob_matches(object_id, config_object_id, cfg)

