AirFlow-ETL

ETL pipeline built on AirFlow
· Report Bug · Request Feature

Table of Contents
  1. About The Project
  2. Getting Started
  3. Usage
  4. Roadmap
  5. Contact
  6. Acknowledgements

About The Project

The goal is to build an ETL pipeline on AirFlow that automatically pulls data from Google BigQuery, processes it, and stores the result in a local MySQL database.

Here's why:

  • AirFlow's scheduler can start the ETL pipeline at a fixed time
  • Failed runs are automatically retried a fixed number of times when operational errors occur
  • Run results and errors can be checked quickly on the AirFlow DAGs panel
  • Run records are easy to look up through the log file system
  • The required data is obtained and stored after the ETL process

Structure

AirFlow-ETL
The whole architecture consists of two independent systems: AirFlow-ETL and a real-time dashboard.
This system runs the ETL program on a regular schedule through AirFlow, uses a JSON key to obtain event-tracking data from Google BigQuery, transforms the data, and stores it in the local MySQL database for access by the dashboard system.

ETL

Every 10 minutes, the day's transaction data is requested from BigQuery, the transaction results and error codes are classified, and the corresponding rows in MySQL are updated.
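
For orientation, the 10-minute schedule could be expressed with a DAG roughly like the sketch below (assuming Airflow 2.x); the DAG id, task name, and callable are illustrative and not taken from this repository.

# DAG sketch: a 10-minute schedule (hypothetical names, Airflow 2.x imports)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_transform_load():
    # would call the query/parse/storage steps described below
    ...

with DAG(
    dag_id='bigquery_eventtracking_regular_report',    # assumed id
    start_date=datetime(2021, 1, 1),
    schedule_interval='*/10 * * * *',                   # run every 10 minutes
    catchup=False,
    default_args={'retries': 3, 'retry_delay': timedelta(minutes=2)},
) as dag:
    PythonOperator(task_id='run_etl', python_callable=extract_transform_load)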

Extract

A JSON key is used to access the tables on BigQuery, and the daily data is loaded through SQL.
Since the database time zone on BigQuery differs from the local one (UTC+8), the time-zone offset must be handled before using SQL to retrieve the data.

# bigquery_eventtracking_regular_report.py
from datetime import datetime, timedelta

# 16:00 UTC is local (UTC+8) midnight, so the window covers one local day shifted back by n days
startTime = (datetime.now() + timedelta(days=-n-1)).strftime('%Y-%m-%d') + 'T16:00:00'
endTime = (datetime.now() + timedelta(days=-n)).strftime('%Y-%m-%d') + 'T16:00:00'
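
For context, the extraction step with the official google-cloud-bigquery client could look roughly like this; the table name, columns, and query are placeholders, not the repository's actual _query.py.

# extract sketch (placeholder table and columns; assumes google-cloud-bigquery and pandas)
from google.cloud import bigquery

# Build a client from the service-account JSON key referenced in _query.py
client = bigquery.Client.from_service_account_json(credential_path)

sql = f"""
    SELECT sport_code, error_code, event_time
    FROM `project.dataset.event_tracking`
    WHERE event_time >= '{startTime}' AND event_time < '{endTime}'
"""
raw_df = client.query(sql).to_dataframe()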

Transform

Store the obtained data as a DataFrame, then add derived columns during processing.

# parse.py
# Attach the event date and each error code's share of the total error count
df['event_date'] = eventTime
df['error_rate'] = df['sum_of_error'] / sum(df['sum_of_error'])
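
The per-error aggregation that produces sum_of_error is not shown in the excerpts above; under the assumption that the extract returns raw event rows, a pandas classification step along these lines could feed parse.py (column names assumed).

# classification sketch (assumed raw columns sport_code and error_code)
df = (raw_df.groupby(['sport_code', 'error_code'], as_index=False)
            .size()
            .rename(columns={'size': 'sum_of_error'}))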

Load

Insert or update the transformed data in MySQL through SQL.

  • If the row already exists in the db -> update its value.
  • Otherwise -> insert the row.
# storage.py
import logging

# Check whether a row for this (event_date, error_code, sport_code) already exists
logging.info('searching for existing row in db: ' + error_code)
sql = f"""
    SELECT * FROM {table}
    WHERE event_date="{eventTime}" AND error_code="{error_code}" AND sport_code="{sport_code}";
"""
cur.execute(sql)

if cur.fetchone():
    # Row exists: update the error count
    logging.info('updating data..')
    sql = f"""
        UPDATE {table} SET sum_of_error = {sum_of_error}
        WHERE event_date="{eventTime}" AND error_code="{error_code}" AND sport_code="{sport_code}";
    """
else:
    # Row is new: insert it
    logging.info('inserting data..')
    sql = f"""
        INSERT INTO {table} (event_date, sport_code, error_code, sum_of_error)
        VALUES ("{eventTime}", "{sport_code}", "{error_code}", {sum_of_error});
    """
cur.execute(sql)
conn.commit()
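
As a side note, if the table has a unique key over (event_date, sport_code, error_code), the select-then-update/insert pattern above could be collapsed into a single parameterized upsert; this is only a sketch under that assumption, not the repository's code.

# upsert sketch (assumes a UNIQUE or PRIMARY key on event_date, sport_code, error_code)
sql = f"""
    INSERT INTO {table} (event_date, sport_code, error_code, sum_of_error)
    VALUES (%s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE sum_of_error = VALUES(sum_of_error);
"""
cur.execute(sql, (eventTime, sport_code, error_code, sum_of_error))
conn.commit()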

Evaluation

  1. Check whether the DAG file runs every 10 minutes
    AirFlow Dag Panel
    AirFlow-Dag

  2. Verify that the data is written to the database correctly (a quick check is sketched after this list)
    Table : sport_transaction_error
    sport_transaction_error
    Table : sport_transaction_result
    sport_transaction_result
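
One way to spot-check the tables from the MySQL side is a short script like the one below; the connection settings are placeholders, not the project's configuration.

# verification sketch (placeholder connection settings)
import mysql.connector

conn = mysql.connector.connect(host='localhost', user='airflow',
                               password='***', database='eventtracking')
cur = conn.cursor()
# Confirm that rows for today's event_date were written by the latest runs
cur.execute("SELECT COUNT(*) FROM sport_transaction_error WHERE event_date = CURDATE();")
print(cur.fetchone()[0], 'rows for today')
conn.close()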

Built With

Getting Started

Download the whole project except this README file and the pic folder, and move the project under the path: /airflow/dags/

Prerequisites

  1. Install AirFlow and set up the Panel
  2. Replace the BigQuery JSON key and table name with your own.
    # _query.py
    credential_path = "/home/albert/airflow/dags/bigquery_eventtracking_regular_report_module/sg-prod-readonly-303206-cb8365379fd6.json"  
  3. Install a local MySQL database and create the tables (a possible schema sketch follows this list)
  4. Create a virtual environment for Airflow test tasks
    source airflow_venv/bin/activate
    
  5. Clone the project into the environment and check that it is runnable
    • Check whether the DAG file appears in the Airflow DAG list
    airflow dags list
    
    • Check whether the DAG's tasks are detectable in the task list
    airflow tasks list <dag_id>
    
    • Check whether the tasks in the DAG are runnable
    airflow tasks test <dag_id> <task_id> <start_time>
    
    (time format, e.g. yyyy-mm-dd)
  6. Deactivate the Airflow virtual environment
    deactivate  
    
  7. Clone the project again into the real environment under /airflow/dags
  8. Check whether the DAG shows up on the AirFlow DAG panel
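
For step 3, one schema that matches the columns used in storage.py is sketched below; the database name, column types, and key choice are assumptions rather than the project's actual definitions.

# schema sketch (assumed database name, column types, and key)
import mysql.connector

conn = mysql.connector.connect(host='localhost', user='airflow',
                               password='***', database='eventtracking')
cur = conn.cursor()
cur.execute("""
    CREATE TABLE IF NOT EXISTS sport_transaction_error (
        event_date   DATE        NOT NULL,
        sport_code   VARCHAR(32) NOT NULL,
        error_code   VARCHAR(32) NOT NULL,
        sum_of_error INT         NOT NULL,
        PRIMARY KEY (event_date, sport_code, error_code)
    );
""")
conn.commit()
conn.close()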

Usage

This structure works for any ETL process,
and the AirFlow system makes it possible to manage all tasks centrally and detect operational errors as soon as they occur.

Roadmap

See the open issues for a list of proposed features (and known issues).

Contact

Yu-Chieh Wang - LinkedIn
email: angelxd84130@gmail.com

Acknowledgements
