feat!: Adds support for Airflow 2 Cloud Composer environment and operators #134

Merged: 8 commits, Aug 10, 2021
8 changes: 5 additions & 3 deletions Pipfile
@@ -7,7 +7,9 @@ name = "pypi"
beautifulsoup4 = "==4.9.3"

[dev-packages]
-apache-airflow = {version = "==1.10.15", extras = ["google"]}
+apache-airflow = "==2.1.1"
+apache-airflow-providers-google = "*"
+apache-airflow-providers-cncf-kubernetes = "*"
black = "==20.8b1"
flake8 = "==3.9.2"
isort = "*"
@@ -16,8 +18,8 @@ pandas-gbq = "==0.14.1"
pytest-mock = "*"
pytest = "*"
"ruamel.yaml" = "==0.17.10"
Jinja2 = "*"
SQLAlchemy = "==1.3.15"
Jinja2 = "==2.11.3"
SQLAlchemy = "==1.3.18"

[requires]
python_version = "3.8"
1,407 changes: 879 additions & 528 deletions Pipfile.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions README.md
@@ -8,11 +8,11 @@ Cloud-native, data pipeline architecture for onboarding public datasets to [Data

# Requirements
- Python `>=3.6.10,<3.9`. We currently use `3.8`. For more info, see the [Cloud Composer version list](https://cloud.google.com/composer/docs/concepts/versioning/composer-versions).
-- Familiarity with [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/1.10.15/concepts.html) (>=v1.10.15)
+- Familiarity with [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/concepts/index.html) (>=v2.1)
- [pipenv](https://pipenv-fork.readthedocs.io/en/latest/install.html#installing-pipenv) for creating similar Python environments via `Pipfile.lock`
- [gcloud](https://cloud.google.com/sdk/gcloud) command-line tool with Google Cloud Platform credentials configured. Instructions can be found [here](https://cloud.google.com/sdk/docs/initializing).
- [Terraform](https://learn.hashicorp.com/tutorials/terraform/install-cli) `>=v0.15.1`
-- [Google Cloud Composer](https://cloud.google.com/composer/docs/concepts/overview) environment running [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/1.10.15/concepts.html) `>=v1.10.15,<2.0`. To create a new Cloud Composer environment, see [this guide](https://cloud.google.com/composer/docs/how-to/managing/creating).
+- [Google Cloud Composer](https://cloud.google.com/composer/docs/concepts/overview) environment running [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/concepts.html) `>=2.0`. To create a new Cloud Composer environment, see [this guide](https://cloud.google.com/composer/docs/how-to/managing/creating).

# Environment Setup

1 change: 1 addition & 0 deletions datasets/bls/cpsaat18/pipeline.yaml
@@ -19,6 +19,7 @@ resources:
description: "Current population survey 18: Employed persons by detailed industry, sex, race, and Hispanic or Latino ethnicity"

dag:
+airflow_version: 1
initialize:
dag_id: cpsaat18
default_args:
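Every existing pipeline.yaml gains the same one-line `airflow_version: 1` pin under its `dag:` block, so current pipelines keep generating Airflow 1 DAGs while new ones can opt into 2. A rough sketch of how a generator could consume the key (the function name and defaulting behavior are hypothetical, not this repo's actual generator code; the import pairs come from the usa_1910_current diff later in this PR):

```python
# Hypothetical sketch of reading the new `airflow_version` key; ruamel.yaml
# is already a dev dependency in the updated Pipfile.
import pathlib
import ruamel.yaml

def load_airflow_version(pipeline_yaml: pathlib.Path) -> int:
    """Return dag.airflow_version, defaulting to 1 for pre-existing configs."""
    config = ruamel.yaml.YAML(typ="safe").load(pipeline_yaml.read_text())
    return int(config["dag"].get("airflow_version", 1))

# Operator module paths differ per major version (pairs taken from the
# usa_1910_current DAG diff below; the dict itself is illustrative).
BASH_MODULE = {1: "airflow.operators.bash_operator", 2: "airflow.operators.bash"}
GCS_TO_BQ_MODULE = {
    1: "airflow.contrib.operators.gcs_to_bq",
    2: "airflow.providers.google.cloud.transfers.gcs_to_bigquery",
}
```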
@@ -18,6 +18,7 @@ resources:
table_id: city_level_cases_and_deaths

dag:
+airflow_version: 1
initialize:
dag_id: "city_level_cases_and_deaths"
default_args:
@@ -19,6 +19,7 @@ resources:

dag:
+airflow_version: 1
initialize:
dag_id: "covid_racial_data_tracker"
default_args:
owner: "Google"
@@ -18,6 +18,7 @@ resources:
table_id: national_testing_and_outcomes

dag:
+airflow_version: 1
initialize:
dag_id: "national_testing_and_outcomes"
default_args:
@@ -18,6 +18,7 @@ resources:
table_id: state_facility_level_long_term_care

dag:
+airflow_version: 1
initialize:
dag_id: "state_facility_level_long_term_care"
default_args:
@@ -18,6 +18,7 @@ resources:
table_id: state_level_aggregate_long_term_care

dag:
+airflow_version: 1
initialize:
dag_id: "state_level_aggregate_long_term_care"
default_args:
@@ -18,6 +18,7 @@ resources:
table_id: state_level_cumulative_long_term_care

dag:
+airflow_version: 1
initialize:
dag_id: "state_level_cumulative_long_term_care"
default_args:
@@ -18,6 +18,7 @@ resources:
table_id: state_level_current_outbreak_long_term_care

dag:
+airflow_version: 1
initialize:
dag_id: "state_level_current_outbreak_long_term_care"
default_args:
1 change: 1 addition & 0 deletions datasets/covid19_tracking/state_screenshots/pipeline.yaml
@@ -18,6 +18,7 @@ resources:
table_id: state_screenshots

dag:
+airflow_version: 1
initialize:
dag_id: "state_screenshots"
default_args:
@@ -18,6 +18,7 @@ resources:
table_id: state_testing_and_outcomes

dag:
+airflow_version: 1
initialize:
dag_id: "state_testing_and_outcomes"
default_args:
@@ -28,6 +28,7 @@ resources:
description: "This table represents the boundaries of areas surrounding vaccination facilities from which people can reach the facility by walking within predetermined time periods."

dag:
+airflow_version: 1
initialize:
dag_id: vaccination_access_to_bq
default_args:
@@ -122,6 +122,7 @@ resources:
]

dag:
+airflow_version: 1
initialize:
dag_id: covid19_vaccination_search_insights
default_args:
1 change: 1 addition & 0 deletions datasets/google_dei/diversity_annual_report/pipeline.yaml
@@ -65,6 +65,7 @@ resources:
deletion_protection: False

dag:
+airflow_version: 1
initialize:
dag_id: diversity_annual_report
default_args:
1 change: 1 addition & 0 deletions datasets/google_trends/top_terms/pipeline.yaml
@@ -93,6 +93,7 @@ resources:
]

dag:
+airflow_version: 1
initialize:
dag_id: top_terms
default_args:
@@ -36,6 +36,7 @@ dag:
# The DAG acronym stands for directed acyclic graph. This block represents
# your data pipeline along with every property and configuration it needs to
# onboard your data.
+airflow_version: 1
initialize:
dag_id: 2020_sales_train
default_args:
@@ -36,6 +36,7 @@ dag:
# The DAG acronym stands for directed acyclic graph. This block represents
# your data pipeline along with every property and configuration it needs to
# onboard your data.
+airflow_version: 1
initialize:
dag_id: 2021_sales_predict
default_args:
1 change: 1 addition & 0 deletions datasets/ml_datasets/penguins/pipeline.yaml
@@ -36,6 +36,7 @@ dag:
# The DAG acronym stands for directed acyclic graph. This block represents
# your data pipeline along with every property and configuration it needs to
# onboard your data.
+airflow_version: 1
initialize:
dag_id: penguins
default_args:
1 change: 1 addition & 0 deletions datasets/usa_names/usa_1910_current/pipeline.yaml
@@ -22,6 +22,7 @@ resources:
source: http://www.ssa.gov/OACT/babynames/limits.html

dag:
+airflow_version: 2
initialize:
dag_id: "usa_1910_current"
default_args:
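Note that usa_names/usa_1910_current is the one pipeline flipped to `airflow_version: 2` in this PR; the regenerated DAG below shows exactly what the flag changes: the Airflow 1 `contrib`/`bash_operator` imports give way to the core `bash` module and the Google provider's `gcs_to_bigquery` transfer.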
8 changes: 4 additions & 4 deletions datasets/usa_names/usa_1910_current/usa_1910_current_dag.py
@@ -14,8 +14,8 @@


from airflow import DAG
-from airflow.contrib.operators import gcs_to_bq
-from airflow.operators import bash_operator
+from airflow.operators import bash
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
"owner": "Google",
@@ -34,7 +34,7 @@
) as dag:

# Task to copy `namesbystate.zip` from Social Security Administration to GCS
-download_and_process_source_zip_file = bash_operator.BashOperator(
+download_and_process_source_zip_file = bash.BashOperator(
task_id="download_and_process_source_zip_file",
bash_command="mkdir -p $data_dir/{{ ds }}\ncurl -o $data_dir/{{ ds }}/namesbystate.zip -L $zip_source_url\nunzip $data_dir/{{ ds }}/namesbystate.zip -d $data_dir/{{ ds }}\ncat $data_dir/{{ ds }}/*.TXT \u003e\u003e $data_dir/{{ ds }}/data.csv\n",
env={
@@ -44,7 +44,7 @@
)

# Task to load the data from Airflow data folder to BigQuery
-load_csv_file_to_bq_table = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+load_csv_file_to_bq_table = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_csv_file_to_bq_table",
bucket="{{ var.json.shared.composer_bucket }}",
source_objects=["data/usa_names/usa_1910_current/{{ ds }}/data.csv"],
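Pulling the migrated pieces together, here is a compressed, runnable Airflow 2 sketch of the pattern the regenerated DAG follows (bucket, schedule, and dates are placeholders, not the PR's actual values, which live in the diff above):

```python
# Editorial sketch of the Airflow 2 operator pattern used by the migrated DAG.
# Placeholder values are marked; only the import/operator shapes mirror the PR.
from datetime import datetime

from airflow import DAG
from airflow.operators import bash
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": datetime(2021, 3, 1),  # placeholder date
}

with DAG(
    dag_id="usa_names.usa_1910_current",
    default_args=default_args,
    schedule_interval="@daily",  # placeholder schedule
    catchup=False,
) as dag:
    # Core operator moved from airflow.operators.bash_operator to airflow.operators.bash.
    download_and_process_source_zip_file = bash.BashOperator(
        task_id="download_and_process_source_zip_file",
        bash_command="echo 'curl + unzip + cat steps as in the diff above'",
    )

    # Provider operator replacing contrib's GoogleCloudStorageToBigQueryOperator.
    load_csv_file_to_bq_table = gcs_to_bigquery.GCSToBigQueryOperator(
        task_id="load_csv_file_to_bq_table",
        bucket="my-composer-bucket",  # placeholder bucket
        source_objects=["data/usa_names/usa_1910_current/data.csv"],
        destination_project_dataset_table="usa_names.usa_1910_current",
        write_disposition="WRITE_TRUNCATE",
    )

    download_and_process_source_zip_file >> load_csv_file_to_bq_table
```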
1 change: 1 addition & 0 deletions datasets/vizgen_merfish/mouse_brain_map/pipeline.yaml
@@ -19,6 +19,7 @@ dag:
# The DAG acronym stands for directed acyclic graph. This block represents
# your data pipeline along with every property and configuration it needs to
# onboard your data.
+airflow_version: 1
initialize:
dag_id: mouse_brain_map
default_args: