/
pipeline.yaml
119 lines (99 loc) · 4.89 KB
/
pipeline.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
---
resources:
  # GCP resources that are unique and specific to this pipeline.
  #
  # The currently supported resource types are shown below. Use only the
  # resources needed by your pipeline, and delete the rest of the examples.
  #
  # More Google Cloud resources will be supported over time. If a resource you
  # need isn't supported, please file an issue on the repository.
  - type: bigquery_table
    # A Google BigQuery table to store your data. Requires a `bigquery_dataset`
    # to be specified in the config (i.e. `dataset.yaml`) for the dataset that
    # this pipeline belongs in.
    #
    # Required properties:
    #   table_id
    #
    # Quoted because the ID starts with digits; it must always load as a
    # string.
    table_id: "2021_sales_predict"
dag:
  # DAG stands for directed acyclic graph. This block describes the data
  # pipeline along with every property and configuration it needs to onboard
  # the data.
  initialize:
    # Quoted because the ID starts with digits; it must always load as a
    # string.
    dag_id: "2021_sales_predict"
    default_args:
      owner: "Google"
      # When true, keeps a task from getting triggered if the previous
      # schedule for the task hasn't succeeded.
      depends_on_past: false
      start_date: "2021-06-01"
    # DAG-level settings (siblings of `default_args`, not per-task defaults).
    max_active_runs: 1
    schedule_interval: "@once"
    catchup: false
    default_view: graph

  tasks:
    # The tasks (a.k.a. processes) that the data pipeline runs to onboard the
    # data.
    #
    # Every task must be represented by an Airflow operator. The supported
    # operators are listed in
    #
    #   scripts/dag_imports.json
    #
    # If an operator you need isn't supported, please file an issue on the
    # repository.
    #
    # Use the YAML list syntax in this block to specify every task for your
    # pipeline.
    - operator: "BigQueryOperator"
      # Initializes a BigQuery operator that executes SQL queries in a specific
      # BigQuery table.
      #
      # This task aggregates the public Iowa liquor sales table per date and
      # store, keeping rows whose date string matches `2021-0[1-4]`, and
      # writes the result to the destination table.
      description: "Task to run a BigQueryOperator"
      args:
        # Arguments supported by this operator:
        # https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryOperator
        task_id: "sample_iowa_liquor_sales_2021"
        # The SQL query to execute, along with query parameters. For more
        # info, see
        # https://cloud.google.com/bigquery/docs/parameterized-queries.
        #
        # Written as a folded scalar (`>-`): the lines below are joined with
        # single spaces into a one-line query with no trailing newline, so no
        # quote escaping is needed.
        sql: >-
          SELECT date, store_name, MAX(city) as city,
          MAX(zip_code) as zip_code, MAX(county) as county,
          SUM(sale_dollars) AS sale_dollars
          FROM `bigquery-public-data.iowa_liquor_sales.sales`
          WHERE REGEXP_CONTAINS(CAST(date AS String), r"2021-0[1-4]")
          GROUP BY date, store_name
        use_legacy_sql: false
        # The BigQuery destination table
        destination_dataset_table: "iowa_liquor_sales_forecasting.2021_sales_predict"
        # How to write to the destination: overwrite, append, or write if empty
        # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
        write_disposition: "WRITE_TRUNCATE"

    - operator: "BigQueryOperator"
      # Initializes a BigQuery operator that executes SQL queries in a specific
      # BigQuery table.
      #
      # This task sets `sale_dollars` to NULL in the table written by the
      # previous task, for rows whose date string contains "2021-04-".
      # NOTE(review): presumably those rows are the values to be forecast;
      # confirm with the dataset owner.
      description: "Task to run a BigQueryOperator"
      args:
        # Arguments supported by this operator:
        # https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryOperator
        task_id: "update_iowa_liquor_sales_2021"
        # The SQL query to execute, along with query parameters. For more
        # info, see
        # https://cloud.google.com/bigquery/docs/parameterized-queries.
        sql: >-
          UPDATE `iowa_liquor_sales_forecasting.2021_sales_predict`
          SET sale_dollars = NULL
          WHERE REGEXP_CONTAINS(CAST(date as String), "2021-04-")
        use_legacy_sql: false

  graph_paths:
    # The relationships (i.e. directed paths/edges) among the tasks specified
    # above. Use the bitshift operator to define the relationships and the
    # `task_id` values above to represent tasks.
    #
    # For more info, see
    # https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#setting-up-dependencies
    - "sample_iowa_liquor_sales_2021 >> update_iowa_liquor_sales_2021"