To provide an introduction and demos in Google Cloud Platform Airflow Composer

hilsdsg3/Google_Cloud_Platform_Composer


Goals of this repo:

1. Introduce GCP Composer, a scheduler for data jobs
2. Demo GCP Composer




Google Cloud Composer


Table of Contents

Composer_Introduction

Cloud Composer is a workflow management service for creating, scheduling, and monitoring data pipelines that span the cloud and on-premises data centers. Composer is built on Apache Airflow, in which each workflow is configured as a Directed Acyclic Graph (DAG). For a fuller description, see the Google documentation.


Composer_Flow

Differences

Composer_Operations


Running a sample DAG


1. Creating a Directed Acyclic Graph (DAG) using Python

Python_bash.py

Import statements

import datetime
from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

Set the start date to yesterday so that the DAG runs as soon as the Python file is uploaded

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())
default_dag_args = {'start_date': yesterday}
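
To see why a start date of yesterday makes the DAG run right after upload, the sketch below works through the dates in plain Python. It assumes the one-day schedule interval this DAG uses, and Airflow's usual behavior of triggering a run once the interval that begins at start_date has ended. The name first_trigger is illustrative only, not an Airflow attribute.

```python
import datetime

# Midnight at the start of yesterday, built the same way as in the DAG file.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

# Assumption: the DAG uses a one-day schedule interval.
schedule_interval = datetime.timedelta(days=1)

# Illustrative name: the moment the first interval ends (midnight today).
first_trigger = yesterday + schedule_interval
now = datetime.datetime.now()

print('start_date:   ', yesterday)
print('first run due:', first_trigger)
print('already due:  ', first_trigger <= now)
```

Because the first interval has already closed by the time the file is uploaded, the scheduler has a run to start immediately.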

Schedule the DAG to run once a day

with models.DAG(
        'running_python_and_bash_operator',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Operators created inside the with block, like the following, are added to the DAG object

    # Task that calls the hello_world Python function
    hello_world_greeting = python_operator.PythonOperator(
        task_id='python_1',
        python_callable=hello_world)

    # Task that calls the greeting Python function
    sales_greeting = python_operator.PythonOperator(
        task_id='python_2',
        python_callable=greeting)

    # Task that runs a shell command through the bash operator
    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash',
        bash_command='echo Goodbye! Hope to see you soon.')

Task order

hello_world_greeting >> sales_greeting >> bash_greeting
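
The operators above point at two callables, hello_world and greeting, that are not listed at this point in the walkthrough. A minimal sketch of what they could look like follows. The body of hello_world mirrors the version shown in the trigger rules section (without the forced error), while the body of greeting is purely an assumption for illustration.

```python
def hello_world():
    # Matches the function shown later in this README, minus the forced error.
    print('Hello World!')
    return 1


def greeting():
    # Hypothetical body: the README does not show what greeting does.
    print('Greetings from the sales team.')
    return 'greeting sent'


print(hello_world())
print(greeting())
```

In the DAG file, definitions like these need to sit above the with block so the names exist when the operators are created.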
2. Upload the Python script to the dags/ folder of the Cloud Storage bucket that Composer created for the environment
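
The upload can also be scripted. The sketch below assumes the google-cloud-storage client library is installed and that credentials are already configured. The function names and the bucket name in the usage note are hypothetical; the real bucket name is shown on the environment's page in the Composer view.

```python
import os


def dag_object_name(local_path):
    """Return the object name that places the file in the bucket's dags/ folder."""
    return 'dags/' + os.path.basename(local_path)


def upload_dag(bucket_name, local_path):
    """Copy a DAG file into the environment's bucket (hypothetical helper)."""
    # Imported here so dag_object_name stays usable without the client library.
    from google.cloud import storage

    client = storage.Client()  # picks up the default credentials
    blob = client.bucket(bucket_name).blob(dag_object_name(local_path))
    blob.upload_from_filename(local_path)
    return blob.name


print(dag_object_name('Python_bash.py'))
```

A call such as upload_dag('my-composer-bucket', 'Python_bash.py') would then place the file where Airflow looks for DAGs; 'my-composer-bucket' is a placeholder.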

3. Open the Airflow console through the GCP Composer view
4. Airflow overview and detail screens
Note: The Airflow home page has a toggle next to each DAG that shows whether the DAG is active or paused. To avoid GCP charges from a test DAG, keep the DAG paused by switching the toggle off.



Configuring Trigger rules

The following code sets the number of retries to 1 and the retry delay to 2 minutes

default_dag_args = {
    'start_date': yesterday,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=2)}

We also forced a ValueError inside the task to check that the DAG responds properly to a failure. The lines after the raise are never reached.

def hello_world():
    raise ValueError('Oops! something went wrong.')
    print('Hello World!')
    return 1

The DAG ran as expected: the task failed and reported the error.
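
What retries=1 means for a task that always fails can be illustrated in plain Python. This is not Airflow's scheduler code, only a sketch of the same idea: the task gets one attempt plus the configured number of retries, with a pause in between. The helper run_with_retries and its sleep parameter are hypothetical names; the default sleep does nothing so the example finishes instantly.

```python
import datetime


def hello_world():
    raise ValueError('Oops! something went wrong.')


def run_with_retries(task, retries, retry_delay, sleep=lambda seconds: None):
    """Call task up to 1 + retries times, pausing retry_delay between attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return 'success', attempts, task()
        except Exception as error:
            if attempts > retries:
                # No retries left: report the failure.
                return 'failed', attempts, error
            sleep(retry_delay.total_seconds())


state, attempts, result = run_with_retries(
    hello_world, retries=1, retry_delay=datetime.timedelta(minutes=2))
print(state, attempts, result)
```

With the settings above, the failing task is attempted twice before it is marked as failed. Passing time.sleep as the sleep argument would add the real two-minute wait.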

A trigger rule can also be set on the bash operator. Here the task runs when at least one upstream task has failed

from airflow.utils import trigger_rule

bash_greeting = bash_operator.BashOperator(
    task_id='bye_bash',
    bash_command='echo Goodbye! Hope to see you soon.',
    trigger_rule=trigger_rule.TriggerRule.ONE_FAILED)
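
The effect of a trigger rule can be sketched as a simple check on the states of the upstream tasks. The function below is a simplified illustration, not Airflow's implementation: it covers only three rules, looks at final states, and ignores timing. The name should_run is hypothetical; the rule strings are the values Airflow uses for these rules.

```python
def should_run(trigger_rule, upstream_states):
    """Decide whether a task may run, given the states of its upstream tasks."""
    succeeded = upstream_states.count('success')
    # A task whose own upstream failed is marked 'upstream_failed'.
    failed = (upstream_states.count('failed')
              + upstream_states.count('upstream_failed'))
    if trigger_rule == 'all_success':  # Airflow's default rule
        return succeeded == len(upstream_states)
    if trigger_rule == 'one_success':
        return succeeded >= 1
    if trigger_rule == 'one_failed':
        return failed >= 1
    raise ValueError('rule not covered by this sketch: ' + trigger_rule)


# python_1 failed, so python_2 (the task directly upstream of bye_bash)
# ends up as 'upstream_failed'.
print(should_run('all_success', ['upstream_failed']))
print(should_run('one_failed', ['upstream_failed']))
```

Under the default rule bye_bash would not run after the forced error, while ONE_FAILED lets it run precisely because something upstream went wrong.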

Dummy Operator

In python_dash_dummy.py, a dummy operator is added for branching. Because branches usually rejoin at a later task, the dummy operator fills the leg of the branch that has no work of its own.

# Additions to the Python code
import random
from airflow.operators import dummy_operator

# Returns the task_id of the branch to follow
def makeBranchChoice():
    x = random.randint(1, 5)
    if x <= 2:
        return 'hello'
    else:
        return 'dummy'

    # The operators below belong inside the with models.DAG(...) block
    run_this_first = dummy_operator.DummyOperator(
        task_id='run_this_first')
    branching = python_operator.BranchPythonOperator(
        task_id='branching',
        python_callable=makeBranchChoice)
    run_this_first >> branching
    sales_greeting = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)
    dummy_followed_python = dummy_operator.DummyOperator(
        task_id='follow_python')
    dummy = dummy_operator.DummyOperator(
        task_id='dummy')
    # one_success lets the join task run even though one leg is skipped
    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash',
        bash_command='echo Goodbye! Hope to see you soon.',
        trigger_rule='one_success'
    )

The above code generated a path through a dummy operator. In this run, x was greater than 2, so the if-else condition returned 'dummy' and the DAG followed the dummy path. Had no task with that task_id been defined, the DAG would have failed.
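
How often each leg is chosen follows directly from the condition in makeBranchChoice. The sketch below enumerates the five equally likely values of x instead of drawing one at random; branch_for is a hypothetical helper that repeats the same if-else decision with x passed in.

```python
from collections import Counter


def branch_for(x):
    """Same decision as makeBranchChoice, with x passed in instead of drawn."""
    return 'hello' if x <= 2 else 'dummy'


# random.randint(1, 5) returns each of 1, 2, 3, 4, 5 with equal probability.
counts = Counter(branch_for(x) for x in range(1, 6))
for task_id, n in sorted(counts.items()):
    print(task_id, n / 5)
```

The dummy leg is taken in three of five cases, so a run that follows the dummy path, as the one described here did, is the more likely outcome.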
