Ray Worker is build on top of Ray Core, a distributed execution engine to offload compute intensive work as remote functions to remote worker process threads. The primary function of Ray Worker is to associate a TOML config to match a data file to a Python entry processing method, which itself is then executed as a Ray remote function on a cluster.
Ray Worker also uses an Object Store to persist and stage files for processing, as well as a PostGres Database to track file status as it is being processed, in addition, high level file status and progress are also exposed via Kafka as optional integration point.
This Ray Cast-Iron Worker example leverages several Python libraries to accomplish distributed File workflow
- Install Python 3.9, preferably using Pyenv
$ pyenv install
- This project utilizes Poetry for managing python dependencies.
$ curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python
- Install dependencies
$ make install
Run
make up
Run
make clean
Run
make
to view commands available for this project
With the docker containers running and the worker running in either a container or locally
- Navigate to MinIO
http://localhost:9001
, login using cred as configured in project.env - Add
etl/example/example_config.toml
to theetl
bucket - Refresh the page to verify that additional etl buckets are created
- Navigate into
01_inbox
- Add
etl/example/test.csv
- Observe the log for Ray Cast Iron Worker
docker logs -f ray-cast-iron-worker
The configured example processor will simply output the file you just dropped
8. The test.csv file will also be moved to the archive_dir
bucket
The processor config handled_file_glob
configures file extension pattern matching. The matchers should be provided as e.g. _test.tsv|_updated.csv|.mp3
(no spaces).
Files are matched to processors as such: for a single file, checks are made based on processor configurations, one processor at a time.
- The first processor that is found to match the file is used to process the file, and the rest are ignored.
- So if two processors could have each matched a file, the order in which the processors are checked determines which matches and which is ignored.
Toml is used to create configuration files that can be used to tell the worker how to ETL a given file.
An example configuration file can be seen in the example_config.toml
and the example_python_config.toml
.
Several buckets are used as stages in the ETL process. These buckets are defined in the toml config file. The buckets are created i
inbox_dir
processing_dir
archive_dir
error_dir
canceled_dir
The Settings
class allows for the usage of environment variables or a .env
file to supply the appropriate arguments
based on its inheritance of Pyandatic's BaseSettings
class. Additional details on how BaseSettings
works can be
found in the Pydantic documentation.