Skip to content

Commit

Permalink
feat: add Iowa liquor sales samples for Vertex AI Forecasting tutorial (
Browse files Browse the repository at this point in the history
#85)

* Add pipelines for iowa-liquor-sales dataset

* Fix pipelines for iowa_liquor_sales

* Fix yaml lint errors

* Use console.cloud.google.com instead of pantheon.corp.google.com

* Provision iowa_liquor_sales_forecasting instead of iowa_liquor_sales

* rename dataset folder to iowa_liquor_sales_forecasting

Co-authored-by: Adler Santos <adlersantos@google.com>
  • Loading branch information
thehardikv and adlersantos committed Jun 14, 2021
1 parent 1962584 commit d832327
Show file tree
Hide file tree
Showing 11 changed files with 546 additions and 0 deletions.
@@ -0,0 +1,44 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from airflow import DAG
from airflow.contrib.operators import bigquery_operator

default_args = {
"owner": "Google",
"depends_on_past": False,
"start_date": "2021-06-01",
}


with DAG(
dag_id="iowa_liquor_sales_forecasting.2020_sales_train",
default_args=default_args,
max_active_runs=1,
schedule_interval="@once",
catchup=False,
default_view="graph",
) as dag:

# Task to run a BigQueryOperator
sample_iowa_liquor_sales_2020 = bigquery_operator.BigQueryOperator(
task_id="sample_iowa_liquor_sales_2020",
sql='SELECT date, store_name, MAX(city) as city, MAX(zip_code) as zip_code, MAX(county) as county, SUM(sale_dollars) AS sale_dollars FROM `bigquery-public-data.iowa_liquor_sales.sales` WHERE REGEXP_CONTAINS(CAST(date AS String), "2020-") GROUP BY date, store_name',
use_legacy_sql=False,
destination_dataset_table="iowa_liquor_sales_forecasting.2020_sales_train",
write_disposition="WRITE_TRUNCATE",
)

sample_iowa_liquor_sales_2020
100 changes: 100 additions & 0 deletions datasets/iowa_liquor_sales_forecasting/2020_sales_train/pipeline.yaml
@@ -0,0 +1,100 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
resources:
# A list of GCP resources that are unique and specific to your pipeline.
#
# The currently supported resources are shown below. Use only the resources
# needed by your pipeline, and delete the rest of the examples.
#
# We will keep adding to the list below to support more Google Cloud resources
# over time. If a resource you need isn't supported, please file an issue on
# the repository.

- type: bigquery_table
# A Google BigQuery table to store your data. Requires a `bigquery_dataset`
# to be specified in the config (i.e. `dataset.yaml) for the dataset that
# this pipeline belongs in.
#
# Required Properties:
# table_id
table_id: 2020_sales_train

dag:
# The DAG acronym stands for directed acyclic graph. This block represents
# your data pipeline along with every property and configuration it needs to
# onboard your data.
initialize:
dag_id: 2020_sales_train
default_args:
owner: "Google"

# When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded
depends_on_past: False
start_date: "2021-06-01"
max_active_runs: 1
schedule_interval: "@once"
catchup: False
default_view: graph

tasks:
# This is where you specify the tasks (a.k.a. processes) that your data
# pipeline will run to onboard the data.
#
# As the examples below will show, every task must be represented by an
# Airflow operator. The list of suported operators are listed in
#
# scripts/dag_imports.json
#
# If an operator you need isn't supported, please file an issue on the
# repository.
#
# Use the YAML list syntax in this block to specify every task for your
# pipeline.

- operator: "BigQueryOperator"
# Initializes a BigQuery operator that executes SQL queries in a specific
# BigQuery table.

# Task description
description: "Task to run a BigQueryOperator"

args:
# Arguments supported by this operator:
# https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryOperator

task_id: "sample_iowa_liquor_sales_2020"

# The SQL query to execute, along with query parameters. For more info,
# see https://cloud.google.com/bigquery/docs/parameterized-queries.
sql: "SELECT date, store_name, MAX(city) as city, MAX(zip_code) as zip_code, MAX(county) as county, SUM(sale_dollars) AS sale_dollars FROM `bigquery-public-data.iowa_liquor_sales.sales` WHERE REGEXP_CONTAINS(CAST(date AS String), \"2020-\") GROUP BY date, store_name"

use_legacy_sql: False

# The BigQuery destination table
destination_dataset_table: "iowa_liquor_sales_forecasting.2020_sales_train"

# How to write to the destination: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

graph_paths:
# This is where you specify the relationships (i.e. directed paths/edges)
# among the tasks specified above. Use the bitshift operator to define the
# relationships and the `task_id` value above to represent tasks.
#
# For more info, see
# https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#setting-up-dependencies
- "sample_iowa_liquor_sales_2020"
@@ -0,0 +1,51 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from airflow import DAG
from airflow.contrib.operators import bigquery_operator

default_args = {
"owner": "Google",
"depends_on_past": False,
"start_date": "2021-06-01",
}


with DAG(
dag_id="iowa_liquor_sales_forecasting.2021_sales_predict",
default_args=default_args,
max_active_runs=1,
schedule_interval="@once",
catchup=False,
default_view="graph",
) as dag:

# Task to run a BigQueryOperator
sample_iowa_liquor_sales_2021 = bigquery_operator.BigQueryOperator(
task_id="sample_iowa_liquor_sales_2021",
sql='SELECT date, store_name, MAX(city) as city, MAX(zip_code) as zip_code, MAX(county) as county, SUM(sale_dollars) AS sale_dollars FROM `bigquery-public-data.iowa_liquor_sales.sales` WHERE REGEXP_CONTAINS(CAST(date AS String), r"2021-0[1-4]") GROUP BY date, store_name',
use_legacy_sql=False,
destination_dataset_table="iowa_liquor_sales_forecasting.2021_sales_predict",
write_disposition="WRITE_TRUNCATE",
)

# Task to run a BigQueryOperator
update_iowa_liquor_sales_2021 = bigquery_operator.BigQueryOperator(
task_id="update_iowa_liquor_sales_2021",
sql='UPDATE `iowa_liquor_sales_forecasting.2021_sales_predict` SET sale_dollars = NULL WHERE REGEXP_CONTAINS(CAST(date as String), "2021-04-")',
use_legacy_sql=False,
)

sample_iowa_liquor_sales_2021 >> update_iowa_liquor_sales_2021
@@ -0,0 +1,119 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
resources:
# A list of GCP resources that are unique and specific to your pipeline.
#
# The currently supported resources are shown below. Use only the resources
# needed by your pipeline, and delete the rest of the examples.
#
# We will keep adding to the list below to support more Google Cloud resources
# over time. If a resource you need isn't supported, please file an issue on
# the repository.

- type: bigquery_table
# A Google BigQuery table to store your data. Requires a `bigquery_dataset`
# to be specified in the config (i.e. `dataset.yaml) for the dataset that
# this pipeline belongs in.
#
# Required Properties:
# table_id
table_id: 2021_sales_predict

dag:
# The DAG acronym stands for directed acyclic graph. This block represents
# your data pipeline along with every property and configuration it needs to
# onboard your data.
initialize:
dag_id: 2021_sales_predict
default_args:
owner: "Google"

# When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded
depends_on_past: False
start_date: "2021-06-01"
max_active_runs: 1
schedule_interval: "@once"
catchup: False
default_view: graph

tasks:
# This is where you specify the tasks (a.k.a. processes) that your data
# pipeline will run to onboard the data.
#
# As the examples below will show, every task must be represented by an
# Airflow operator. The list of suported operators are listed in
#
# scripts/dag_imports.json
#
# If an operator you need isn't supported, please file an issue on the
# repository.
#
# Use the YAML list syntax in this block to specify every task for your
# pipeline.

- operator: "BigQueryOperator"
# Initializes a BigQuery operator that executes SQL queries in a specific
# BigQuery table.

# Task description
description: "Task to run a BigQueryOperator"

args:
# Arguments supported by this operator:
# https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryOperator

task_id: "sample_iowa_liquor_sales_2021"

# The SQL query to execute, along with query parameters. For more info,
# see https://cloud.google.com/bigquery/docs/parameterized-queries.
sql: "SELECT date, store_name, MAX(city) as city, MAX(zip_code) as zip_code, MAX(county) as county, SUM(sale_dollars) AS sale_dollars FROM `bigquery-public-data.iowa_liquor_sales.sales` WHERE REGEXP_CONTAINS(CAST(date AS String), r\"2021-0[1-4]\") GROUP BY date, store_name"

use_legacy_sql: False

# The BigQuery destination table
destination_dataset_table: "iowa_liquor_sales_forecasting.2021_sales_predict"

# How to write to the destination: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

- operator: "BigQueryOperator"
# Initializes a BigQuery operator that executes SQL queries in a specific
# BigQuery table.

# Task description
description: "Task to run a BigQueryOperator"

args:
# Arguments supported by this operator:
# https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryOperator

task_id: "update_iowa_liquor_sales_2021"

# The SQL query to execute, along with query parameters. For more info,
# see https://cloud.google.com/bigquery/docs/parameterized-queries.
sql: "UPDATE `iowa_liquor_sales_forecasting.2021_sales_predict` SET sale_dollars = NULL WHERE REGEXP_CONTAINS(CAST(date as String), \"2021-04-\")"

use_legacy_sql: False

graph_paths:
# This is where you specify the relationships (i.e. directed paths/edges)
# among the tasks specified above. Use the bitshift operator to define the
# relationships and the `task_id` value above to represent tasks.
#
# For more info, see
# https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#setting-up-dependencies
- "sample_iowa_liquor_sales_2021 >> update_iowa_liquor_sales_2021"
@@ -0,0 +1,36 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


resource "google_bigquery_table" "bqt_2020_sales_train" {
project = var.project_id
dataset_id = "iowa_liquor_sales_forecasting"
table_id = "2020_sales_train"



depends_on = [
google_bigquery_dataset.iowa_liquor_sales_forecasting
]
}

output "bigquery_table-2020_sales_train-table_id" {
value = google_bigquery_table.bqt_2020_sales_train.table_id
}

output "bigquery_table-2020_sales_train-id" {
value = google_bigquery_table.bqt_2020_sales_train.id
}
@@ -0,0 +1,36 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


resource "google_bigquery_table" "bqt_2021_sales_predict" {
project = var.project_id
dataset_id = "iowa_liquor_sales_forecasting"
table_id = "2021_sales_predict"



depends_on = [
google_bigquery_dataset.iowa_liquor_sales_forecasting
]
}

output "bigquery_table-2021_sales_predict-table_id" {
value = google_bigquery_table.bqt_2021_sales_predict.table_id
}

output "bigquery_table-2021_sales_predict-id" {
value = google_bigquery_table.bqt_2021_sales_predict.id
}

0 comments on commit d832327

Please sign in to comment.