From eba8eb9d66daa3a36f75e9cc6ab2d34176b5bb97 Mon Sep 17 00:00:00 2001 From: Dipannita Banerjee Date: Mon, 13 Sep 2021 08:30:05 +0000 Subject: [PATCH 1/4] feat: Onboard COVID19 Google Mobility dataset --- .../_images/run_csv_transform_kub/Dockerfile | 38 ++++ .../run_csv_transform_kub/csv_transform.py | 146 ++++++++++++++ .../run_csv_transform_kub/requirements.txt | 3 + .../covid19_google_mobility_dataset.tf | 26 +++ .../_terraform/mobility_report_pipeline.tf | 39 ++++ .../_terraform/provider.tf | 28 +++ .../_terraform/variables.tf | 23 +++ datasets/covid19_google_mobility/dataset.yaml | 67 +++++++ .../mobility_report/mobility_report_dag.py | 176 +++++++++++++++++ .../mobility_report/pipeline.yaml | 182 ++++++++++++++++++ 10 files changed, 728 insertions(+) create mode 100644 datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile create mode 100644 datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py create mode 100644 datasets/covid19_google_mobility/_images/run_csv_transform_kub/requirements.txt create mode 100644 datasets/covid19_google_mobility/_terraform/covid19_google_mobility_dataset.tf create mode 100644 datasets/covid19_google_mobility/_terraform/mobility_report_pipeline.tf create mode 100644 datasets/covid19_google_mobility/_terraform/provider.tf create mode 100644 datasets/covid19_google_mobility/_terraform/variables.tf create mode 100644 datasets/covid19_google_mobility/dataset.yaml create mode 100644 datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py create mode 100644 datasets/covid19_google_mobility/mobility_report/pipeline.yaml diff --git a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..85af90570 --- /dev/null +++ b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,38 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . + +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..02bf40cb8 --- /dev/null +++ b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,146 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import datetime +import json +import logging +import math +import os +import pathlib +import typing + +import pandas as pd +import requests +from google.cloud import storage + + +def main( + source_url: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + target_gcs_bucket: str, + target_gcs_path: str, + headers: typing.List[str], + rename_mappings: dict, + pipeline_name: str, +) -> None: + + logging.info( + f"COVID19 Google mobility {pipeline_name} process started at " + + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + + logging.info("creating 'files' folder") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + + logging.info(f"Downloading file from {source_url}...") + download_file(source_url, source_file) + + logging.info(f"Opening file {source_file}...") + df = pd.read_csv(str(source_file)) + + logging.info(f"Transforming {source_file}... ") + + logging.info(f"Transform: Rename columns.. {source_file}") + rename_headers(df, rename_mappings) + + logging.info(f"Transform: converting to integer {source_file}... ") + df["retail_and_recreation_percent_change_from_baseline"] = df[ + "retail_and_recreation_percent_change_from_baseline" + ].apply(convert_to_integer_string) + df["grocery_and_pharmacy_percent_change_from_baseline"] = df[ + "grocery_and_pharmacy_percent_change_from_baseline" + ].apply(convert_to_integer_string) + df["parks_percent_change_from_baseline"] = df[ + "parks_percent_change_from_baseline" + ].apply(convert_to_integer_string) + df["transit_stations_percent_change_from_baseline"] = df[ + "transit_stations_percent_change_from_baseline" + ].apply(convert_to_integer_string) + df["workplaces_percent_change_from_baseline"] = df[ + "workplaces_percent_change_from_baseline" + ].apply(convert_to_integer_string) + df["residential_percent_change_from_baseline"] = df[ + "residential_percent_change_from_baseline" + ].apply(convert_to_integer_string) + + logging.info("Transform: Reordering headers..") + df = df[headers] + + logging.info(f"Saving to output file.. {target_file}") + try: + save_to_new_file(df, file_path=str(target_file)) + except Exception as e: + logging.error(f"Error saving output file: {e}.") + + logging.info( + f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}" + ) + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) + + logging.info( + f"COVID19 Google mobility {pipeline_name} process completed at " + + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + + +def convert_to_integer_string(input: typing.Union[str, float]) -> str: + str_val = "" + if not input or (math.isnan(input)): + str_val = "" + else: + str_val = str(int(round(input, 0))) + return str_val + + +def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None: + df.rename(columns=rename_mappings, inplace=True) + + +def save_to_new_file(df: pd.DataFrame, file_path: str) -> None: + df.to_csv(file_path, index=False) + + +def download_file(source_url: str, source_file: pathlib.Path) -> None: + logging.info(f"Downloading {source_url} into {source_file}") + r = requests.get(source_url, stream=True) + if r.status_code == 200: + with open(source_file, "wb") as f: + for chunk in r: + f.write(chunk) + else: + logging.error(f"Couldn't download {source_url}: {r.text}") + + +def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: + storage_client = storage.Client() + bucket = storage_client.bucket(gcs_bucket) + blob = bucket.blob(gcs_path) + blob.upload_from_filename(file_path) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_url=os.environ["SOURCE_URL"], + source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), + target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + headers=json.loads(os.environ["CSV_HEADERS"]), + rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]), + pipeline_name=os.environ["PIPELINE_NAME"], + ) diff --git a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/requirements.txt b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/requirements.txt new file mode 100644 index 000000000..f36704793 --- /dev/null +++ b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/requirements.txt @@ -0,0 +1,3 @@ +requests +pandas +google-cloud-storage diff --git a/datasets/covid19_google_mobility/_terraform/covid19_google_mobility_dataset.tf b/datasets/covid19_google_mobility/_terraform/covid19_google_mobility_dataset.tf new file mode 100644 index 000000000..186557df7 --- /dev/null +++ b/datasets/covid19_google_mobility/_terraform/covid19_google_mobility_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_dataset" "covid19_google_mobility" { + dataset_id = "covid19_google_mobility" + project = var.project_id + description = "Terms of use\nIn order to download or use the data or reports, you must agree to the Google Terms of Service: https://policies.google.com/terms\n\nDescription\nThis dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential.\n\nThis dataset is intended to help remediate the impact of COVID-19. It shouldn\u2019t be used for medical diagnostic, prognostic, or treatment purposes. It also isn\u2019t intended to be used for guidance on personal travel plans.\n\nTo learn more about the dataset, the place categories, and how we calculate these trends and preserve privacy, do the following:\n\n\u2022 Visit the help center: https://support.google.com/covid19-mobility.\n\n\u2022 Or, read the dataset documentation: https://www.google.com/covid19/mobility/data_documentation.html." +} + +output "bigquery_dataset-covid19_google_mobility-dataset_id" { + value = google_bigquery_dataset.covid19_google_mobility.dataset_id +} diff --git a/datasets/covid19_google_mobility/_terraform/mobility_report_pipeline.tf b/datasets/covid19_google_mobility/_terraform/mobility_report_pipeline.tf new file mode 100644 index 000000000..30414bb72 --- /dev/null +++ b/datasets/covid19_google_mobility/_terraform/mobility_report_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "mobility_report" { + project = var.project_id + dataset_id = "covid19_google_mobility" + table_id = "mobility_report" + + description = "Terms of use By downloading or using the data, you agree to Google\u0027s Terms of Service: https://policies.google.com/terms Description This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. This dataset is intended to help remediate the impact of COVID-19. It shouldn\u2019t be used for medical diagnostic, prognostic, or treatment purposes. It also isn\u2019t intended to be used for guidance on personal travel plans. To learn more about the dataset, the place categories and how we calculate these trends and preserve privacy, read the data documentation: https://www.google.com/covid19/mobility/data_documentation.html" + + + + + depends_on = [ + google_bigquery_dataset.covid19_google_mobility + ] +} + +output "bigquery_table-mobility_report-table_id" { + value = google_bigquery_table.mobility_report.table_id +} + +output "bigquery_table-mobility_report-id" { + value = google_bigquery_table.mobility_report.id +} diff --git a/datasets/covid19_google_mobility/_terraform/provider.tf b/datasets/covid19_google_mobility/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/covid19_google_mobility/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/covid19_google_mobility/_terraform/variables.tf b/datasets/covid19_google_mobility/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/covid19_google_mobility/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/covid19_google_mobility/dataset.yaml b/datasets/covid19_google_mobility/dataset.yaml new file mode 100644 index 000000000..ce0ec9ba8 --- /dev/null +++ b/datasets/covid19_google_mobility/dataset.yaml @@ -0,0 +1,67 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + # The `dataset` block includes properties for your dataset that will be shown + # to users of your data on the Google Cloud website. + + name: covid19_google_mobility + + # A friendly, human-readable name of the dataset + friendly_name: covid19_google_mobility + + # A short, descriptive summary of the dataset. + description: |- + Terms of use + In order to download or use the data or reports, you must agree to the Google Terms of Service: https://policies.google.com/terms + + Description + This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. + + This dataset is intended to help remediate the impact of COVID-19. It shouldn’t be used for medical diagnostic, prognostic, or treatment purposes. It also isn’t intended to be used for guidance on personal travel plans. + + To learn more about the dataset, the place categories, and how we calculate these trends and preserve privacy, do the following: + + • Visit the help center: https://support.google.com/covid19-mobility. + + • Or, read the dataset documentation: https://www.google.com/covid19/mobility/data_documentation.html. + + + dataset_sources: ~ + + terms_of_use: ~ + + +resources: + # A list of Google Cloud resources needed by your dataset. In principle, all + # pipelines under a dataset should be able to share these resources. + + - type: bigquery_dataset + # Google BigQuery dataset to namespace all tables managed by this folder + + dataset_id: covid19_google_mobility + description: |- + Terms of use + In order to download or use the data or reports, you must agree to the Google Terms of Service: https://policies.google.com/terms + + Description + This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. + + This dataset is intended to help remediate the impact of COVID-19. It shouldn’t be used for medical diagnostic, prognostic, or treatment purposes. It also isn’t intended to be used for guidance on personal travel plans. + + To learn more about the dataset, the place categories, and how we calculate these trends and preserve privacy, do the following: + + • Visit the help center: https://support.google.com/covid19-mobility. + + • Or, read the dataset documentation: https://www.google.com/covid19/mobility/data_documentation.html. diff --git a/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py b/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py new file mode 100644 index 000000000..877154cbf --- /dev/null +++ b/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py @@ -0,0 +1,176 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="covid19_google_mobility.mobility_report", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + mobility_report_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="mobility_report_transform_csv", + startup_timeout_seconds=600, + name="mobility_report", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.covid19_google_mobility.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv", + "SOURCE_FILE": "files/data.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/covid19_google_mobility/mobility_report/data_output.csv", + "PIPELINE_NAME": "mobility_report", + "CSV_HEADERS": '["country_region_code" ,"country_region" ,"sub_region_1" ,"sub_region_2" ,"metro_area" ,"iso_3166_2_code" ,"census_fips_code" ,"place_id" ,"date" ,"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline"]', + "RENAME_MAPPINGS": '{"country_region_code":"country_region_code" ,"country_region":"country_region" ,"sub_region_1":"sub_region_1" ,"sub_region_2":"sub_region_2" ,"metro_area":"metro_area" ,"iso_3166_2_code":"iso_3166_2_code" ,"census_fips_code":"census_fips_code" ,"place_id":"place_id" ,"date":"date" ,"retail_and_recreation_percent_change_from_baseline":"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline":"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline":"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline":"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline":"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline":"residential_percent_change_from_baseline"}', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_mobility_report_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_mobility_report_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=["data/covid19_google_mobility/mobility_report/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="covid19_google_mobility.mobility_report", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "country_region_code", + "type": "string", + "description": "2 letter alpha code for the country/region in which changes are measured relative to the baseline. These values correspond with the ISO 3166-1 alpha-2 codes", + "mode": "nullable", + }, + { + "name": "country_region", + "type": "string", + "description": "The country/region in which changes are measured relative to the baseline", + "mode": "nullable", + }, + { + "name": "sub_region_1", + "type": "string", + "description": "First geographic sub-region in which the data is aggregated. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities", + "mode": "nullable", + }, + { + "name": "sub_region_2", + "type": "string", + "description": "Second geographic sub-region in which the data is aggregated. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities", + "mode": "nullable", + }, + { + "name": "metro_area", + "type": "string", + "description": "A specific metro area to measure mobility within a given city/metro area. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities", + "mode": "nullable", + }, + { + "name": "iso_3166_2_code", + "type": "string", + "description": "Unique identifier for the geographic region as defined by ISO Standard 3166-2.", + "mode": "nullable", + }, + { + "name": "census_fips_code", + "type": "string", + "description": "Unique identifier for each US county as defined by the US Census Bureau. Maps to county_fips_code in other tables", + "mode": "nullable", + }, + { + "name": "place_id", + "type": "string", + "description": "A textual identifier that uniquely identifies a place in the Google Places database and on Google Maps (details). For example ChIJd_Y0eVIvkIARuQyDN0F1LBA. For details see the following link: https://developers.google.com/places/web-service/place-id", + "mode": "nullable", + }, + { + "name": "date", + "type": "date", + "description": "Changes for a given date as compared to baseline. Baseline is the median value for the corresponding day of the week during the 5-week period Jan 3–Feb 6 2020.", + "mode": "nullable", + }, + { + "name": "retail_and_recreation_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places like restaurants cafes shopping centers theme parks museums libraries and movie theaters.", + "mode": "nullable", + }, + { + "name": "grocery_and_pharmacy_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places like grocery markets food warehouses farmers markets specialty food shops drug stores and pharmacies.", + "mode": "nullable", + }, + { + "name": "parks_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places like local parks national parks public beaches marinas dog parks plazas and public gardens.", + "mode": "nullable", + }, + { + "name": "transit_stations_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places like public transport hubs such as subway bus and train stations.", + "mode": "nullable", + }, + { + "name": "workplaces_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places of work.", + "mode": "nullable", + }, + { + "name": "residential_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places of residence.", + "mode": "nullable", + }, + ], + ) + + mobility_report_transform_csv >> load_mobility_report_to_bq diff --git a/datasets/covid19_google_mobility/mobility_report/pipeline.yaml b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml new file mode 100644 index 000000000..edf650445 --- /dev/null +++ b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml @@ -0,0 +1,182 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: mobility_report + + # Description of the table + description: "Terms of use By downloading or using the data, you agree to Google's Terms of Service: https://policies.google.com/terms Description This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. This dataset is intended to help remediate the impact of COVID-19. It shouldn’t be used for medical diagnostic, prognostic, or treatment purposes. It also isn’t intended to be used for guidance on personal travel plans. To learn more about the dataset, the place categories and how we calculate these trends and preserve privacy, read the data documentation: https://www.google.com/covid19/mobility/data_documentation.html" + +dag: + airflow_version: 1 + initialize: + dag_id: mobility_report + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: "2021-03-01" + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "mobility_report_transform_csv" + + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "mobility_report" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment"s resources without starving other pipelines. + namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.covid19_google_mobility.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv" + SOURCE_FILE: "files/data.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/covid19_google_mobility/mobility_report/data_output.csv" + PIPELINE_NAME: "mobility_report" + CSV_HEADERS: >- + ["country_region_code" ,"country_region" ,"sub_region_1" ,"sub_region_2" ,"metro_area" ,"iso_3166_2_code" ,"census_fips_code" ,"place_id" ,"date" ,"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline"] + RENAME_MAPPINGS: >- + {"country_region_code":"country_region_code" ,"country_region":"country_region" ,"sub_region_1":"sub_region_1" ,"sub_region_2":"sub_region_2" ,"metro_area":"metro_area" ,"iso_3166_2_code":"iso_3166_2_code" ,"census_fips_code":"census_fips_code" ,"place_id":"place_id" ,"date":"date" ,"retail_and_recreation_percent_change_from_baseline":"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline":"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline":"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline":"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline":"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline":"residential_percent_change_from_baseline"} + + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_mobility_report_to_bq" + + # The GCS bucket where the CSV file is located in. + bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/covid19_google_mobility/mobility_report/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "covid19_google_mobility.mobility_report" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + + schema_fields: + - name: "country_region_code" + type: "string" + description: "2 letter alpha code for the country/region in which changes are measured relative to the baseline. These values correspond with the ISO 3166-1 alpha-2 codes" + mode: "nullable" + - name: "country_region" + type: "string" + description: "The country/region in which changes are measured relative to the baseline" + mode: "nullable" + - name: "sub_region_1" + type: "string" + description: "First geographic sub-region in which the data is aggregated. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities" + mode: "nullable" + - name: "sub_region_2" + type: "string" + description: "Second geographic sub-region in which the data is aggregated. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities" + mode: "nullable" + - name: "metro_area" + type: "string" + description: "A specific metro area to measure mobility within a given city/metro area. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities" + mode: "nullable" + - name: "iso_3166_2_code" + type: "string" + description: "Unique identifier for the geographic region as defined by ISO Standard 3166-2." + mode: "nullable" + - name: "census_fips_code" + type: "string" + description: "Unique identifier for each US county as defined by the US Census Bureau. Maps to county_fips_code in other tables" + mode: "nullable" + - name: "place_id" + type: "string" + description: "A textual identifier that uniquely identifies a place in the Google Places database and on Google Maps (details). For example ChIJd_Y0eVIvkIARuQyDN0F1LBA. For details see the following link: https://developers.google.com/places/web-service/place-id" + mode: "nullable" + - name: "date" + type: "date" + description: "Changes for a given date as compared to baseline. Baseline is the median value for the corresponding day of the week during the 5-week period Jan 3–Feb 6 2020." + mode: "nullable" + - name: "retail_and_recreation_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places like restaurants cafes shopping centers theme parks museums libraries and movie theaters." + mode: "nullable" + - name: "grocery_and_pharmacy_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places like grocery markets food warehouses farmers markets specialty food shops drug stores and pharmacies." + mode: "nullable" + - name: "parks_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places like local parks national parks public beaches marinas dog parks plazas and public gardens." + mode: "nullable" + - name: "transit_stations_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places like public transport hubs such as subway bus and train stations." + mode: "nullable" + - name: "workplaces_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places of work." + mode: "nullable" + - name: "residential_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places of residence." + mode: "nullable" + + graph_paths: + - "mobility_report_transform_csv >> load_mobility_report_to_bq" From 9c15d07c8b82d2a22ce2e59deee65f94f2dafe07 Mon Sep 17 00:00:00 2001 From: Dipannita Banerjee Date: Mon, 20 Sep 2021 08:28:05 +0000 Subject: [PATCH 2/4] style: change is styling --- .../run_csv_transform_kub/csv_transform.py | 35 +++++++++---------- .../mobility_report/pipeline.yaml | 21 ----------- 2 files changed, 16 insertions(+), 40 deletions(-) diff --git a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py index 02bf40cb8..abed3dc3d 100644 --- a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py @@ -42,7 +42,7 @@ def main( + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) ) - logging.info("creating 'files' folder") + logging.info("Creating 'files' folder") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) logging.info(f"Downloading file from {source_url}...") @@ -57,24 +57,7 @@ def main( rename_headers(df, rename_mappings) logging.info(f"Transform: converting to integer {source_file}... ") - df["retail_and_recreation_percent_change_from_baseline"] = df[ - "retail_and_recreation_percent_change_from_baseline" - ].apply(convert_to_integer_string) - df["grocery_and_pharmacy_percent_change_from_baseline"] = df[ - "grocery_and_pharmacy_percent_change_from_baseline" - ].apply(convert_to_integer_string) - df["parks_percent_change_from_baseline"] = df[ - "parks_percent_change_from_baseline" - ].apply(convert_to_integer_string) - df["transit_stations_percent_change_from_baseline"] = df[ - "transit_stations_percent_change_from_baseline" - ].apply(convert_to_integer_string) - df["workplaces_percent_change_from_baseline"] = df[ - "workplaces_percent_change_from_baseline" - ].apply(convert_to_integer_string) - df["residential_percent_change_from_baseline"] = df[ - "residential_percent_change_from_baseline" - ].apply(convert_to_integer_string) + convert_values_to_integer_string(df) logging.info("Transform: Reordering headers..") df = df[headers] @@ -105,6 +88,20 @@ def convert_to_integer_string(input: typing.Union[str, float]) -> str: return str_val +def convert_values_to_integer_string(df: pd.DataFrame) -> None: + cols = [ + "retail_and_recreation_percent_change_from_baseline", + "grocery_and_pharmacy_percent_change_from_baseline", + "parks_percent_change_from_baseline", + "transit_stations_percent_change_from_baseline", + "workplaces_percent_change_from_baseline", + "residential_percent_change_from_baseline", + ] + + for cols in cols: + df[cols] = df[cols].apply(convert_to_integer_string) + + def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None: df.rename(columns=rename_mappings, inplace=True) diff --git a/datasets/covid19_google_mobility/mobility_report/pipeline.yaml b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml index edf650445..b94040c9d 100644 --- a/datasets/covid19_google_mobility/mobility_report/pipeline.yaml +++ b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml @@ -16,10 +16,8 @@ resources: - type: bigquery_table - # Required Properties: table_id: mobility_report - # Description of the table description: "Terms of use By downloading or using the data, you agree to Google's Terms of Service: https://policies.google.com/terms Description This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. This dataset is intended to help remediate the impact of COVID-19. It shouldn’t be used for medical diagnostic, prognostic, or treatment purposes. It also isn’t intended to be used for guidance on personal travel plans. To learn more about the dataset, the place categories and how we calculate these trends and preserve privacy, read the data documentation: https://www.google.com/covid19/mobility/data_documentation.html" dag: @@ -29,7 +27,6 @@ dag: default_args: owner: "Google" - # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded depends_on_past: False start_date: "2021-03-01" max_active_runs: 1 @@ -40,7 +37,6 @@ dag: tasks: - operator: "KubernetesPodOperator" - # Task description description: "Run CSV transform within kubernetes pod" args: @@ -48,11 +44,8 @@ dag: task_id: "mobility_report_transform_csv" startup_timeout_seconds: 600 - - # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id name: "mobility_report" - # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment"s resources without starving other pipelines. namespace: "default" affinity: @@ -68,10 +61,8 @@ dag: image_pull_policy: "Always" - # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. image: "{{ var.json.covid19_google_mobility.container_registry.run_csv_transform_kub }}" - # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. env_vars: SOURCE_URL: "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv" SOURCE_FILE: "files/data.csv" @@ -84,8 +75,6 @@ dag: RENAME_MAPPINGS: >- {"country_region_code":"country_region_code" ,"country_region":"country_region" ,"sub_region_1":"sub_region_1" ,"sub_region_2":"sub_region_2" ,"metro_area":"metro_area" ,"iso_3166_2_code":"iso_3166_2_code" ,"census_fips_code":"census_fips_code" ,"place_id":"place_id" ,"date":"date" ,"retail_and_recreation_percent_change_from_baseline":"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline":"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline":"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline":"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline":"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline":"residential_percent_change_from_baseline"} - - # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes resources: request_memory: "2G" request_cpu: "1" @@ -96,26 +85,16 @@ dag: args: task_id: "load_mobility_report_to_bq" - # The GCS bucket where the CSV file is located in. bucket: "{{ var.json.shared.composer_bucket }}" - # The GCS object path for the CSV file source_objects: ["data/covid19_google_mobility/mobility_report/data_output.csv"] source_format: "CSV" destination_project_dataset_table: "covid19_google_mobility.mobility_report" - # Use this if your CSV file contains a header row skip_leading_rows: 1 - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. - schema_fields: - name: "country_region_code" type: "string" From 7aecc21ed98fad29c4b3cf880e101b59cc6bcaf2 Mon Sep 17 00:00:00 2001 From: Dipannita Banerjee Date: Wed, 22 Sep 2021 07:45:57 +0000 Subject: [PATCH 3/4] style: removing commented lines --- .../mobility_report/mobility_report_dag.py | 4 ++-- .../covid19_google_mobility/mobility_report/pipeline.yaml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py b/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py index 877154cbf..e5fcfd717 100644 --- a/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py +++ b/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py @@ -61,7 +61,7 @@ "SOURCE_URL": "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", - "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/covid19_google_mobility/mobility_report/data_output.csv", "PIPELINE_NAME": "mobility_report", "CSV_HEADERS": '["country_region_code" ,"country_region" ,"sub_region_1" ,"sub_region_2" ,"metro_area" ,"iso_3166_2_code" ,"census_fips_code" ,"place_id" ,"date" ,"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline"]', @@ -73,7 +73,7 @@ # Task to load CSV data to a BigQuery table load_mobility_report_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( task_id="load_mobility_report_to_bq", - bucket="{{ var.json.shared.composer_bucket }}", + bucket="{{ var.value.composer_bucket }}", source_objects=["data/covid19_google_mobility/mobility_report/data_output.csv"], source_format="CSV", destination_project_dataset_table="covid19_google_mobility.mobility_report", diff --git a/datasets/covid19_google_mobility/mobility_report/pipeline.yaml b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml index b94040c9d..d3eda945a 100644 --- a/datasets/covid19_google_mobility/mobility_report/pipeline.yaml +++ b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml @@ -67,7 +67,7 @@ dag: SOURCE_URL: "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" - TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/covid19_google_mobility/mobility_report/data_output.csv" PIPELINE_NAME: "mobility_report" CSV_HEADERS: >- @@ -85,7 +85,7 @@ dag: args: task_id: "load_mobility_report_to_bq" - bucket: "{{ var.json.shared.composer_bucket }}" + bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/covid19_google_mobility/mobility_report/data_output.csv"] source_format: "CSV" From 4d42115a8a45421ea73d457525e0f85ef81fa527 Mon Sep 17 00:00:00 2001 From: Dipannita Banerjee Date: Tue, 5 Oct 2021 16:41:47 +0000 Subject: [PATCH 4/4] fix: remove commented lines and white spaces --- .../_images/run_csv_transform_kub/Dockerfile | 1 - .../run_csv_transform_kub/csv_transform.py | 2 +- datasets/covid19_google_mobility/dataset.yaml | 15 --------------- .../mobility_report/pipeline.yaml | 19 ------------------- 4 files changed, 1 insertion(+), 36 deletions(-) diff --git a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile index 85af90570..7265a1b71 100644 --- a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile +++ b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile @@ -13,7 +13,6 @@ # limitations under the License. # The base image for this build -# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim FROM python:3.8 # Allow statements and log messages to appear in Cloud logs diff --git a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py index abed3dc3d..e183375db 100644 --- a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py @@ -56,7 +56,7 @@ def main( logging.info(f"Transform: Rename columns.. {source_file}") rename_headers(df, rename_mappings) - logging.info(f"Transform: converting to integer {source_file}... ") + logging.info(f"Transform: Converting to integer {source_file}... ") convert_values_to_integer_string(df) logging.info("Transform: Reordering headers..") diff --git a/datasets/covid19_google_mobility/dataset.yaml b/datasets/covid19_google_mobility/dataset.yaml index ce0ec9ba8..1c17b5dcd 100644 --- a/datasets/covid19_google_mobility/dataset.yaml +++ b/datasets/covid19_google_mobility/dataset.yaml @@ -13,15 +13,8 @@ # limitations under the License. dataset: - # The `dataset` block includes properties for your dataset that will be shown - # to users of your data on the Google Cloud website. - name: covid19_google_mobility - - # A friendly, human-readable name of the dataset friendly_name: covid19_google_mobility - - # A short, descriptive summary of the dataset. description: |- Terms of use In order to download or use the data or reports, you must agree to the Google Terms of Service: https://policies.google.com/terms @@ -36,20 +29,12 @@ dataset: • Visit the help center: https://support.google.com/covid19-mobility. • Or, read the dataset documentation: https://www.google.com/covid19/mobility/data_documentation.html. - - dataset_sources: ~ - terms_of_use: ~ resources: - # A list of Google Cloud resources needed by your dataset. In principle, all - # pipelines under a dataset should be able to share these resources. - - type: bigquery_dataset - # Google BigQuery dataset to namespace all tables managed by this folder - dataset_id: covid19_google_mobility description: |- Terms of use diff --git a/datasets/covid19_google_mobility/mobility_report/pipeline.yaml b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml index d3eda945a..e95e3cd33 100644 --- a/datasets/covid19_google_mobility/mobility_report/pipeline.yaml +++ b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml @@ -17,7 +17,6 @@ resources: - type: bigquery_table table_id: mobility_report - description: "Terms of use By downloading or using the data, you agree to Google's Terms of Service: https://policies.google.com/terms Description This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. This dataset is intended to help remediate the impact of COVID-19. It shouldn’t be used for medical diagnostic, prognostic, or treatment purposes. It also isn’t intended to be used for guidance on personal travel plans. To learn more about the dataset, the place categories and how we calculate these trends and preserve privacy, read the data documentation: https://www.google.com/covid19/mobility/data_documentation.html" dag: @@ -26,28 +25,20 @@ dag: dag_id: mobility_report default_args: owner: "Google" - depends_on_past: False start_date: "2021-03-01" max_active_runs: 1 schedule_interval: "@daily" catchup: False default_view: graph - tasks: - operator: "KubernetesPodOperator" - description: "Run CSV transform within kubernetes pod" - args: - task_id: "mobility_report_transform_csv" - startup_timeout_seconds: 600 name: "mobility_report" - namespace: "default" - affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: @@ -57,12 +48,8 @@ dag: operator: In values: - "pool-e2-standard-4" - - image_pull_policy: "Always" - image: "{{ var.json.covid19_google_mobility.container_registry.run_csv_transform_kub }}" - env_vars: SOURCE_URL: "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv" SOURCE_FILE: "files/data.csv" @@ -74,25 +61,19 @@ dag: ["country_region_code" ,"country_region" ,"sub_region_1" ,"sub_region_2" ,"metro_area" ,"iso_3166_2_code" ,"census_fips_code" ,"place_id" ,"date" ,"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline"] RENAME_MAPPINGS: >- {"country_region_code":"country_region_code" ,"country_region":"country_region" ,"sub_region_1":"sub_region_1" ,"sub_region_2":"sub_region_2" ,"metro_area":"metro_area" ,"iso_3166_2_code":"iso_3166_2_code" ,"census_fips_code":"census_fips_code" ,"place_id":"place_id" ,"date":"date" ,"retail_and_recreation_percent_change_from_baseline":"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline":"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline":"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline":"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline":"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline":"residential_percent_change_from_baseline"} - resources: request_memory: "2G" request_cpu: "1" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" - args: task_id: "load_mobility_report_to_bq" - bucket: "{{ var.value.composer_bucket }}" - source_objects: ["data/covid19_google_mobility/mobility_report/data_output.csv"] source_format: "CSV" destination_project_dataset_table: "covid19_google_mobility.mobility_report" - skip_leading_rows: 1 - write_disposition: "WRITE_TRUNCATE" schema_fields: