/
pipeline.yaml
141 lines (134 loc) · 6.5 KB
/
pipeline.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
---
resources:
- type: bigquery_table
table_id: national_testing_and_outcomes
dag:
airflow_version: 1
initialize:
dag_id: "national_testing_and_outcomes"
default_args:
owner: "Google"
# When set to True, keeps a task from getting triggered if the previous schedule for the task hasn't succeeded
depends_on_past: False
start_date: '2021-03-01'
max_active_runs: 1
schedule_interval: "@once"
catchup: False
default_view: graph
tasks:
- operator: "BashOperator"
description: "Task to copy `national-history.csv` from COVID-19 Tracking Project to GCS"
args:
task_id: "copy_csv_file_to_gcs"
bash_command: |
echo $airflow_data_folder
echo $csv_source_url
mkdir -p $airflow_data_folder/covid19_tracking/national_testing_and_outcomes
curl -o $airflow_data_folder/covid19_tracking/national_testing_and_outcomes/national-history-{{ ds }}.csv -L $csv_source_url
env:
csv_source_url: "https://covidtracking.com/data/download/national-history.csv"
airflow_data_folder: "{{ var.json.shared.airflow_data_folder }}"
- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load the data from Airflow data folder to BigQuery"
args:
task_id: "load_csv_file_to_bq_table"
bucket: "{{ var.json.shared.composer_bucket }}"
source_objects: ["data/covid19_tracking/national_testing_and_outcomes/national-history-{{ ds }}.csv"]
source_format: "CSV"
destination_project_dataset_table: "covid19_tracking.national_testing_and_outcomes"
skip_leading_rows: 1
write_disposition: "WRITE_TRUNCATE"
schema_fields:
- name: "date"
type: "DATE"
mode: "REQUIRED"
description: "Date of the observations"
- name: "death"
type: "INTEGER"
mode: "NULLABLE"
description: "Total cumulative number of people that have died"
- name: "death_increase"
type: "INTEGER"
mode: "NULLABLE"
description: "The daily increase in the number of people that have died based on the previous day's value"
- name: "in_icu_cumulative"
type: "INTEGER"
mode: "NULLABLE"
description: "Total number of individuals who have ever been hospitalized in the Intensive Care Unit with COVID-19"
- name: "in_icu_currently"
type: "INTEGER"
mode: "NULLABLE"
description: "Individuals who are currently hospitalized in the Intensive Care Unit with COVID-19"
- name: "hospitalized_increase"
type: "INTEGER"
mode: "NULLABLE"
description: "Daily increase in hospitalized_cumulative, calculated from the previous day's value"
- name: "hospitalized_currently"
type: "INTEGER"
mode: "NULLABLE"
description: "Individuals who are currently hospitalized with COVID-19"
- name: "hospitalized_cumulative"
type: "INTEGER"
mode: "NULLABLE"
description: "Total number of individuals who have ever been hospitalized with COVID-19"
- name: "negative"
type: "INTEGER"
mode: "NULLABLE"
description: "Total number of unique people with a completed PCR test that returns negative"
- name: "negative_increase"
type: "INTEGER"
mode: "NULLABLE"
description: "Daily increase of unique people with a completed PCR test that returns negative based on the previous day's value"
- name: "on_ventilator_cumulative"
type: "INTEGER"
mode: "NULLABLE"
description: "Total number of individuals who have ever been hospitalized under advanced ventilation with COVID-19"
- name: "on_ventilator_currently"
type: "INTEGER"
mode: "NULLABLE"
description: "Individuals who are currently hospitalized under advanced ventilation with COVID-19"
- name: "positive"
type: "INTEGER"
mode: "NULLABLE"
description: "Total number of confirmed plus probable cases of COVID-19 reported by the state or territory"
- name: "positive_increase"
type: "INTEGER"
mode: "NULLABLE"
description: "The daily increase in positive, which measures cases (confirmed plus probable) calculated based on the previous day's value"
- name: "states"
type: "INTEGER"
mode: "NULLABLE"
description: "The number of states and territories included in the US dataset for this day"
- name: "total_test_results"
type: "INTEGER"
mode: "NULLABLE"
description: "In most states, this field is currently computed by adding positive and negative values because, historically, some states do not report totals, and to work around different reporting cadences for cases and tests"
- name: "total_test_results_increase"
type: "INTEGER"
mode: "NULLABLE"
description: "The daily increase in total_test_results, calculated from the previous day's value"
- operator: "GoogleCloudStorageToGoogleCloudStorageOperator"
description: "Task to archive the CSV file in the destination bucket"
args:
task_id: "archive_csv_file_to_destination_bucket"
source_bucket: "{{ var.json.shared.composer_bucket }}"
source_object: "data/covid19_tracking/national_testing_and_outcomes/national-history-{{ ds }}.csv"
destination_bucket: "{{ var.json.covid19_tracking.destination_bucket }}"
destination_object: "datasets/covid19_tracking/national_testing_and_outcomes/national-history-{{ ds }}.csv"
move_object: True
graph_paths:
- "copy_csv_file_to_gcs >> load_csv_file_to_bq_table"
- "load_csv_file_to_bq_table >> archive_csv_file_to_destination_bucket"