feat!: Adds support for Airflow 2 Cloud Composer environment and operators (#134)

* new DAG operator imports for Airflow 2.0+

* support Airflow 2 operators when generating DAGs

* version compatibility checks for DAGs and Airflow environments

* tests to copy pipeline.yaml to dot folder (for airflow version spec)

* tests for version compatibility checks

* revised pipeline YAML template for Airflow 1 compat

* feat!: Upgrade dependencies to Airflow 2.1.1 (#135)

* revised dependencies for Airflow 2.1.1

* use proper variables file as required by Airflow 2

* changed README to use Airflow 2 requirement

* uncommented deploy_dag_versioning check

* feat: Upgrade `usa_names` pipeline to use Airflow 2 operators and environment (#136)

* default to Airflow 2
adlersantos committed Aug 10, 2021
1 parent d5ef401 commit b2749c6
Showing 30 changed files with 1,287 additions and 675 deletions.
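The "version compatibility checks" mentioned in the commit message pair the new `airflow_version` key in each `pipeline.yaml` with the Airflow major version of the target Composer environment. A minimal sketch of such a check, assuming hypothetical helper names (the repo's actual implementation may differ; the default of 2 mirrors the "default to Airflow 2" bullet above):

```python
# Hypothetical sketch of a pipeline/environment version-compatibility check.
# Function names are assumptions; only the `airflow_version` field comes from this commit.

def get_dag_airflow_version(pipeline_config: dict) -> int:
    """Read the `airflow_version` key added to pipeline.yaml (defaulting to 2)."""
    return pipeline_config.get("dag", {}).get("airflow_version", 2)

def check_airflow_version_compatibility(pipeline_config: dict, composer_airflow_version: int) -> None:
    """Fail fast when a DAG targets a different major Airflow version than the environment runs."""
    dag_version = get_dag_airflow_version(pipeline_config)
    if dag_version != composer_airflow_version:
        raise ValueError(
            f"DAG targets Airflow {dag_version}, but the Cloud Composer "
            f"environment runs Airflow {composer_airflow_version}."
        )

config = {"dag": {"airflow_version": 1, "initialize": {"dag_id": "cpsaat18"}}}
check_airflow_version_compatibility(config, composer_airflow_version=1)  # passes silently
```

Deploying that same `config` against an Airflow 2 environment would raise, which is the behavior the `deploy_dag_versioning` check (uncommented in this commit) is meant to enforce.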
8 changes: 5 additions & 3 deletions Pipfile
@@ -7,7 +7,9 @@ name = "pypi"
 beautifulsoup4 = "==4.9.3"

 [dev-packages]
-apache-airflow = {version = "==1.10.15", extras = ["google"]}
+apache-airflow = "==2.1.1"
+apache-airflow-providers-google = "*"
+apache-airflow-providers-cncf-kubernetes = "*"
 black = "==20.8b1"
 flake8 = "==3.9.2"
 isort = "*"
@@ -16,8 +18,8 @@ pandas-gbq = "==0.14.1"
 pytest-mock = "*"
 pytest = "*"
 "ruamel.yaml" = "==0.17.10"
-Jinja2 = "*"
-SQLAlchemy = "==1.3.15"
+Jinja2 = "==2.11.3"
+SQLAlchemy = "==1.3.18"

 [requires]
 python_version = "3.8"
1,407 changes: 879 additions & 528 deletions Pipfile.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions README.md
@@ -8,11 +8,11 @@ Cloud-native, data pipeline architecture for onboarding public datasets to [Data

 # Requirements
 - Python `>=3.6.10,<3.9`. We currently use `3.8`. For more info, see the [Cloud Composer version list](https://cloud.google.com/composer/docs/concepts/versioning/composer-versions).
-- Familiarity with [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/1.10.15/concepts.html) (>=v1.10.15)
+- Familiarity with [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/concepts/index.html) (>=v2.1)
 - [pipenv](https://pipenv-fork.readthedocs.io/en/latest/install.html#installing-pipenv) for creating similar Python environments via `Pipfile.lock`
 - [gcloud](https://cloud.google.com/sdk/gcloud) command-line tool with Google Cloud Platform credentials configured. Instructions can be found [here](https://cloud.google.com/sdk/docs/initializing).
 - [Terraform](https://learn.hashicorp.com/tutorials/terraform/install-cli) `>=v0.15.1`
-- [Google Cloud Composer](https://cloud.google.com/composer/docs/concepts/overview) environment running [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/1.10.15/concepts.html) `>=v1.10.15,<2.0`. To create a new Cloud Composer environment, see [this guide](https://cloud.google.com/composer/docs/how-to/managing/creating).
+- [Google Cloud Composer](https://cloud.google.com/composer/docs/concepts/overview) environment running [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/concepts.html) `>=2.0`. To create a new Cloud Composer environment, see [this guide](https://cloud.google.com/composer/docs/how-to/managing/creating).

 # Environment Setup
1 change: 1 addition & 0 deletions datasets/bls/cpsaat18/pipeline.yaml
@@ -19,6 +19,7 @@ resources:
     description: "Current population survey 18: Employed persons by detailed industry, sex, race, and Hispanic or Latino ethnicity"

 dag:
+  airflow_version: 1
   initialize:
     dag_id: cpsaat18
     default_args:
@@ -18,6 +18,7 @@ resources:
     table_id: city_level_cases_and_deaths

 dag:
+  airflow_version: 1
   initialize:
     dag_id: "city_level_cases_and_deaths"
     default_args:

@@ -19,6 +19,7 @@ resources:

 dag:
   initialize:
+    airflow_version: 1
     dag_id: "covid_racial_data_tracker"
     default_args:
       owner: "Google"

@@ -18,6 +18,7 @@ resources:
     table_id: national_testing_and_outcomes

 dag:
+  airflow_version: 1
   initialize:
     dag_id: "national_testing_and_outcomes"
     default_args:

@@ -18,6 +18,7 @@ resources:
     table_id: state_facility_level_long_term_care

 dag:
+  airflow_version: 1
   initialize:
     dag_id: "state_facility_level_long_term_care"
     default_args:

@@ -18,6 +18,7 @@ resources:
     table_id: state_level_aggregate_long_term_care

 dag:
+  airflow_version: 1
   initialize:
     dag_id: "state_level_aggregate_long_term_care"
     default_args:

@@ -18,6 +18,7 @@ resources:
     table_id: state_level_cumulative_long_term_care

 dag:
+  airflow_version: 1
   initialize:
     dag_id: "state_level_cumulative_long_term_care"
     default_args:

@@ -18,6 +18,7 @@ resources:
     table_id: state_level_current_outbreak_long_term_care

 dag:
+  airflow_version: 1
   initialize:
     dag_id: "state_level_current_outbreak_long_term_care"
     default_args:
1 change: 1 addition & 0 deletions datasets/covid19_tracking/state_screenshots/pipeline.yaml
@@ -18,6 +18,7 @@ resources:
     table_id: state_screenshots

 dag:
+  airflow_version: 1
   initialize:
     dag_id: "state_screenshots"
     default_args:
@@ -18,6 +18,7 @@ resources:
     table_id: state_testing_and_outcomes

 dag:
+  airflow_version: 1
   initialize:
     dag_id: "state_testing_and_outcomes"
     default_args:

@@ -28,6 +28,7 @@ resources:
     description: "This table represents the boundaries of areas surrounding vaccination facilities from which people can reach the facility by walking within predetermined time periods."

 dag:
+  airflow_version: 1
   initialize:
     dag_id: vaccination_access_to_bq
     default_args:

@@ -122,6 +122,7 @@ resources:
   ]
 dag:
+  airflow_version: 1
   initialize:
     dag_id: covid19_vaccination_search_insights
     default_args:
1 change: 1 addition & 0 deletions datasets/google_dei/diversity_annual_report/pipeline.yaml
@@ -65,6 +65,7 @@ resources:
       deletion_protection: False

 dag:
+  airflow_version: 1
   initialize:
     dag_id: diversity_annual_report
     default_args:
1 change: 1 addition & 0 deletions datasets/google_trends/top_terms/pipeline.yaml
@@ -93,6 +93,7 @@ resources:
   ]
 dag:
+  airflow_version: 1
   initialize:
     dag_id: top_terms
     default_args:
@@ -36,6 +36,7 @@ dag:
   # The DAG acronym stands for directed acyclic graph. This block represents
   # your data pipeline along with every property and configuration it needs to
   # onboard your data.
+  airflow_version: 1
   initialize:
     dag_id: 2020_sales_train
     default_args:

@@ -36,6 +36,7 @@ dag:
   # The DAG acronym stands for directed acyclic graph. This block represents
   # your data pipeline along with every property and configuration it needs to
   # onboard your data.
+  airflow_version: 1
   initialize:
     dag_id: 2021_sales_predict
     default_args:
1 change: 1 addition & 0 deletions datasets/ml_datasets/penguins/pipeline.yaml
@@ -36,6 +36,7 @@ dag:
   # The DAG acronym stands for directed acyclic graph. This block represents
   # your data pipeline along with every property and configuration it needs to
   # onboard your data.
+  airflow_version: 1
   initialize:
     dag_id: penguins
     default_args:
1 change: 1 addition & 0 deletions datasets/usa_names/usa_1910_current/pipeline.yaml
@@ -22,6 +22,7 @@ resources:
     source: http://www.ssa.gov/OACT/babynames/limits.html
 dag:
+  airflow_version: 2
   initialize:
     dag_id: "usa_1910_current"
     default_args:
8 changes: 4 additions & 4 deletions datasets/usa_names/usa_1910_current/usa_1910_current_dag.py
@@ -14,8 +14,8 @@


 from airflow import DAG
-from airflow.contrib.operators import gcs_to_bq
-from airflow.operators import bash_operator
+from airflow.operators import bash
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery

 default_args = {
     "owner": "Google",
@@ -34,7 +34,7 @@
 ) as dag:

     # Task to copy `namesbystate.zip` from Social Security Administration to GCS
-    download_and_process_source_zip_file = bash_operator.BashOperator(
+    download_and_process_source_zip_file = bash.BashOperator(
         task_id="download_and_process_source_zip_file",
         bash_command="mkdir -p $data_dir/{{ ds }}\ncurl -o $data_dir/{{ ds }}/namesbystate.zip -L $zip_source_url\nunzip $data_dir/{{ ds }}/namesbystate.zip -d $data_dir/{{ ds }}\ncat $data_dir/{{ ds }}/*.TXT >> $data_dir/{{ ds }}/data.csv\n",
         env={
@@ -44,7 +44,7 @@
     )

     # Task to load the data from Airflow data folder to BigQuery
-    load_csv_file_to_bq_table = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+    load_csv_file_to_bq_table = gcs_to_bigquery.GCSToBigQueryOperator(
         task_id="load_csv_file_to_bq_table",
         bucket="{{ var.json.shared.composer_bucket }}",
         source_objects=["data/usa_names/usa_1910_current/{{ ds }}/data.csv"],
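The import changes in this diff follow the standard Airflow 1-to-2 module moves (`contrib` operators relocated to provider packages, `bash_operator` renamed to `bash`). A DAG generator that "supports Airflow 2 operators when generating DAGs", as the commit message puts it, could drive this with a version-keyed lookup table. A sketch under stated assumptions: only the two operators below are taken from this diff, and the table structure itself is hypothetical, not the repo's actual generator logic:

```python
# Hypothetical lookup a DAG generator could use to emit version-appropriate
# operator imports. The import paths for these two operators match the diff;
# the dict-based design is an illustration only.

OPERATOR_IMPORTS = {
    1: {
        "BashOperator": ("airflow.operators", "bash_operator"),
        "GoogleCloudStorageToBigQueryOperator": ("airflow.contrib.operators", "gcs_to_bq"),
    },
    2: {
        "BashOperator": ("airflow.operators", "bash"),
        "GCSToBigQueryOperator": ("airflow.providers.google.cloud.transfers", "gcs_to_bigquery"),
    },
}

def import_line(airflow_version: int, operator: str) -> str:
    """Render the import statement for an operator under the given Airflow major version."""
    package, module = OPERATOR_IMPORTS[airflow_version][operator]
    return f"from {package} import {module}"

print(import_line(2, "GCSToBigQueryOperator"))
# from airflow.providers.google.cloud.transfers import gcs_to_bigquery
```

Keyed this way, the `airflow_version` field in `pipeline.yaml` selects which set of imports (and operator class names) the generated DAG file uses.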
1 change: 1 addition & 0 deletions datasets/vizgen_merfish/mouse_brain_map/pipeline.yaml
@@ -19,6 +19,7 @@ dag:
   # The DAG acronym stands for directed acyclic graph. This block represents
   # your data pipeline along with every property and configuration it needs to
   # onboard your data.
+  airflow_version: 1
   initialize:
     dag_id: mouse_brain_map
     default_args:
