Luigi ETL pipeline

Disclaimer:

⚠️ By using some or all of the elements of this code, you assume responsibility for any consequences!

⚠️ The licenses for the technologies on which the code depends are subject to change by their authors.

Description of the pipeline:

The pipeline collects data from external sources (CSV tables or JSON dictionaries) and, in the end:

  • Collects data from external sources into Luigi targets.
  • Cleanses the data.
  • Lands the data in the DWH.

Required:

The application code is written in Python and therefore requires it.
Python version 3.6 [Python Software Foundation License / Zero-Clause BSD license (for Python 3.8.6 and later)]:

Required Packages:

Luigi [Apache License 2.0]:

Used to build the conveyor of Luigi tasks.

Pandas [BSD-3-Clause license]:

Used to work with tabular data.

NumPy [BSD-3-Clause license]:

Used to cast table cells to the desired values.

PyArrow [Apache-2.0 license]:

Used to save data in parquet format.

Installing the Required Packages:

pip install luigi
pip install pandas
pip install numpy
pip install pyarrow

Description of tasks:

ExternalData:

Wrappers for data from external sources.

  • Reads datasets from the directory received via the 'external_data_path' parameter.
    ⚠️ All partition paths inside the root directory passed to ExternalData must follow the format 'Dataset_Name/YYYY/MM/DD/'.
  • For every partition where a '_Validate' flag file is found, creates a new '_Validate_Success' flag as a Luigi.LocalTarget (see the sketch below).
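
A minimal sketch of how such a wrapper task could look (illustrative only, not the repository's exact implementation; the directory-walking logic is an assumption):

import os
import luigi


class ExternalData(luigi.Task):
    external_data_path = luigi.Parameter()

    def _validated_partitions(self):
        # Walk the root directory and yield every partition containing a '_Validate' flag file.
        for root, _dirs, files in os.walk(str(self.external_data_path)):
            if "_Validate" in files:
                yield root

    def output(self):
        # One '_Validate_Success' flag per validated partition.
        return [
            luigi.LocalTarget(os.path.join(partition, "_Validate_Success"))
            for partition in self._validated_partitions()
        ]

    def run(self):
        for target in self.output():
            with target.open("w") as flag:
                flag.write("")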

ExtractTask:

  • Reads data from ExternalData by dates.
  • Merges it into a single dataset.
  • If the 'drop_list' parameter is not 'None' ('None' by default), the task drops all columns whose names are listed in this Luigi.ListParameter (see the sketch below).
    Example of the 'drop_list' Luigi.ListParameter:
["drop_name", "Delete"]
  • The 'extract_file_mask' Luigi.Parameter sets the output file format, and 'external_data_file_mask' sets the input format.
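
A minimal sketch of an ExtractTask-style step (illustrative; the file names, the CSV-only reading and the path layout are assumptions, not the repository's exact code):

import datetime
import glob
import os

import luigi
import pandas as pd


class ExtractTask(luigi.Task):
    external_data_path = luigi.Parameter()
    extract_data_path = luigi.Parameter()
    date_parameter = luigi.DateParameter(default=datetime.date.today())
    drop_list = luigi.ListParameter(default=None)

    def output(self):
        date_part = self.date_parameter.strftime("%Y/%m/%d")
        return luigi.LocalTarget(
            os.path.join(str(self.extract_data_path), date_part, "extract.csv")
        )

    def run(self):
        # Collect every partition for the requested date and merge it into one table.
        date_part = self.date_parameter.strftime("%Y/%m/%d")
        pattern = os.path.join(str(self.external_data_path), "*", date_part, "*.csv")
        merged = pd.concat(
            (pd.read_csv(path) for path in glob.glob(pattern)), ignore_index=True
        )
        # Drop the columns named in 'drop_list', if the parameter was passed.
        if self.drop_list is not None:
            merged = merged.drop(columns=list(self.drop_list))
        with self.output().open("w") as out_file:
            merged.to_csv(out_file, index=False)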

TransformTask:

  • Removes all rows matching the 'transform_parsing_rules_drop' parameter.
    Example of the 'transform_parsing_rules_drop' Luigi.DictParameter:
{"column_to_drop": ["False", "NaN", 0]}
  • Rows are discarded if at least one value matches in ALL keys of 'transform_parsing_rules_filter'.
    Example of the 'transform_parsing_rules_filter' Luigi.DictParameter:
{"column_to_filter": ["drop_if_not_in_vip", "drop_too"], "filter_too": ["0"]}
  • Filtering is skipped for rows that contain values from the 'transform_parsing_rules_vip' keys (see the sketch below).
    Example of the 'transform_parsing_rules_vip' Luigi.DictParameter:
{"data_to_save_like_vip": ["vip_value_1", "vip_value_2"], "save_too": ["vip_value_3"]}
  • Has a 'date_parameter' Luigi.DateParameter (today by default).
  • The 'transform_file_mask' Luigi.Parameter sets the output file format, and 'extract_file_mask' sets the input format.
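
A minimal sketch of how the three rule dictionaries could be applied to a pandas DataFrame (the function name and the exact combination logic are assumptions made for illustration):

import pandas as pd


def apply_parsing_rules(frame: pd.DataFrame, rules_drop: dict,
                        rules_filter: dict, rules_vip: dict) -> pd.DataFrame:
    # 1. Drop rows whose value in a column matches any value listed for it in rules_drop.
    drop_mask = pd.Series(False, index=frame.index)
    for column, values in rules_drop.items():
        drop_mask |= frame[column].isin(values)

    # 2. Mark a row for filtering only when EVERY column in rules_filter matches one of its values.
    filter_mask = pd.Series(True, index=frame.index)
    for column, values in rules_filter.items():
        filter_mask &= frame[column].isin(values)

    # 3. Protect rows that contain a VIP value in any of the VIP columns.
    vip_mask = pd.Series(False, index=frame.index)
    for column, values in rules_vip.items():
        vip_mask |= frame[column].isin(values)

    # Keep a row unless it was dropped, or filtered while not being VIP.
    return frame[~drop_mask & ~(filter_mask & ~vip_mask)]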

LoadTask:

  • Lands the resulting data in the directory received from the Luigi.Parameter 'load_data_path' (see the sketch below).
  • Has a 'date_parameter' Luigi.DateParameter (today by default).
  • The 'load_file_mask' Luigi.Parameter sets the output file format, and 'transform_file_mask' sets the input format.
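
As a minimal sketch of the landing step (illustrative; the helper name and the JSON-to-parquet direction are assumptions), the final write can rely on pandas with the pyarrow engine:

import pandas as pd


def land_to_dwh(transformed_json_path: str, load_path: str) -> None:
    # Read the transformed file and land it in parquet format via pyarrow.
    frame = pd.read_json(transformed_json_path)
    frame.to_parquet(load_path, engine="pyarrow", index=False)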

Launch:

Launch with 'luigi_config' and Luigi.build:

If you want to use a simple launch by passing Luigi parameters through a configuration file:

  1. Fill the 'luigi_config.cfg' file with correct data.
  2. Then run the script 'luigi_pipeline.py'. Files location:
    ./📂Luigi_ETL
       └── 📁Pipeline
                ├── 📄luigi_pipeline.py
                └── 📁My_Beautiful_Tasks
                         └── 📁Configuration
                                  └── 📄luigi_config.cfg

Please note that lines with optional parameters can be removed from 'luigi_config.cfg' if you do not need them.
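
For orientation, a hypothetical 'luigi_config.cfg' might look like this (section and parameter names mirror the terminal example further below and are illustrative, not copied from the repository):

[ExternalData.ExternalData]
external_data_path=~/luigi_tasks/ExternalData

[Extract.ExtractTask]
extract_data_path=~/luigi_tasks/ExtractTask
extract_file_mask=csv
external_data_file_mask=csv
drop_list=["column_drop_name", "column_to_delete"]

[Transform.TransformTask]
file_to_transform_path=~/luigi_tasks/TransformTask
transform_file_mask=json
transform_parsing_rules_drop={"column_to_drop": ["False", "NaN", 0]}

[Load.LoadTask]
load_data_path=~/luigi_tasks/LoadTask
load_file_mask=parquet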

Example of the run command:

python3 -B -m luigi_pipeline

Launch with terminal or command line:

First, replace the 'build' call with 'run' in the 'Pipeline_launcher.py' script and remove all the parameters passed to it (see the sketch below).
Then clear all parameters from the Luigi task instances that are called in the 'luigi_pipeline.py' script.
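
A rough sketch of that change, assuming 'Pipeline_launcher.py' wraps luigi.build (the surrounding code is an assumption):

import luigi

# Before: parameters are hard-coded and the pipeline is built programmatically, e.g.:
# luigi.build([LoadTask(load_data_path="...", load_file_mask="parquet")], local_scheduler=True)

# After: all parameters come from the terminal / command line instead.
luigi.run()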

After that, you can start Luigi by passing parameters through the terminal, or using a 'start_luigi_etl_pipeline.sh' script.

Files location:
./📂Luigi_ETL
   └── 📁Pipeline
            ├── 📄luigi_pipeline.py
            ├── 📄start_luigi_etl_pipeline.sh
            └── 📁My_Beautiful_Tasks
                     └── 📄Pipeline_launcher.py

If your OS has a bash shell, the ETL pipeline can be started using the bash script:

./start_luigi_etl_pipeline.sh

The script contains an example of all the necessary arguments to run.
To launch the pipeline through this script, do not forget to make it executable.

chmod +x ./start_luigi_etl_pipeline.sh

The pipeline can also be launched directly with Python.
Example of the run command:

python3 -B -m luigi_pipeline Load.LoadTask --local-scheduler \
--ExternalData.ExternalData-external-data-path "~/luigi_tasks/ExternalData" \
\
--Extract.ExtractTask-extract-data-path "~/luigi_tasks/ExtractTask" \
--Extract.ExtractTask-extract-file-mask "csv" \
--Extract.ExtractTask-external-data-file-mask "csv" \
--Extract.ExtractTask-drop-list "['column_drop_name', 'column_to_delete']" \
\
--Transform.TransformTask-file-to-transform-path "~/luigi_tasks/TransformTask" \
--Transform.TransformTask-transform-file-mask "json" \
--Transform.TransformTask-transform-parsing-rules-drop "{'column_to_drop': [False, 'NaN', 0]}" \
--Transform.TransformTask-transform-parsing-rules-filter "{'column_to_filter': ['drop_if_not_in_vip', 'drop_too'], 'filter_too': ['0']}" \
--Transform.TransformTask-transform-parsing-rules-vip "{'data_to_save_like_vip': ['vip_value_1', 'vip_value_2'], 'save_too': ['vip_value_3']}" \
--Transform.TransformTask-date-path-part $(date +%F --date "2022-12-01") \
\
--Load.LoadTask-load-data-path "~/luigi_tasks/LoadTask" \
--Load.LoadTask-load-file-mask "parquet"

The example above shows the launch of all tasks.

Tests:

Tests are embedded inside the pipeline.


Thank you for your interest in my work.
