diff --git a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..7265a1b71 --- /dev/null +++ b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,37 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . + +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..e183375db --- /dev/null +++ b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,143 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import datetime +import json +import logging +import math +import os +import pathlib +import typing + +import pandas as pd +import requests +from google.cloud import storage + + +def main( + source_url: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + target_gcs_bucket: str, + target_gcs_path: str, + headers: typing.List[str], + rename_mappings: dict, + pipeline_name: str, +) -> None: + + logging.info( + f"COVID19 Google mobility {pipeline_name} process started at " + + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + + logging.info("Creating 'files' folder") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + + logging.info(f"Downloading file from {source_url}...") + download_file(source_url, source_file) + + logging.info(f"Opening file {source_file}...") + df = pd.read_csv(str(source_file)) + + logging.info(f"Transforming {source_file}... ") + + logging.info(f"Transform: Rename columns.. {source_file}") + rename_headers(df, rename_mappings) + + logging.info(f"Transform: Converting to integer {source_file}... ") + convert_values_to_integer_string(df) + + logging.info("Transform: Reordering headers..") + df = df[headers] + + logging.info(f"Saving to output file.. {target_file}") + try: + save_to_new_file(df, file_path=str(target_file)) + except Exception as e: + logging.error(f"Error saving output file: {e}.") + + logging.info( + f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}" + ) + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) + + logging.info( + f"COVID19 Google mobility {pipeline_name} process completed at " + + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + + +def convert_to_integer_string(input: typing.Union[str, float]) -> str: + str_val = "" + if not input or (math.isnan(input)): + str_val = "" + else: + str_val = str(int(round(input, 0))) + return str_val + + +def convert_values_to_integer_string(df: pd.DataFrame) -> None: + cols = [ + "retail_and_recreation_percent_change_from_baseline", + "grocery_and_pharmacy_percent_change_from_baseline", + "parks_percent_change_from_baseline", + "transit_stations_percent_change_from_baseline", + "workplaces_percent_change_from_baseline", + "residential_percent_change_from_baseline", + ] + + for cols in cols: + df[cols] = df[cols].apply(convert_to_integer_string) + + +def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None: + df.rename(columns=rename_mappings, inplace=True) + + +def save_to_new_file(df: pd.DataFrame, file_path: str) -> None: + df.to_csv(file_path, index=False) + + +def download_file(source_url: str, source_file: pathlib.Path) -> None: + logging.info(f"Downloading {source_url} into {source_file}") + r = requests.get(source_url, stream=True) + if r.status_code == 200: + with open(source_file, "wb") as f: + for chunk in r: + f.write(chunk) + else: + logging.error(f"Couldn't download {source_url}: {r.text}") + + +def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: + storage_client = storage.Client() + bucket = storage_client.bucket(gcs_bucket) + blob = bucket.blob(gcs_path) + blob.upload_from_filename(file_path) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_url=os.environ["SOURCE_URL"], + source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), + target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + headers=json.loads(os.environ["CSV_HEADERS"]), + rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]), + pipeline_name=os.environ["PIPELINE_NAME"], + ) diff --git a/datasets/covid19_google_mobility/_images/run_csv_transform_kub/requirements.txt b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/requirements.txt new file mode 100644 index 000000000..f36704793 --- /dev/null +++ b/datasets/covid19_google_mobility/_images/run_csv_transform_kub/requirements.txt @@ -0,0 +1,3 @@ +requests +pandas +google-cloud-storage diff --git a/datasets/covid19_google_mobility/_terraform/covid19_google_mobility_dataset.tf b/datasets/covid19_google_mobility/_terraform/covid19_google_mobility_dataset.tf new file mode 100644 index 000000000..186557df7 --- /dev/null +++ b/datasets/covid19_google_mobility/_terraform/covid19_google_mobility_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_dataset" "covid19_google_mobility" { + dataset_id = "covid19_google_mobility" + project = var.project_id + description = "Terms of use\nIn order to download or use the data or reports, you must agree to the Google Terms of Service: https://policies.google.com/terms\n\nDescription\nThis dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential.\n\nThis dataset is intended to help remediate the impact of COVID-19. It shouldn\u2019t be used for medical diagnostic, prognostic, or treatment purposes. It also isn\u2019t intended to be used for guidance on personal travel plans.\n\nTo learn more about the dataset, the place categories, and how we calculate these trends and preserve privacy, do the following:\n\n\u2022 Visit the help center: https://support.google.com/covid19-mobility.\n\n\u2022 Or, read the dataset documentation: https://www.google.com/covid19/mobility/data_documentation.html." +} + +output "bigquery_dataset-covid19_google_mobility-dataset_id" { + value = google_bigquery_dataset.covid19_google_mobility.dataset_id +} diff --git a/datasets/covid19_google_mobility/_terraform/mobility_report_pipeline.tf b/datasets/covid19_google_mobility/_terraform/mobility_report_pipeline.tf new file mode 100644 index 000000000..30414bb72 --- /dev/null +++ b/datasets/covid19_google_mobility/_terraform/mobility_report_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "mobility_report" { + project = var.project_id + dataset_id = "covid19_google_mobility" + table_id = "mobility_report" + + description = "Terms of use By downloading or using the data, you agree to Google\u0027s Terms of Service: https://policies.google.com/terms Description This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. This dataset is intended to help remediate the impact of COVID-19. It shouldn\u2019t be used for medical diagnostic, prognostic, or treatment purposes. It also isn\u2019t intended to be used for guidance on personal travel plans. To learn more about the dataset, the place categories and how we calculate these trends and preserve privacy, read the data documentation: https://www.google.com/covid19/mobility/data_documentation.html" + + + + + depends_on = [ + google_bigquery_dataset.covid19_google_mobility + ] +} + +output "bigquery_table-mobility_report-table_id" { + value = google_bigquery_table.mobility_report.table_id +} + +output "bigquery_table-mobility_report-id" { + value = google_bigquery_table.mobility_report.id +} diff --git a/datasets/covid19_google_mobility/_terraform/provider.tf b/datasets/covid19_google_mobility/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/covid19_google_mobility/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/covid19_google_mobility/_terraform/variables.tf b/datasets/covid19_google_mobility/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/covid19_google_mobility/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/covid19_google_mobility/dataset.yaml b/datasets/covid19_google_mobility/dataset.yaml new file mode 100644 index 000000000..1c17b5dcd --- /dev/null +++ b/datasets/covid19_google_mobility/dataset.yaml @@ -0,0 +1,52 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + name: covid19_google_mobility + friendly_name: covid19_google_mobility + description: |- + Terms of use + In order to download or use the data or reports, you must agree to the Google Terms of Service: https://policies.google.com/terms + + Description + This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. + + This dataset is intended to help remediate the impact of COVID-19. It shouldn’t be used for medical diagnostic, prognostic, or treatment purposes. It also isn’t intended to be used for guidance on personal travel plans. + + To learn more about the dataset, the place categories, and how we calculate these trends and preserve privacy, do the following: + + • Visit the help center: https://support.google.com/covid19-mobility. + + • Or, read the dataset documentation: https://www.google.com/covid19/mobility/data_documentation.html. + dataset_sources: ~ + terms_of_use: ~ + + +resources: + - type: bigquery_dataset + dataset_id: covid19_google_mobility + description: |- + Terms of use + In order to download or use the data or reports, you must agree to the Google Terms of Service: https://policies.google.com/terms + + Description + This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. + + This dataset is intended to help remediate the impact of COVID-19. It shouldn’t be used for medical diagnostic, prognostic, or treatment purposes. It also isn’t intended to be used for guidance on personal travel plans. + + To learn more about the dataset, the place categories, and how we calculate these trends and preserve privacy, do the following: + + • Visit the help center: https://support.google.com/covid19-mobility. + + • Or, read the dataset documentation: https://www.google.com/covid19/mobility/data_documentation.html. diff --git a/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py b/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py new file mode 100644 index 000000000..e5fcfd717 --- /dev/null +++ b/datasets/covid19_google_mobility/mobility_report/mobility_report_dag.py @@ -0,0 +1,176 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="covid19_google_mobility.mobility_report", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + mobility_report_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="mobility_report_transform_csv", + startup_timeout_seconds=600, + name="mobility_report", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.covid19_google_mobility.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv", + "SOURCE_FILE": "files/data.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/covid19_google_mobility/mobility_report/data_output.csv", + "PIPELINE_NAME": "mobility_report", + "CSV_HEADERS": '["country_region_code" ,"country_region" ,"sub_region_1" ,"sub_region_2" ,"metro_area" ,"iso_3166_2_code" ,"census_fips_code" ,"place_id" ,"date" ,"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline"]', + "RENAME_MAPPINGS": '{"country_region_code":"country_region_code" ,"country_region":"country_region" ,"sub_region_1":"sub_region_1" ,"sub_region_2":"sub_region_2" ,"metro_area":"metro_area" ,"iso_3166_2_code":"iso_3166_2_code" ,"census_fips_code":"census_fips_code" ,"place_id":"place_id" ,"date":"date" ,"retail_and_recreation_percent_change_from_baseline":"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline":"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline":"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline":"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline":"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline":"residential_percent_change_from_baseline"}', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_mobility_report_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_mobility_report_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/covid19_google_mobility/mobility_report/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="covid19_google_mobility.mobility_report", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "country_region_code", + "type": "string", + "description": "2 letter alpha code for the country/region in which changes are measured relative to the baseline. These values correspond with the ISO 3166-1 alpha-2 codes", + "mode": "nullable", + }, + { + "name": "country_region", + "type": "string", + "description": "The country/region in which changes are measured relative to the baseline", + "mode": "nullable", + }, + { + "name": "sub_region_1", + "type": "string", + "description": "First geographic sub-region in which the data is aggregated. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities", + "mode": "nullable", + }, + { + "name": "sub_region_2", + "type": "string", + "description": "Second geographic sub-region in which the data is aggregated. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities", + "mode": "nullable", + }, + { + "name": "metro_area", + "type": "string", + "description": "A specific metro area to measure mobility within a given city/metro area. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities", + "mode": "nullable", + }, + { + "name": "iso_3166_2_code", + "type": "string", + "description": "Unique identifier for the geographic region as defined by ISO Standard 3166-2.", + "mode": "nullable", + }, + { + "name": "census_fips_code", + "type": "string", + "description": "Unique identifier for each US county as defined by the US Census Bureau. Maps to county_fips_code in other tables", + "mode": "nullable", + }, + { + "name": "place_id", + "type": "string", + "description": "A textual identifier that uniquely identifies a place in the Google Places database and on Google Maps (details). For example ChIJd_Y0eVIvkIARuQyDN0F1LBA. For details see the following link: https://developers.google.com/places/web-service/place-id", + "mode": "nullable", + }, + { + "name": "date", + "type": "date", + "description": "Changes for a given date as compared to baseline. Baseline is the median value for the corresponding day of the week during the 5-week period Jan 3–Feb 6 2020.", + "mode": "nullable", + }, + { + "name": "retail_and_recreation_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places like restaurants cafes shopping centers theme parks museums libraries and movie theaters.", + "mode": "nullable", + }, + { + "name": "grocery_and_pharmacy_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places like grocery markets food warehouses farmers markets specialty food shops drug stores and pharmacies.", + "mode": "nullable", + }, + { + "name": "parks_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places like local parks national parks public beaches marinas dog parks plazas and public gardens.", + "mode": "nullable", + }, + { + "name": "transit_stations_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places like public transport hubs such as subway bus and train stations.", + "mode": "nullable", + }, + { + "name": "workplaces_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places of work.", + "mode": "nullable", + }, + { + "name": "residential_percent_change_from_baseline", + "type": "integer", + "description": "Mobility trends for places of residence.", + "mode": "nullable", + }, + ], + ) + + mobility_report_transform_csv >> load_mobility_report_to_bq diff --git a/datasets/covid19_google_mobility/mobility_report/pipeline.yaml b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml new file mode 100644 index 000000000..e95e3cd33 --- /dev/null +++ b/datasets/covid19_google_mobility/mobility_report/pipeline.yaml @@ -0,0 +1,142 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: mobility_report + description: "Terms of use By downloading or using the data, you agree to Google's Terms of Service: https://policies.google.com/terms Description This dataset aims to provide insights into what has changed in response to policies aimed at combating COVID-19. It reports movement trends over time by geography, across different categories of places such as retail and recreation, groceries and pharmacies, parks, transit stations, workplaces, and residential. This dataset is intended to help remediate the impact of COVID-19. It shouldn’t be used for medical diagnostic, prognostic, or treatment purposes. It also isn’t intended to be used for guidance on personal travel plans. To learn more about the dataset, the place categories and how we calculate these trends and preserve privacy, read the data documentation: https://www.google.com/covid19/mobility/data_documentation.html" + +dag: + airflow_version: 1 + initialize: + dag_id: mobility_report + default_args: + owner: "Google" + depends_on_past: False + start_date: "2021-03-01" + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "mobility_report_transform_csv" + startup_timeout_seconds: 600 + name: "mobility_report" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.covid19_google_mobility.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv" + SOURCE_FILE: "files/data.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/covid19_google_mobility/mobility_report/data_output.csv" + PIPELINE_NAME: "mobility_report" + CSV_HEADERS: >- + ["country_region_code" ,"country_region" ,"sub_region_1" ,"sub_region_2" ,"metro_area" ,"iso_3166_2_code" ,"census_fips_code" ,"place_id" ,"date" ,"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline"] + RENAME_MAPPINGS: >- + {"country_region_code":"country_region_code" ,"country_region":"country_region" ,"sub_region_1":"sub_region_1" ,"sub_region_2":"sub_region_2" ,"metro_area":"metro_area" ,"iso_3166_2_code":"iso_3166_2_code" ,"census_fips_code":"census_fips_code" ,"place_id":"place_id" ,"date":"date" ,"retail_and_recreation_percent_change_from_baseline":"retail_and_recreation_percent_change_from_baseline" ,"grocery_and_pharmacy_percent_change_from_baseline":"grocery_and_pharmacy_percent_change_from_baseline" ,"parks_percent_change_from_baseline":"parks_percent_change_from_baseline" ,"transit_stations_percent_change_from_baseline":"transit_stations_percent_change_from_baseline" ,"workplaces_percent_change_from_baseline":"workplaces_percent_change_from_baseline" ,"residential_percent_change_from_baseline":"residential_percent_change_from_baseline"} + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_mobility_report_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/covid19_google_mobility/mobility_report/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "covid19_google_mobility.mobility_report" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "country_region_code" + type: "string" + description: "2 letter alpha code for the country/region in which changes are measured relative to the baseline. These values correspond with the ISO 3166-1 alpha-2 codes" + mode: "nullable" + - name: "country_region" + type: "string" + description: "The country/region in which changes are measured relative to the baseline" + mode: "nullable" + - name: "sub_region_1" + type: "string" + description: "First geographic sub-region in which the data is aggregated. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities" + mode: "nullable" + - name: "sub_region_2" + type: "string" + description: "Second geographic sub-region in which the data is aggregated. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities" + mode: "nullable" + - name: "metro_area" + type: "string" + description: "A specific metro area to measure mobility within a given city/metro area. This varies by country/region to ensure privacy and public health value in consultation with local public health authorities" + mode: "nullable" + - name: "iso_3166_2_code" + type: "string" + description: "Unique identifier for the geographic region as defined by ISO Standard 3166-2." + mode: "nullable" + - name: "census_fips_code" + type: "string" + description: "Unique identifier for each US county as defined by the US Census Bureau. Maps to county_fips_code in other tables" + mode: "nullable" + - name: "place_id" + type: "string" + description: "A textual identifier that uniquely identifies a place in the Google Places database and on Google Maps (details). For example ChIJd_Y0eVIvkIARuQyDN0F1LBA. For details see the following link: https://developers.google.com/places/web-service/place-id" + mode: "nullable" + - name: "date" + type: "date" + description: "Changes for a given date as compared to baseline. Baseline is the median value for the corresponding day of the week during the 5-week period Jan 3–Feb 6 2020." + mode: "nullable" + - name: "retail_and_recreation_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places like restaurants cafes shopping centers theme parks museums libraries and movie theaters." + mode: "nullable" + - name: "grocery_and_pharmacy_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places like grocery markets food warehouses farmers markets specialty food shops drug stores and pharmacies." + mode: "nullable" + - name: "parks_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places like local parks national parks public beaches marinas dog parks plazas and public gardens." + mode: "nullable" + - name: "transit_stations_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places like public transport hubs such as subway bus and train stations." + mode: "nullable" + - name: "workplaces_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places of work." + mode: "nullable" + - name: "residential_percent_change_from_baseline" + type: "integer" + description: "Mobility trends for places of residence." + mode: "nullable" + + graph_paths: + - "mobility_report_transform_csv >> load_mobility_report_to_bq"