From 2284e09d7c775d3d9826037f0c8cebd572090f8d Mon Sep 17 00:00:00 2001 From: Nicholas Large <84149918+nlarge-google@users.noreply.github.com> Date: Wed, 13 Oct 2021 09:44:32 -0500 Subject: [PATCH] feat: Onboard San francisco film locations (#190) --- .../_images/run_csv_transform_kub/Dockerfile | 38 ++++ .../run_csv_transform_kub/csv_transform.py | 167 ++++++++++++++++++ .../run_csv_transform_kub/requirements.txt | 3 + .../_terraform/film_locations_pipeline.tf | 39 ++++ .../_terraform/provider.tf | 28 +++ .../san_francisco_film_locations_dataset.tf | 26 +++ .../_terraform/variables.tf | 23 +++ .../san_francisco_film_locations/dataset.yaml | 27 +++ .../film_locations/film_locations_dag.py | 143 +++++++++++++++ .../film_locations/pipeline.yaml | 126 +++++++++++++ 10 files changed, 620 insertions(+) create mode 100644 datasets/san_francisco_film_locations/_images/run_csv_transform_kub/Dockerfile create mode 100644 datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py create mode 100644 datasets/san_francisco_film_locations/_images/run_csv_transform_kub/requirements.txt create mode 100644 datasets/san_francisco_film_locations/_terraform/film_locations_pipeline.tf create mode 100644 datasets/san_francisco_film_locations/_terraform/provider.tf create mode 100644 datasets/san_francisco_film_locations/_terraform/san_francisco_film_locations_dataset.tf create mode 100644 datasets/san_francisco_film_locations/_terraform/variables.tf create mode 100644 datasets/san_francisco_film_locations/dataset.yaml create mode 100644 datasets/san_francisco_film_locations/film_locations/film_locations_dag.py create mode 100644 datasets/san_francisco_film_locations/film_locations/pipeline.yaml diff --git a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/Dockerfile b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..85af90570 --- /dev/null +++ b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,38 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . 
+
+# Command to run the data processing script when the container is run
+CMD ["python3", "csv_transform.py"]
diff --git a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
new file mode 100644
index 000000000..5b2ad9057
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
@@ -0,0 +1,167 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import pathlib
+
+import pandas as pd
+import requests
+from google.cloud import storage
+
+
+def main(
+    source_url: str,
+    source_file: pathlib.Path,
+    target_file: pathlib.Path,
+    chunksize: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+) -> None:
+
+    logging.info("San Francisco - Film Locations process started")
+
+    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)
+    download_file(source_url, source_file)
+
+    chunksz = int(chunksize)
+
+    with pd.read_csv(
+        source_file, engine="python", encoding="utf-8", quotechar='"', chunksize=chunksz
+    ) as reader:
+        for chunk_number, chunk in enumerate(reader):
+            logging.info(f"Processing batch {chunk_number}")
+            target_file_batch = str(target_file).replace(
+                ".csv", "-" + str(chunk_number) + ".csv"
+            )
+            # Work on a copy of the current chunk
+            df = chunk.copy()
+            process_chunk(df, target_file_batch, target_file, (chunk_number != 0))
+
+    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)
+
+    logging.info("San Francisco - Film Locations process completed")
+
+
+def download_file(source_url: str, source_file: pathlib.Path) -> None:
+    r = requests.get(source_url, stream=True)
+    with open(source_file, "wb") as f:
+        for chunk in r:
+            f.write(chunk)
+
+
+def process_chunk(
+    df: pd.DataFrame, target_file_batch: str, target_file: str, skip_header: bool
+) -> None:
+    df = rename_headers(df)
+    df = trim_whitespace(df)
+    df = reorder_headers(df)
+    save_to_new_file(df, file_path=str(target_file_batch))
+    append_batch_file(target_file_batch, target_file, skip_header, not skip_header)
+
+
+def rename_headers(df: pd.DataFrame) -> pd.DataFrame:
+    logging.info("Renaming Headers")
+    header_names = {
+        "Title": "title",
+        "Release Year": "release_year",
+        "Locations": "locations",
+        "Fun Facts": "fun_facts",
+        "Production Company": "production_company",
+        "Distributor": "distributor",
+        "Director": "director",
+        "Writer": "writer",
+        "Actor 1": "actor_1",
+        "Actor 2": "actor_2",
+        "Actor 3": "actor_3",
+    }
+
+    df = df.rename(columns=header_names)
+
+    return df
+
+
+def trim_whitespace(df: pd.DataFrame) -> pd.DataFrame:
+    logging.info("Trimming Whitespace")
+    df["distributor"] = df["distributor"].apply(lambda x: str(x).strip())
+    df["director"] = df["director"].apply(lambda x: str(x).strip())
+    df["actor_2"] = df["actor_2"].apply(lambda x: str(x).strip())
+
+    return df
+
+
+def reorder_headers(df: pd.DataFrame) -> pd.DataFrame:
+    logging.info("Reordering headers")
+    df = df[
+        [
+            "title",
+            "release_year",
+            "locations",
+            "fun_facts",
+            "production_company",
+            "distributor",
+            "director",
+            "writer",
+            "actor_1",
+            "actor_2",
+            "actor_3",
+        ]
+    ]
+
+    return df
+
+
+def save_to_new_file(df: pd.DataFrame, file_path: str) -> None:
+    df.to_csv(file_path, index=False)
+
+
+def append_batch_file(
+    batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool
+) -> None:
+    data_file = open(batch_file_path, "r")
+    if truncate_file:
+        open(target_file_path, "w+").close()
+    target_file = open(target_file_path, "a+")
+    if skip_header:
+        logging.info(
+            f"Appending batch file {batch_file_path} to {target_file_path} with skip header"
+        )
+        next(data_file)
+    else:
+        logging.info(f"Appending batch file {batch_file_path} to {target_file_path}")
+    target_file.write(data_file.read())
+    data_file.close()
+    target_file.close()
+    if os.path.exists(batch_file_path):
+        os.remove(batch_file_path)
+
+
+def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(gcs_bucket)
+    blob = bucket.blob(gcs_path)
+    blob.upload_from_filename(file_path)
+
+
+if __name__ == "__main__":
+    logging.getLogger().setLevel(logging.INFO)
+
+    main(
+        source_url=os.environ["SOURCE_URL"],
+        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
+        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
+        chunksize=os.environ["CHUNKSIZE"],
+        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
+        target_gcs_path=os.environ["TARGET_GCS_PATH"],
+    )
diff --git a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/requirements.txt b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/requirements.txt
new file mode 100644
index 000000000..f36704793
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/requirements.txt
@@ -0,0 +1,3 @@
+requests
+pandas
+google-cloud-storage
diff --git a/datasets/san_francisco_film_locations/_terraform/film_locations_pipeline.tf b/datasets/san_francisco_film_locations/_terraform/film_locations_pipeline.tf
new file mode 100644
index 000000000..11566f97d
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_terraform/film_locations_pipeline.tf
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+resource "google_bigquery_table" "film_locations" {
+  project    = var.project_id
+  dataset_id = "san_francisco_film_locations"
+  table_id   = "film_locations"
+
+  description = "san_francisco_film_locations"
+
+
+
+
+  depends_on = [
+    google_bigquery_dataset.san_francisco_film_locations
+  ]
+}
+
+output "bigquery_table-film_locations-table_id" {
+  value = google_bigquery_table.film_locations.table_id
+}
+
+output "bigquery_table-film_locations-id" {
+  value = google_bigquery_table.film_locations.id
+}
diff --git a/datasets/san_francisco_film_locations/_terraform/provider.tf b/datasets/san_francisco_film_locations/_terraform/provider.tf
new file mode 100644
index 000000000..23ab87dcd
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_terraform/provider.tf
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+provider "google" {
+  project                     = var.project_id
+  impersonate_service_account = var.impersonating_acct
+  region                      = var.region
+}
+
+data "google_client_openid_userinfo" "me" {}
+
+output "impersonating-account" {
+  value = data.google_client_openid_userinfo.me.email
+}
diff --git a/datasets/san_francisco_film_locations/_terraform/san_francisco_film_locations_dataset.tf b/datasets/san_francisco_film_locations/_terraform/san_francisco_film_locations_dataset.tf
new file mode 100644
index 000000000..35ace259d
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_terraform/san_francisco_film_locations_dataset.tf
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+resource "google_bigquery_dataset" "san_francisco_film_locations" {
+  dataset_id  = "san_francisco_film_locations"
+  project     = var.project_id
+  description = "san_francisco_film_locations"
+}
+
+output "bigquery_dataset-san_francisco_film_locations-dataset_id" {
+  value = google_bigquery_dataset.san_francisco_film_locations.dataset_id
+}
diff --git a/datasets/san_francisco_film_locations/_terraform/variables.tf b/datasets/san_francisco_film_locations/_terraform/variables.tf
new file mode 100644
index 000000000..c3ec7c506
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_terraform/variables.tf
@@ -0,0 +1,23 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/san_francisco_film_locations/dataset.yaml b/datasets/san_francisco_film_locations/dataset.yaml new file mode 100644 index 000000000..c539636d3 --- /dev/null +++ b/datasets/san_francisco_film_locations/dataset.yaml @@ -0,0 +1,27 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + name: san_francisco_film_locations + friendly_name: ~ + description: ~ + dataset_sources: ~ + terms_of_use: ~ + + +resources: + + - type: bigquery_dataset + dataset_id: san_francisco_film_locations + description: san_francisco_film_locations diff --git a/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py b/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py new file mode 100644 index 000000000..664ca24b3 --- /dev/null +++ b/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py @@ -0,0 +1,143 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.transfers import gcs_to_bigquery + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="san_francisco_film_locations.film_locations", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="transform_csv", + name="film_locations", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.san_francisco_film_locations.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://data.sfgov.org/api/views/yitu-d5am/rows.csv", + "SOURCE_FILE": "files/data.csv", + "TARGET_FILE": "files/data_output.csv", + "CHUNKSIZE": "750000", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco_film_locations/film_locations/data_output.csv", + }, + resources={"limit_memory": "8G", "limit_cpu": "3"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=[ + "data/san_francisco_film_locations/film_locations/data_output.csv" + ], + source_format="CSV", + destination_project_dataset_table="san_francisco_film_locations.film_locations", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "title", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "release_year", + "type": "INTEGER", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "locations", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "fun_facts", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "production_company", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "distributor", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "director", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + {"name": "writer", "type": "STRING", "description": "", "mode": "NULLABLE"}, + { + "name": "actor_1", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "actor_2", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "actor_3", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/san_francisco_film_locations/film_locations/pipeline.yaml b/datasets/san_francisco_film_locations/film_locations/pipeline.yaml new file mode 100644 index 000000000..d77b4829e --- /dev/null +++ b/datasets/san_francisco_film_locations/film_locations/pipeline.yaml @@ -0,0 +1,126 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: "film_locations" + description: "If you love movies, and you love San Francisco, you're bound to love this -- a listing of filming locations of movies shot in San Francisco starting from 1924. You'll find the titles, locations, fun facts, names of the director, writer, actors, and studio for most of these films." + +dag: + airflow_version: 2 + initialize: + dag_id: film_locations + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "transform_csv" + name: "film_locations" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco_film_locations.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "https://data.sfgov.org/api/views/yitu-d5am/rows.csv" + SOURCE_FILE: "files/data.csv" + TARGET_FILE: "files/data_output.csv" + CHUNKSIZE: "750000" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco_film_locations/film_locations/data_output.csv" + resources: + limit_memory: "8G" + limit_cpu: "3" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/san_francisco_film_locations/film_locations/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "san_francisco_film_locations.film_locations" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - "name": "title" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "release_year" + "type": "INTEGER" + "description": "" + "mode": "NULLABLE" + - "name": "locations" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "fun_facts" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "production_company" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "distributor" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "director" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "writer" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "actor_1" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "actor_2" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "actor_3" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq"