chore: Use Airflow's AIRFLOW_VAR_* feature for shared variables (#183)

adlersantos committed Sep 16, 2021
1 parent d7d1200 commit b02f47a
Showing 76 changed files with 212 additions and 208 deletions.
26 changes: 15 additions & 11 deletions README.md
@@ -58,7 +58,7 @@ Use only underscores and alpha-numeric characters for the names.

If you created a new dataset directory above, you need to create a `datasets/DATASET/dataset.yaml` config file. See this [section](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/README.md#yaml-config-reference) for the `dataset.yaml` reference.
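For orientation, a minimal `dataset.yaml` declares the dataset's metadata and the BigQuery dataset to be created. The following is a rough, hypothetical sketch with placeholder values; refer to the linked reference for the authoritative fields:

```yaml
# Hypothetical datasets/DATASET/dataset.yaml sketch (placeholder values).
dataset:
  name: my_dataset            # should match the dataset folder name
  friendly_name: My Dataset
  description: A short description of the dataset.
  dataset_sources: ~
  terms_of_use: ~

resources:
  # The BigQuery dataset that will hold the pipeline's tables.
  - type: bigquery_dataset
    dataset_id: my_dataset
    description: A short description of the BigQuery dataset.
```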

Create a `datasets/DATASET/PIPELINE/pipeline.yaml` config file for your pipeline. See [here](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/) for the `pipeline.yaml` references.
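Likewise, `pipeline.yaml` describes the BigQuery table(s) the pipeline produces and the Airflow tasks that build them. A rough, hypothetical sketch with placeholder values (see the samples linked above for the authoritative structure):

```yaml
# Hypothetical datasets/DATASET/PIPELINE/pipeline.yaml sketch (placeholder values).
resources:
  - type: bigquery_table
    table_id: my_table
    description: "Table produced by this pipeline."

dag:
  initialize:
    dag_id: my_pipeline
    default_args:
      owner: "Google"
      depends_on_past: False
      start_date: "2021-03-01"
    schedule_interval: "@daily"

  tasks:
    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
      args:
        task_id: "load_my_table_to_bq"
        bucket: "{{ var.value.composer_bucket }}"
        source_objects: ["data/DATASET/PIPELINE/data_output.csv"]
        source_format: "CSV"
        destination_project_dataset_table: "DATASET.my_table"
        write_disposition: "WRITE_TRUNCATE"
```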

For a YAML config template using Airflow 1.10 operators, see [`samples/pipeline.airflow1.yaml`](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/pipeline.airflow1.yaml).

@@ -151,9 +151,12 @@ Docker images will be built and pushed to GCR by default whenever the command above is run.

Running the command in the previous step will parse your pipeline config and inform you about the Airflow variables that your pipeline expects to use and must be defined.

If your pipeline doesn't use any Airflow variables, you can skip this step.

There are two types of variables that pipelines can use: **shared** and **dataset-specific**. Shared variables are those that can be reused by other pipelines in the same Airflow or Cloud Composer environment, and they have the same values for every pipeline. Examples of shared variables include your Cloud Composer environment name and bucket, your GCP project ID, and the paths to the Airflow DAG and data folders (e.g. `/home/airflow/gcs/data`). To specify your shared variables, you can either

* Store the variables as Cloud Composer environment variables [using Airflow's built-in `AIRFLOW_VAR_*` behavior](https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html#storing-variables-in-environment-variables). (Preferred; a short sketch is included below.)
* Or, use a single `shared_variables.json` file, created under

```
[.dev|.test]/datasets/shared_variables.json
```

@@ -171,25 +174,26 @@ and inside the file, nest the variables under a common parent key. For example:
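The example below is a hypothetical sketch: the keys mirror the `composer_bucket`, `airflow_data_folder`, and `airflow_home` variables used by the pipelines in this repository, nested under a `shared` parent key, and the values are placeholders:

```json
{
    "shared": {
        "composer_bucket": "your-composer-bucket",
        "airflow_data_folder": "/home/airflow/gcs/data",
        "airflow_home": "/home/airflow/gcs"
    }
}
```

With the preferred `AIRFLOW_VAR_*` option, the same shared values are instead set as environment variables on the Airflow or Cloud Composer environment. A minimal sketch, again with placeholder values:

```bash
# Airflow exposes any AIRFLOW_VAR_<NAME> environment variable as the variable <name>.
export AIRFLOW_VAR_COMPOSER_BUCKET="your-composer-bucket"        # -> {{ var.value.composer_bucket }}
export AIRFLOW_VAR_AIRFLOW_DATA_FOLDER="/home/airflow/gcs/data"  # -> {{ var.value.airflow_data_folder }}
```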

The other type is dataset-specific variables. To use them, create the following file

```
[.dev|.test]/datasets/{DATASET}/{DATASET}_variables.json
```

In general, pipelines use JSON dot notation to access Airflow variables. Make sure to define and nest your variables under a common parent key when writing to the JSON files above. We recommend using your dataset's name as the parent key, to mimic the folder hierarchy in the Cloud Composer GCS bucket. Airflow variables are global and can be accessed by any pipeline, so nesting your variables helps avoid name collisions. For example, if you're using the following variables in your pipeline config:

- `{{ var.json.namespace.nested }}`
- `{{ var.json.namespace.some_key.nested_twice }}`

then your variables JSON file should look like this:

```json
{
    "namespace": {
        "nested": "some value",
        "some_key": {
            "nested_twice": "another value"
        }
    }
}
```
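For illustration, a hypothetical task (not part of this commit) could consume the nested value above through Jinja templating, in the same operator style used by the DAGs in this repository:

```python
# Hypothetical sketch only: reading a nested Airflow variable with JSON dot notation.
import datetime

from airflow import DAG
from airflow.operators import bash_operator

with DAG(
    dag_id="variables_example",
    start_date=datetime.datetime(2021, 3, 1),
    schedule_interval="@once",
) as dag:
    print_nested_variable = bash_operator.BashOperator(
        task_id="print_nested_variable",
        bash_command="echo $nested_value",
        env={
            # The dots in the template mirror the JSON nesting shown above.
            "nested_value": "{{ var.json.namespace.some_key.nested_twice }}",
        },
    )
```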

@@ -234,7 +238,7 @@ The `samples` folder contains references for the YAML config files, complete with
# Best Practices

- When your tabular data contains percentage values, represent them as floats between 0 and 1.
- To represent hierarchical data in BigQuery, use either:
  - (Recommended) Nested columns in BigQuery. For more info, see [the documentation on nested and repeated columns](https://cloud.google.com/bigquery/docs/nested-repeated).
  - Or, represent each level as a separate column. For example, if you have the hierarchy `chapter > section > subsection`, represent them as separate `chapter`, `section`, and `subsection` columns (a sketch of both options follows).
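For the nested option, a hypothetical BigQuery schema (field names made up for illustration) could model the hierarchy with `RECORD` columns:

```json
[
  {"name": "chapter", "type": "STRING", "mode": "NULLABLE"},
  {
    "name": "sections",
    "type": "RECORD",
    "mode": "REPEATED",
    "fields": [
      {"name": "section", "type": "STRING", "mode": "NULLABLE"},
      {
        "name": "subsections",
        "type": "RECORD",
        "mode": "REPEATED",
        "fields": [{"name": "subsection", "type": "STRING", "mode": "NULLABLE"}]
      }
    ]
  }
]
```

With the flat option, `chapter`, `section`, and `subsection` would simply be three top-level `STRING` columns.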
@@ -60,7 +60,7 @@
"SOURCE_URL": "https://data.austintexas.gov/api/views/qd73-bsdg/rows.csv",
"SOURCE_FILE": "files/data.csv",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/austin_bikeshare/bikeshare_stations/data_output.csv",
"PIPELINE_NAME": "bikeshare_stations",
"CSV_HEADERS": '["station_id","name","status","address","alternate_name","city_asset_number","property_type","number_of_docks","power_type","footprint_length","footprint_width","notes","council_district","modified_date"]',
@@ -73,7 +73,7 @@
load_austin_bikeshare_stations_to_bq = (
gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id="load_austin_bikeshare_stations_to_bq",
bucket="{{ var.json.shared.composer_bucket }}",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/austin_bikeshare/bikeshare_stations/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="austin_bikeshare.bikeshare_stations",
4 changes: 2 additions & 2 deletions datasets/austin_bikeshare/bikeshare_stations/pipeline.yaml
@@ -73,7 +73,7 @@ dag:
SOURCE_URL: "https://data.austintexas.gov/api/views/qd73-bsdg/rows.csv"
SOURCE_FILE: "files/data.csv"
TARGET_FILE: "files/data_output.csv"
TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}"
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "data/austin_bikeshare/bikeshare_stations/data_output.csv"
PIPELINE_NAME: "bikeshare_stations"
CSV_HEADERS: >-
@@ -93,7 +93,7 @@ dag:
task_id: "load_austin_bikeshare_stations_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.json.shared.composer_bucket }}"
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/austin_bikeshare/bikeshare_stations/data_output.csv"]
@@ -61,7 +61,7 @@
"SOURCE_URL": "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-province/dpc-covid19-ita-province.csv",
"SOURCE_FILE": "files/data.csv",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/covid19_italy/data_by_province/data_output.csv",
"CSV_HEADERS": '["date","country","region_code","region_name","province_code","province_name","province_abbreviation","latitude","longitude","location_geom","confirmed_cases","note"]',
"RENAME_MAPPINGS": '{"data": "date","stato": "country","codice_regione": "region_code","denominazione_regione": "region_name","lat": "latitude","long": "longitude","codice_provincia": "province_code","denominazione_provincia": "province_name","sigla_provincia": "province_abbreviation","totale_casi": "confirmed_cases","note": "note"}',
@@ -73,7 +73,7 @@
# Task to load CSV data to a BigQuery table
load_data_by_province_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id="load_data_by_province_to_bq",
bucket="{{ var.json.shared.composer_bucket }}",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/covid19_italy/data_by_province/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="covid19_italy.data_by_province",
4 changes: 2 additions & 2 deletions datasets/covid19_italy/data_by_province/pipeline.yaml
@@ -78,7 +78,7 @@ dag:
SOURCE_URL: "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-province/dpc-covid19-ita-province.csv"
SOURCE_FILE: "files/data.csv"
TARGET_FILE: "files/data_output.csv"
TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}"
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "data/covid19_italy/data_by_province/data_output.csv"
CSV_HEADERS: >-
["date","country","region_code","region_name","province_code","province_name","province_abbreviation","latitude","longitude","location_geom","confirmed_cases","note"]
@@ -98,7 +98,7 @@ dag:
task_id: "load_data_by_province_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.json.shared.composer_bucket }}"
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/covid19_italy/data_by_province/data_output.csv"]
4 changes: 2 additions & 2 deletions datasets/covid19_italy/data_by_region/data_by_region_dag.py
@@ -61,7 +61,7 @@
"SOURCE_URL": "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-regioni/dpc-covid19-ita-regioni.csv",
"SOURCE_FILE": "files/data.csv",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/covid19_italy/data_by_region/data_output.csv",
"CSV_HEADERS": '["date","country","region_code","region_name","latitude","longitude","location_geom","hospitalized_patients_symptoms","hospitalized_patients_intensive_care","total_hospitalized_patients","home_confinement_cases","total_current_confirmed_cases","new_current_confirmed_cases","new_total_confirmed_cases","recovered","deaths","total_confirmed_cases","tests_performed","note"]',
"RENAME_MAPPINGS": '{"data": "date","stato": "country","codice_regione": "region_code","denominazione_regione": "region_name","lat": "latitude","long": "longitude","ricoverati_con_sintomi": "hospitalized_patients_symptoms","terapia_intensiva": "hospitalized_patients_intensive_care","totale_ospedalizzati": "total_hospitalized_patients","isolamento_domiciliare": "home_confinement_cases","totale_positivi": "total_current_confirmed_cases","variazione_totale_positivi": "new_current_confirmed_cases","nuovi_positivi": "new_total_confirmed_cases","note": "note","dimessi_guariti": "recovered","totale_casi": "total_confirmed_cases","tamponi": "tests_performed","deceduti": "deaths"}',
@@ -73,7 +73,7 @@
# Task to load CSV data to a BigQuery table
load_data_by_region_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id="load_data_by_region_to_bq",
bucket="{{ var.json.shared.composer_bucket }}",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/covid19_italy/data_by_region/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="covid19_italy.data_by_region",
4 changes: 2 additions & 2 deletions datasets/covid19_italy/data_by_region/pipeline.yaml
@@ -77,7 +77,7 @@ dag:
SOURCE_URL: "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-regioni/dpc-covid19-ita-regioni.csv"
SOURCE_FILE: "files/data.csv"
TARGET_FILE: "files/data_output.csv"
TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}"
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "data/covid19_italy/data_by_region/data_output.csv"
CSV_HEADERS: >-
["date","country","region_code","region_name","latitude","longitude","location_geom","hospitalized_patients_symptoms","hospitalized_patients_intensive_care","total_hospitalized_patients","home_confinement_cases","total_current_confirmed_cases","new_current_confirmed_cases","new_total_confirmed_cases","recovered","deaths","total_confirmed_cases","tests_performed","note"]
@@ -97,7 +97,7 @@ dag:
task_id: "load_data_by_region_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.json.shared.composer_bucket }}"
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/covid19_italy/data_by_region/data_output.csv"]
4 changes: 2 additions & 2 deletions datasets/covid19_italy/national_trends/national_trends_dag.py
@@ -61,7 +61,7 @@
"SOURCE_URL": "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-andamento-nazionale/dpc-covid19-ita-andamento-nazionale.csv",
"SOURCE_FILE": "files/data.csv",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/covid19_italy/national_trends/data_output.csv",
"CSV_HEADERS": '["date","country","hospitalized_patients_symptoms","hospitalized_patients_intensive_care","total_hospitalized_patients","home_confinement_cases","total_current_confirmed_cases","new_current_confirmed_cases","new_total_confirmed_cases","recovered","deaths","total_confirmed_cases","tests_performed","note"]',
"RENAME_MAPPINGS": '{"data": "date","stato": "country","ricoverati_con_sintomi": "hospitalized_patients_symptoms","terapia_intensiva": "hospitalized_patients_intensive_care","totale_ospedalizzati": "total_hospitalized_patients","isolamento_domiciliare": "home_confinement_cases","totale_positivi": "total_current_confirmed_cases","variazione_totale_positivi": "new_current_confirmed_cases","nuovi_positivi": "new_total_confirmed_cases","dimessi_guariti": "recovered","deceduti": "deaths","totale_casi": "total_confirmed_cases","tamponi": "tests_performed","note": "note"}',
@@ -73,7 +73,7 @@
# Task to load CSV data to a BigQuery table
load_national_trends_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id="load_national_trends_to_bq",
bucket="{{ var.json.shared.composer_bucket }}",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/covid19_italy/national_trends/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="covid19_italy.national_trends",
4 changes: 2 additions & 2 deletions datasets/covid19_italy/national_trends/pipeline.yaml
@@ -78,7 +78,7 @@ dag:
SOURCE_URL: "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-andamento-nazionale/dpc-covid19-ita-andamento-nazionale.csv"
SOURCE_FILE: "files/data.csv"
TARGET_FILE: "files/data_output.csv"
TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}"
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "data/covid19_italy/national_trends/data_output.csv"
CSV_HEADERS: >-
["date","country","hospitalized_patients_symptoms","hospitalized_patients_intensive_care","total_hospitalized_patients","home_confinement_cases","total_current_confirmed_cases","new_current_confirmed_cases","new_total_confirmed_cases","recovered","deaths","total_confirmed_cases","tests_performed","note"]
@@ -98,7 +98,7 @@ dag:
task_id: "load_national_trends_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.json.shared.composer_bucket }}"
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/covid19_italy/national_trends/data_output.csv"]
@@ -39,7 +39,7 @@
bash_command="mkdir -p $airflow_data_folder/covid19_tracking/city_level_cases_and_deaths/{{ ds }}\ncurl -o $airflow_data_folder/covid19_tracking/city_level_cases_and_deaths/{{ ds }}/raw-data.csv -L $csv_source_url\n",
env={
"csv_source_url": "https://docs.google.com/spreadsheets/d/e/2PACX-1vRg-dB5Pjt-zN38BZNoCdOk_RJ_MyYFAl3QIkK5fKSddUy44DUgJwZuhjCz8KPMpiFKRwhoIwfs0NbZ/pub?gid=0&single=true&output=csv",
"airflow_data_folder": "{{ var.json.shared.airflow_data_folder }}",
"airflow_data_folder": "{{ var.value.airflow_data_folder }}",
},
)

@@ -48,7 +48,7 @@
task_id="process_raw_csv_file",
bash_command="SOURCE_CSV=$airflow_home/data/$dataset/$pipeline/{{ ds }}/raw-data.csv TARGET_CSV=$airflow_home/data/$dataset/$pipeline/{{ ds }}/data.csv python $airflow_home/dags/$dataset/$pipeline/custom/csv_transform.py\n",
env={
"airflow_home": "{{ var.json.shared.airflow_home }}",
"airflow_home": "{{ var.value.airflow_home }}",
"dataset": "covid19_tracking",
"pipeline": "city_level_cases_and_deaths",
},
@@ -57,7 +57,7 @@
# Task to load the data from Airflow data folder to BigQuery
load_csv_file_to_bq_table = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id="load_csv_file_to_bq_table",
bucket="{{ var.json.shared.composer_bucket }}",
bucket="{{ var.value.composer_bucket }}",
source_objects=[
"data/covid19_tracking/city_level_cases_and_deaths/{{ ds }}/data.csv"
],
@@ -167,7 +167,7 @@
# Task to archive the CSV file in the destination bucket
archive_csv_file_to_destination_bucket = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(
task_id="archive_csv_file_to_destination_bucket",
source_bucket="{{ var.json.shared.composer_bucket }}",
source_bucket="{{ var.value.composer_bucket }}",
source_object="data/covid19_tracking/city_level_cases_and_deaths/{{ ds }}/*",
destination_bucket="{{ var.json.covid19_tracking.destination_bucket }}",
destination_object="datasets/covid19_tracking/city_level_cases_and_deaths/{{ ds }}/",
