Creating Airflow DAGs and orchestrating dependencies between Operators can be repetitive and is not easy for users without a programming background. Airglue's configuration-driven approach makes this easy to manage without needing to write any Python at all.
- Design Concept
- Key Features
- Getting Started
- TODO
- Additional Examples
- Credits
By defining a YAML config such as the one in this example, you can dynamically create a complex DAG flow like this
With powerful templating & rendering
And producing output to these tables in BigQuery
For more details on exactly how this works, see Example DAG and DAG Configuration Explained
- Users can create DAGs and manage Operator dependencies through a simple YAML file
- Plugin-based architecture allows components such as custom Operators and Operator Factory classes to be added, which helps eliminate most duplication
- Environment, Airflow and static variables can all be defined in the YAML and made available through `{{ params }}`, ready to be used in the Airflow templating context (a minimal sketch follows below)
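To give a flavour of the approach, here is a minimal sketch of how a value defined under `params` can flow into an operator argument via Jinja templating. Field names follow the DAG and task schema documented later in this README; the top-level `tasks` key, the operator class and the table name are illustrative assumptions rather than copied from the example DAG.

```yaml
# Hypothetical fragment: a static param defined once and referenced wherever
# Jinja templating is enabled. The `tasks` key, operator class and table name
# are illustrative assumptions.
params:
  default_dataset: airglue_example        # becomes {{ params.default_dataset }}
tasks:
  - identifier: run_daily_query
    operator: airglue.contrib.operator.bigquery.query_runner.QueryRunner   # class name illustrative
    arguments:
      sql: "SELECT * FROM {{ params.default_dataset }}.daily_orders"
```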
The following instructions will get Airglue up and running in Docker on your laptop (Mac or Linux). The whole process should only take around 10-15 minutes depending on your internet connection speed.
- Have Docker Desktop installed
- Have the gcloud sdk installed
- Have a GCP application default credential generated under `~/.config/gcloud/application_default_credentials.json`.
- Create a service account on your GCP sandbox project and call it `airglue@<project_id>.iam.gserviceaccount.com`, generate a JSON key and save it to Secret Manager as a new secret with the name `airflow-connections-gcp_airglue`. See The Secret Manager Backend for more details on how to generate the secret value.
Service account impersonation is not used because Airflow V1's built-in operators do not support it. When using service account keys, it is important to rotate the key regularly. Airflow V2 makes impersonation available via the `impersonation_chain` argument; see https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/bigquery/index.html
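For reference, once on Airflow V2 a task could pass `impersonation_chain` straight through its `arguments`, since arguments map directly onto operator keyword arguments. The sketch below is an assumption based on that mapping; the operator choice, query and task name are illustrative.

```yaml
# Hedged sketch (Airflow V2 / google provider): impersonate the airglue
# service account instead of loading a key from Secret Manager.
# Operator choice, query and task name are illustrative.
- identifier: run_query
  operator: airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
  arguments:
    configuration:
      query:
        query: "SELECT 1"
        useLegacySql: false
    impersonation_chain: "airglue@<project_id>.iam.gserviceaccount.com"
```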
make build
Go to localhost:8082
You should now see the example DAG. To get started, see the example DAG here.
It is quicker to start up Airglue without re-building each time
make up
make down
make restart
make logs
make exec
There is an example DAG created in this repository called example_glue. It is designed to be a showcase DAG that demonstrates many of the features supported by Airglue.
In order for this DAG to work, the `airglue@<project_id>.iam.gserviceaccount.com` service account must be granted the following roles:
- Project Level: BigQuery Data Editor and BigQuery Job User
- Bucket Level: Storage Object Admin on the bucket defined below using the environment variable `AIRGLUE_EXAMPLE_BUCKET_NAME`
The `make` command must be executed with the following variables:
- `AIRGLUE_SANDBOX_PROJECT_ID`: A GCP project id for your sandbox environment.
- `AIRGLUE_EXAMPLE_BUCKET_NAME`: An example GCS bucket used to persist data. The service account must be able to write into this bucket.
To run the example DAG, use something like
make build AIRGLUE_SANDBOX_PROJECT_ID={replace me} AIRGLUE_EXAMPLE_BUCKET_NAME={replace me}
enabled: { optional<true|false>. Enable or disable the DAG, default is true if not specified. Disabled DAGs won't show up in the Airflow GUI }
schedule_interval: { required<cron expression>. A valid CRON expression used to set up the DAG schedule; if no schedule is required, set to `null`, i.e. "0 2 * * *" or null. You may use https://crontab.guru/ to validate your CRON expression }
timezone: { required<timezone>. Timezone for the schedule. I.e. "Europe/London". Local time is always preferred so that processing can be done exactly at the right time for the correct data boundaries }
description: { optional<string with markdown support>. A description for the DAG }
params: { optional<key:value>. Static key value pairs to be loaded and made available as {{ params }} in any context in Airflow with Jinja templating enabled }
# i.e.
# default_dataset: airglue_example can be referenced by {{ params.default_dataset }}
envs: { optional<list>. A list of environment variables to be loaded and made available as {{ params.envs.<name> }}, undefined ones will be omitted }
# i.e. the following can be referenced by {{ params.envs.AIRGLUE_SANDBOX_PROJECT_ID }}
# - AIRGLUE_SANDBOX_PROJECT_ID
vars: { optional<list>. It is possible to access airflow variables by using {{ var.value.get('my.var', 'fallback') }}, but you can also make them available under {{ params.vars.<name> }} to be more explicit for the DAG by defining them here, undefined ones will be omitted }
# i.e. the following can be referenced by {{ params.vars.example_bucket_name }}
# - example_bucket_name
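Putting the fields above together, an illustrative DAG-level section might look like this (the values are examples only):

```yaml
# Illustrative DAG-level configuration assembled from the fields described above.
enabled: true
schedule_interval: "0 2 * * *"
timezone: "Europe/London"
description: "Example DAG loading data into the airglue_example dataset"
params:
  default_dataset: airglue_example         # {{ params.default_dataset }}
envs:
  - AIRGLUE_SANDBOX_PROJECT_ID             # {{ params.envs.AIRGLUE_SANDBOX_PROJECT_ID }}
vars:
  - example_bucket_name                    # {{ params.vars.example_bucket_name }}
```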
Go to the Example DAG to see a working version of this.
- identifier: { required<string>. Identifier for the task, must be unique within a DAG }
operator: { required<string>. Fully qualified airflow operator name, i.e. airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator }
operator_factory: { optional<string>. Fully qualified operator factory name, i.e. airglue.contrib.operator_factory.default.DefaultOperatorFactory, but if `DefaultOperatorFactory` is all that is required, this argument can be omitted }
arguments: { optional<key:value>. Key value pairs translated into Airflow Operator arguments. }
# i.e. source_format: "NEWLINE_DELIMITED_JSON"
description: { optional<string with markdown support>. A description for the task }
dependencies: { optional<list>. A list of task identifiers this task depends on, used to create the link between Airflow Operators within a DAG }
# i.e.
# - task_identifier_1
# - task_identifier_2
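For illustration, a filled-in task entry using the fields above might look like the following; the bucket, object paths, destination table and upstream task name are made up for the example:

```yaml
# Illustrative task entry; bucket, paths, table and upstream task are examples only.
- identifier: gcs_to_bigquery
  operator: airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
  arguments:
    bucket: "{{ params.vars.example_bucket_name }}"
    source_objects:
      - "exports/*.json"
    source_format: "NEWLINE_DELIMITED_JSON"
    destination_project_dataset_table: "{{ params.default_dataset }}.daily_orders"
  description: "Load newline-delimited JSON exports from GCS into BigQuery"
  dependencies:
    - extract_to_gcs
```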
Go to the Example DAG to see a working version of this.
Custom Airflow Operators can be defined under airglue/contrib/operator if Airflow does not provide one that fulfils the requirements, or if you simply prefer to write your own, so there is no limit to what Airglue can execute.
See airglue/contrib/operator/bigquery/query_runner.py as an example.
Operator Factory is a concept introduced to make the configuration file more compact and easier to read. It can also be used to define common abstractions so that the configuration file becomes smaller, making it much quicker to add new integrations.
Operator Factories can be added to airglue/contrib/operator_factory and by default the default Operator Factory is used.
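A task wires these in by their fully qualified names, just like any built-in Operator. In the sketch below both class names are hypothetical; only the airglue/contrib/operator and airglue/contrib/operator_factory package paths come from this README:

```yaml
# Hypothetical task entry using a custom Operator and a custom Operator Factory;
# both class names are illustrative.
- identifier: run_sql_transformation
  operator: airglue.contrib.operator.bigquery.query_runner.QueryRunner
  operator_factory: airglue.contrib.operator_factory.bigquery.BigQueryOperatorFactory
  arguments:
    sql: "SELECT * FROM {{ params.default_dataset }}.daily_orders"
```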
Important: requires Airflow version >=1.10.10
Important: your user group must have been given the Secret Manager Secret Accessor role in order to access the secret
There are situations when using a secret (i.e. in Airflow Connections or Variables) is unavoidable. Airglue uses Secret Manager as the secret backend so that you do not have to store any credentials (such as service account keys) inside Airflow / Cloud Composer. This makes storing secrets more secure and easier to manage. For example, you can set up key rotation automation, and it only has to be done in one place: Secret Manager.
The Secret Manager Backend can be enabled by supplying the following Airflow environment variables
AIRFLOW__SECRETS__BACKEND: "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS: '{"project_id": "${AIRGLUE_SANDBOX_PROJECT_ID}", ...}'
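In a Docker Compose file this amounts to something like the fragment below; the service name is a placeholder, and any additional backend kwargs beyond project_id are omitted here:

```yaml
# Illustrative docker-compose fragment enabling the Secret Manager backend;
# the service name is a placeholder and other settings are omitted.
services:
  airflow:
    environment:
      AIRFLOW__SECRETS__BACKEND: "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend"
      AIRFLOW__SECRETS__BACKEND_KWARGS: '{"project_id": "${AIRGLUE_SANDBOX_PROJECT_ID}"}'
```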
On the Local Docker Composer environment, this has been done in docker-composer.yaml. Secrets stored in Secret Manager have to be valid URIs, i.e. for an Airflow GCP connection, the secret value has to be in the format of
google-cloud-platform://?extra__google_cloud_platform__project={project_id}&extra__google_cloud_platform__keyfile_dict={quoted_key}
To make this easier to generate, we've created a collection of tools under tools/secret_value_creator which can be used to generate the URI.
For more information on exactly how to use this, see Alternative secrets backend
In order to make the setup compatible with Cloud Composer as much as possible, we've created a separate release for each Composer version under Local Docker Release.
There is a default one specified in the Makefile, which is the latest version supported by Cloud Composer, but it can be overridden by passing in `AIRGLUE_COMPOSER_AIRFLOW_VERSION` when running any `make` command.
I.e.
make build AIRGLUE_COMPOSER_AIRFLOW_VERSION=1.10.10
We aim to align Airflow support with the latest 3 versions available on Cloud Composer. Any older versions will still be available in the repository but will no longer be supported. See Composer versions to check what the latest versions are.
- Windows user support
This example shows how to run a batch Java Dataflow job using a dedicated Dataflow controller service account.
To use the Example Dataflow Runner,
- Create the `dataflow-controller` service account and assign the following roles (see more about the controller service account here):
  - Project level: Dataflow Admin
  - Project level: Dataflow Worker
  - Bucket Level: Storage Object Admin
- Grant the `airglue` service account the following roles:
  - Project level: Dataflow Admin
  - Service Account Level: Allow the `airglue` service account to act as `dataflow-controller` by granting it the `Service Account User` role.
- Upload the bundled jar to `gs://{{ params.vars.AIRGLUE_EXAMPLE_BUCKET_NAME }}/jars/`
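Once the roles above are in place, the corresponding task entry could look roughly like the sketch below. It assumes the Airflow 1.10 DataFlowJavaOperator; the jar name, job name and pipeline options are illustrative:

```yaml
# Hypothetical task entry for the example Dataflow runner; jar name, job name
# and options are illustrative. serviceAccountEmail points the job at the
# dedicated dataflow-controller account.
- identifier: run_dataflow_batch_job
  operator: airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator
  arguments:
    jar: "gs://{{ params.vars.AIRGLUE_EXAMPLE_BUCKET_NAME }}/jars/example-pipeline-bundled.jar"
    job_name: "airglue-example-batch"
    options:
      project: "{{ params.envs.AIRGLUE_SANDBOX_PROJECT_ID }}"
      tempLocation: "gs://{{ params.vars.AIRGLUE_EXAMPLE_BUCKET_NAME }}/tmp"
      serviceAccountEmail: "dataflow-controller@{{ params.envs.AIRGLUE_SANDBOX_PROJECT_ID }}.iam.gserviceaccount.com"
```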
This application uses Open Source components. You can find the source code of their open source projects along with license information below. We acknowledge and are grateful to these developers for their contributions to open source.
Project: DOP https://github.com/teamdatatonic/dop
Copyright (c) 2021 Datatonic
License MIT https://github.com/teamdatatonic/dop/blob/master/LICENSE.md