From 4b495cc24d7f769b86edfad172e2f7c3fc5d3fd7 Mon Sep 17 00:00:00 2001
From: nlarge-google
Date: Mon, 20 Sep 2021 15:09:33 +0000
Subject: [PATCH 1/5] feat: San Francisco Film Locations

---
 .../_images/run_csv_transform_kub/Dockerfile  |  38 ++++
 .../run_csv_transform_kub/csv_transform.py    | 171 ++++++++++++++++++
 .../run_csv_transform_kub/requirements.txt    |   3 +
 .../_terraform/film_locations_pipeline.tf     |  39 ++++
 .../_terraform/provider.tf                    |  28 +++
 .../san_francisco_film_locations_dataset.tf   |  26 +++
 .../_terraform/variables.tf                   |  23 +++
 .../san_francisco_film_locations/dataset.yaml |  27 +++
 .../film_locations/film_locations_dag.py      | 142 +++++++++++++
 .../film_locations/pipeline.yaml              | 125 +++++++++++++
 10 files changed, 622 insertions(+)
 create mode 100644 datasets/san_francisco_film_locations/_images/run_csv_transform_kub/Dockerfile
 create mode 100644 datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
 create mode 100644 datasets/san_francisco_film_locations/_images/run_csv_transform_kub/requirements.txt
 create mode 100644 datasets/san_francisco_film_locations/_terraform/film_locations_pipeline.tf
 create mode 100644 datasets/san_francisco_film_locations/_terraform/provider.tf
 create mode 100644 datasets/san_francisco_film_locations/_terraform/san_francisco_film_locations_dataset.tf
 create mode 100644 datasets/san_francisco_film_locations/_terraform/variables.tf
 create mode 100644 datasets/san_francisco_film_locations/dataset.yaml
 create mode 100644 datasets/san_francisco_film_locations/film_locations/film_locations_dag.py
 create mode 100644 datasets/san_francisco_film_locations/film_locations/pipeline.yaml

diff --git a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/Dockerfile b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/Dockerfile
new file mode 100644
index 000000000..85af90570
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/Dockerfile
@@ -0,0 +1,38 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The base image for this build
+# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim
+FROM python:3.8
+
+# Allow statements and log messages to appear in Cloud logs
+ENV PYTHONUNBUFFERED True
+
+# Copy the requirements file into the image
+COPY requirements.txt ./
+
+# Install the packages specified in the requirements file
+RUN python3 -m pip install --no-cache-dir -r requirements.txt
+
+# The WORKDIR instruction sets the working directory for any RUN, CMD,
+# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile.
+# If the WORKDIR doesn’t exist, it will be created even if it’s not used in
+# any subsequent Dockerfile instruction
+WORKDIR /custom
+
+# Copy the specific data processing script/s in the image under /custom/*
+COPY ./csv_transform.py .
+
+# Command to run the data processing script when the container is run
+CMD ["python3", "csv_transform.py"]

diff --git a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
new file mode 100644
index 000000000..e1d1216ac
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
@@ -0,0 +1,171 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# import modules
+import datetime
+import logging
+import os
+import pathlib
+import subprocess
+
+import pandas as pd
+import requests
+from google.cloud import storage
+
+
+def main(
+    source_url: str,
+    source_file: pathlib.Path,
+    target_file: pathlib.Path,
+    chunksize: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+) -> None:
+
+    logging.info("San Francisco - Film Locations process started")
+
+    logging.info("Creating 'files' folder")
+    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)
+
+    logging.info(f"Downloading file {source_url}")
+    download_file(source_url, source_file)
+
+    chunksz = int(chunksize)
+
+    logging.info(f"Opening batch file {source_file}")
+    with pd.read_csv(
+        source_file, engine="python", encoding="utf-8", quotechar='"', chunksize=chunksz
+    ) as reader:
+        for chunk_number, chunk in enumerate(reader):
+            logging.info(f"Processing batch {chunk_number}")
+            target_file_batch = str(target_file).replace(
+                ".csv", "-" + str(chunk_number) + ".csv"
+            )
+            df = pd.DataFrame()
+            df = pd.concat([df, chunk])
+            processChunk(df, target_file_batch)
+            logging.info(f"Appending batch {chunk_number} to {target_file}")
+            if chunk_number == 0:
+                subprocess.run(["cp", target_file_batch, target_file])
+            else:
+                subprocess.check_call(f"sed -i '1d' {target_file_batch}", shell=True)
+                subprocess.check_call(
+                    f"cat {target_file_batch} >> {target_file}", shell=True
+                )
+            subprocess.run(["rm", target_file_batch])
+
+    logging.info(
+        f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
+    )
+    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)
+
+    logging.info("San Francisco - Film Locations process completed")
+
+
+def download_file(source_url: str, source_file: pathlib.Path) -> None:
+    r = requests.get(source_url, stream=True)
+    if r.status_code == 200:
+        with open(source_file, "wb") as f:
+            for chunk in r:
+                f.write(chunk)
+    else:
+        logging.error(f"Couldn't download {source_url}: {r.text}")
+
+
+def processChunk(df: pd.DataFrame, target_file_batch: str) -> None:
+
+    logging.info(f"Transformation Process Starting")
+
+    logging.info(f"Renaming Headers")
+    rename_headers(df)
+
+    logging.info("Trimming Whitespace")
+    df["distributor"] = df["distributor"].apply(lambda x: str(x).strip())
+    df["director"] = df["director"].apply(lambda x: str(x).strip())
+    df["actor_2"] = df["actor_2"].apply(lambda x: str(x).strip())
+
+    logging.info("Reordering headers..")
+    df = df[
+        [
+            "title",
+            "release_year",
+            "locations",
+            "fun_facts",
+            "production_company",
+            "distributor",
+            "director",
+            "writer",
+            "actor_1",
+            "actor_2",
+            "actor_3"
+        ]
+    ]
+
+    logging.info(f"Saving to target file.. {target_file_batch}")
+
+    try:
+        save_to_new_file(df, file_path=str(target_file_batch))
+    except Exception as e:
+        logging.error(f"Error saving to target file: {e}.")
+
+    logging.info(f"Saved transformed source data to target file .. {target_file_batch}")
+
+
+def convert_dt_format(dt_str: str) -> str:
+    if not dt_str or dt_str == "nan":
+        return str("")
+    else:
+        return str(datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %H:%M:%S %p").strftime("%Y-%m-%d %H:%M:%S"))
+
+
+def rename_headers(df: pd.DataFrame) -> None:
+    header_names = {
+        "Title": "title",
+        "Release Year": "release_year",
+        "Locations": "locations",
+        "Fun Facts": "fun_facts",
+        "Production Company": "production_company",
+        "Distributor": "distributor",
+        "Director": "director",
+        "Writer": "writer",
+        "Actor 1": "actor_1",
+        "Actor 2": "actor_2",
+        "Actor 3": "actor_3"
+    }
+
+    df = df.rename(columns=header_names, inplace=True)
+
+
+def save_to_new_file(df: pd.DataFrame, file_path) -> None:
+    df.to_csv(file_path, index=False)
+
+
+def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(gcs_bucket)
+    blob = bucket.blob(gcs_path)
+    blob.upload_from_filename(file_path)
+
+
+if __name__ == "__main__":
+    logging.getLogger().setLevel(logging.INFO)
+
+    main(
+        source_url=os.environ["SOURCE_URL"],
+        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
+        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
+        chunksize=os.environ["CHUNKSIZE"],
+        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
+        target_gcs_path=os.environ["TARGET_GCS_PATH"],
+    )

diff --git a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/requirements.txt b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/requirements.txt
new file mode 100644
index 000000000..f36704793
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/requirements.txt
@@ -0,0 +1,3 @@
+requests
+pandas
+google-cloud-storage

diff --git a/datasets/san_francisco_film_locations/_terraform/film_locations_pipeline.tf b/datasets/san_francisco_film_locations/_terraform/film_locations_pipeline.tf
new file mode 100644
index 000000000..11566f97d
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_terraform/film_locations_pipeline.tf
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+resource "google_bigquery_table" "film_locations" {
+  project    = var.project_id
+  dataset_id = "san_francisco_film_locations"
+  table_id   = "film_locations"
+
+  description = "san_francisco_film_locations"
+
+
+
+
+  depends_on = [
+    google_bigquery_dataset.san_francisco_film_locations
+  ]
+}
+
+output "bigquery_table-film_locations-table_id" {
+  value = google_bigquery_table.film_locations.table_id
+}
+
+output "bigquery_table-film_locations-id" {
+  value = google_bigquery_table.film_locations.id
+}

diff --git a/datasets/san_francisco_film_locations/_terraform/provider.tf b/datasets/san_francisco_film_locations/_terraform/provider.tf
new file mode 100644
index 000000000..23ab87dcd
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_terraform/provider.tf
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+provider "google" {
+  project                     = var.project_id
+  impersonate_service_account = var.impersonating_acct
+  region                      = var.region
+}
+
+data "google_client_openid_userinfo" "me" {}
+
+output "impersonating-account" {
+  value = data.google_client_openid_userinfo.me.email
+}

diff --git a/datasets/san_francisco_film_locations/_terraform/san_francisco_film_locations_dataset.tf b/datasets/san_francisco_film_locations/_terraform/san_francisco_film_locations_dataset.tf
new file mode 100644
index 000000000..35ace259d
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_terraform/san_francisco_film_locations_dataset.tf
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+resource "google_bigquery_dataset" "san_francisco_film_locations" {
+  dataset_id  = "san_francisco_film_locations"
+  project     = var.project_id
+  description = "san_francisco_film_locations"
+}
+
+output "bigquery_dataset-san_francisco_film_locations-dataset_id" {
+  value = google_bigquery_dataset.san_francisco_film_locations.dataset_id
+}

diff --git a/datasets/san_francisco_film_locations/_terraform/variables.tf b/datasets/san_francisco_film_locations/_terraform/variables.tf
new file mode 100644
index 000000000..c3ec7c506
--- /dev/null
+++ b/datasets/san_francisco_film_locations/_terraform/variables.tf
@@ -0,0 +1,23 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+variable "project_id" {}
+variable "bucket_name_prefix" {}
+variable "impersonating_acct" {}
+variable "region" {}
+variable "env" {}
+

diff --git a/datasets/san_francisco_film_locations/dataset.yaml b/datasets/san_francisco_film_locations/dataset.yaml
new file mode 100644
index 000000000..c539636d3
--- /dev/null
+++ b/datasets/san_francisco_film_locations/dataset.yaml
@@ -0,0 +1,27 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dataset:
+  name: san_francisco_film_locations
+  friendly_name: ~
+  description: ~
+  dataset_sources: ~
+  terms_of_use: ~
+
+
+resources:
+
+  - type: bigquery_dataset
+    dataset_id: san_francisco_film_locations
+    description: san_francisco_film_locations

diff --git a/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py b/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py
new file mode 100644
index 000000000..e96c686a2
--- /dev/null
+++ b/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py
@@ -0,0 +1,142 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from airflow import DAG
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery
+
+default_args = {
+    "owner": "Google",
+    "depends_on_past": False,
+    "start_date": "2021-03-01",
+}
+
+
+with DAG(
+    dag_id="san_francisco_film_locations.film_locations",
+    default_args=default_args,
+    max_active_runs=1,
+    schedule_interval="@daily",
+    catchup=False,
+    default_view="graph",
+) as dag:
+
+    # Run CSV transform within kubernetes pod
+    transform_csv = kubernetes_pod.KubernetesPodOperator(
+        task_id="transform_csv",
+        name="film_locations",
+        namespace="default",
+        affinity={
+            "nodeAffinity": {
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    "values": ["pool-e2-standard-4"],
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        },
+        image_pull_policy="Always",
+        image="{{ var.json.san_francisco_film_locations.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": "https://data.sfgov.org/api/views/yitu-d5am/rows.csv",
+            "SOURCE_FILE": "files/data.csv",
+            "TARGET_FILE": "files/data_output.csv",
+            "CHUNKSIZE": "1000000",
+            "TARGET_GCS_BUCKET": "{{ var.values.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/san_francisco_film_locations/film_locations/data_output.csv",
+        },
+        resources={"limit_memory": "2G", "limit_cpu": "1"},
+    )
+
+    # Task to load CSV data to a BigQuery table
+    load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
+        task_id="load_to_bq",
+        bucket="{{ var.values.composer_bucket }}",
+        source_objects=[
+            "data/san_francisco_film_locations/film_locations/data_output.csv"
+        ],
+        source_format="CSV",
+        destination_project_dataset_table="san_francisco_film_locations.film_locations",
+        skip_leading_rows=1,
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[
+            {"name": "title", "type": "STRING", "description": "", "mode": "NULLABLE"},
+            {
+                "name": "release_year",
+                "type": "INTEGER",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "locations",
+                "type": "STRING",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "fun_facts",
+                "type": "STRING",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "production_company",
+                "type": "STRING",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "distributor",
+                "type": "STRING",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "director",
+                "type": "STRING",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+            {"name": "writer", "type": "STRING", "description": "", "mode": "NULLABLE"},
+            {
+                "name": "actor_1",
+                "type": "STRING",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "actor_2",
+                "type": "STRING",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "actor_3",
+                "type": "STRING",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+        ],
+    )
+
+    transform_csv >> load_to_bq

diff --git a/datasets/san_francisco_film_locations/film_locations/pipeline.yaml b/datasets/san_francisco_film_locations/film_locations/pipeline.yaml
new file mode 100644
index 000000000..7a18496e3
--- /dev/null
+++ b/datasets/san_francisco_film_locations/film_locations/pipeline.yaml
@@ -0,0 +1,125 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+resources:
+
+  - type: bigquery_table
+    table_id: "film_locations"
+    description: "If you love movies, and you love San Francisco, you're bound to love this -- a listing of filming locations of movies shot in San Francisco starting from 1924. You'll find the titles, locations, fun facts, names of the director, writer, actors, and studio for most of these films."
+
+dag:
+  airflow_version: 2
+  initialize:
+    dag_id: film_locations
+    default_args:
+      owner: "Google"
+      depends_on_past: False
+      start_date: '2021-03-01'
+    max_active_runs: 1
+    schedule_interval: "@daily"
+    catchup: False
+    default_view: graph
+
+  tasks:
+
+    - operator: "KubernetesPodOperator"
+      description: "Run CSV transform within kubernetes pod"
+
+      args:
+
+        task_id: "transform_csv"
+        name: "film_locations"
+        namespace: "default"
+        affinity:
+          nodeAffinity:
+            requiredDuringSchedulingIgnoredDuringExecution:
+              nodeSelectorTerms:
+                - matchExpressions:
+                    - key: cloud.google.com/gke-nodepool
+                      operator: In
+                      values:
+                        - "pool-e2-standard-4"
+        image_pull_policy: "Always"
+        image: "{{ var.json.san_francisco_film_locations.container_registry.run_csv_transform_kub }}"
+        env_vars:
+          SOURCE_URL: "https://data.sfgov.org/api/views/yitu-d5am/rows.csv"
+          SOURCE_FILE: "files/data.csv"
+          TARGET_FILE: "files/data_output.csv"
+          CHUNKSIZE: "1000000"
+          TARGET_GCS_BUCKET: "{{ var.values.composer_bucket }}"
+          TARGET_GCS_PATH: "data/san_francisco_film_locations/film_locations/data_output.csv"
+        resources:
+          limit_memory: "2G"
+          limit_cpu: "1"
+
+    - operator: "GoogleCloudStorageToBigQueryOperator"
+      description: "Task to load CSV data to a BigQuery table"
+
+      args:
+        task_id: "load_to_bq"
+        bucket: "{{ var.values.composer_bucket }}"
+        source_objects: ["data/san_francisco_film_locations/film_locations/data_output.csv"]
+        source_format: "CSV"
+        destination_project_dataset_table: "san_francisco_film_locations.film_locations"
+        skip_leading_rows: 1
+        write_disposition: "WRITE_TRUNCATE"
+        schema_fields:
+          - "name": "title"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "release_year"
+            "type": "INTEGER"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "locations"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "fun_facts"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "production_company"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "distributor"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "director"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "writer"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "actor_1"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "actor_2"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "actor_3"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+
+  graph_paths:
+    - "transform_csv >> load_to_bq"

From 03e37442ba888e128824ada74ddf600d6ec478dd Mon Sep 17 00:00:00 2001
From: nlarge-google
Date: Tue, 21 Sep 2021 17:08:22 +0000
Subject: [PATCH 2/5] fix: resolved BQ write error by
 adding 'allow_quoted_newlines' to pipeline.yaml

---
 .../film_locations/film_locations_dag.py | 5 +++--
 .../film_locations/pipeline.yaml         | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py b/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py
index e96c686a2..52360ab96 100644
--- a/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py
+++ b/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py
@@ -62,7 +62,7 @@
             "SOURCE_FILE": "files/data.csv",
             "TARGET_FILE": "files/data_output.csv",
             "CHUNKSIZE": "1000000",
-            "TARGET_GCS_BUCKET": "{{ var.values.composer_bucket }}",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
             "TARGET_GCS_PATH": "data/san_francisco_film_locations/film_locations/data_output.csv",
         },
         resources={"limit_memory": "2G", "limit_cpu": "1"},
     )
@@ -71,13 +71,14 @@
     # Task to load CSV data to a BigQuery table
     load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
         task_id="load_to_bq",
-        bucket="{{ var.values.composer_bucket }}",
+        bucket="{{ var.value.composer_bucket }}",
         source_objects=[
             "data/san_francisco_film_locations/film_locations/data_output.csv"
         ],
         source_format="CSV",
         destination_project_dataset_table="san_francisco_film_locations.film_locations",
         skip_leading_rows=1,
+        allow_quoted_newlines=True,
         write_disposition="WRITE_TRUNCATE",
         schema_fields=[
             {"name": "title", "type": "STRING", "description": "", "mode": "NULLABLE"},

diff --git a/datasets/san_francisco_film_locations/film_locations/pipeline.yaml b/datasets/san_francisco_film_locations/film_locations/pipeline.yaml
index 7a18496e3..5a3a23996 100644
--- a/datasets/san_francisco_film_locations/film_locations/pipeline.yaml
+++ b/datasets/san_francisco_film_locations/film_locations/pipeline.yaml
@@ -58,7 +58,7 @@ dag:
           SOURCE_FILE: "files/data.csv"
           TARGET_FILE: "files/data_output.csv"
           CHUNKSIZE: "1000000"
-          TARGET_GCS_BUCKET: "{{ var.values.composer_bucket }}"
+          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
           TARGET_GCS_PATH: "data/san_francisco_film_locations/film_locations/data_output.csv"
         resources:
           limit_memory: "2G"
@@ -69,11 +69,12 @@ dag:
 
       args:
         task_id: "load_to_bq"
-        bucket: "{{ var.values.composer_bucket }}"
+        bucket: "{{ var.value.composer_bucket }}"
        source_objects: ["data/san_francisco_film_locations/film_locations/data_output.csv"]
         source_format: "CSV"
         destination_project_dataset_table: "san_francisco_film_locations.film_locations"
         skip_leading_rows: 1
+        allow_quoted_newlines: True
         write_disposition: "WRITE_TRUNCATE"
         schema_fields:
           - "name": "title"

From f09b39d21391be4669a89120e115801cc4846e2b Mon Sep 17 00:00:00 2001
From: nlarge-google
Date: Tue, 21 Sep 2021 17:36:33 +0000
Subject: [PATCH 3/5] fix: resolved flake8 issues

---
 .../_images/run_csv_transform_kub/csv_transform.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
index e1d1216ac..12acb929a 100644
--- a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
+++ b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
@@ -85,9 +85,9 @@
 
 def processChunk(df: pd.DataFrame, target_file_batch: str) -> None:
 
-    logging.info(f"Transformation Process Starting")
+    logging.info("Transformation Process Starting")
 
-    logging.info(f"Renaming Headers")
+    logging.info("Renaming Headers")
     rename_headers(df)
 
     logging.info("Trimming Whitespace")
@@ -108,7 +108,7 @@ def processChunk(df: pd.DataFrame, target_file_batch: str) -> None:
             "writer",
             "actor_1",
             "actor_2",
-            "actor_3"
+            "actor_3",
         ]
     ]
 
@@ -126,7 +126,11 @@ def convert_dt_format(dt_str: str) -> str:
     if not dt_str or dt_str == "nan":
         return str("")
     else:
-        return str(datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %H:%M:%S %p").strftime("%Y-%m-%d %H:%M:%S"))
+        return str(
+            datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %H:%M:%S %p").strftime(
+                "%Y-%m-%d %H:%M:%S"
+            )
+        )
 
 
 def rename_headers(df: pd.DataFrame) -> None:
@@ -141,7 +145,7 @@ def rename_headers(df: pd.DataFrame) -> None:
         "Writer": "writer",
         "Actor 1": "actor_1",
         "Actor 2": "actor_2",
-        "Actor 3": "actor_3"
+        "Actor 3": "actor_3",
     }
 
     df = df.rename(columns=header_names, inplace=True)

From 8b2186a6129380d4f1871d1d1ceb9d619bd041a7 Mon Sep 17 00:00:00 2001
From: nlarge-google
Date: Fri, 8 Oct 2021 16:58:42 +0000
Subject: [PATCH 4/5] fix: improved chunking format

---
 .../run_csv_transform_kub/csv_transform.py | 130 +++++++++---------
 .../film_locations/film_locations_dag.py   |   4 +-
 .../film_locations/pipeline.yaml           |   6 +-
 3 files changed, 68 insertions(+), 72 deletions(-)

diff --git a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
index 12acb929a..2be1bcd75 100644
--- a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
+++ b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
@@ -12,12 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# import modules
-import datetime
 import logging
 import os
 import pathlib
-import subprocess
 
 import pandas as pd
 import requests
@@ -35,17 +32,17 @@ def main(
 
     logging.info("San Francisco - Film Locations process started")
 
-    logging.info("Creating 'files' folder")
     pathlib.Path("./files").mkdir(parents=True, exist_ok=True)
-
-    logging.info(f"Downloading file {source_url}")
     download_file(source_url, source_file)
 
     chunksz = int(chunksize)
 
-    logging.info(f"Opening batch file {source_file}")
     with pd.read_csv(
-        source_file, engine="python", encoding="utf-8", quotechar='"', chunksize=chunksz
+        source_file,
+        engine="python",
+        encoding="utf-8",
+        quotechar='"',
+        chunksize=chunksz
     ) as reader:
         for chunk_number, chunk in enumerate(reader):
             logging.info(f"Processing batch {chunk_number}")
@@ -52,22 +49,10 @@ def main(
             target_file_batch = str(target_file).replace(
                 ".csv", "-" + str(chunk_number) + ".csv"
             )
             df = pd.DataFrame()
             df = pd.concat([df, chunk])
-            processChunk(df, target_file_batch)
-            logging.info(f"Appending batch {chunk_number} to {target_file}")
-            if chunk_number == 0:
-                subprocess.run(["cp", target_file_batch, target_file])
-            else:
-                subprocess.check_call(f"sed -i '1d' {target_file_batch}", shell=True)
-                subprocess.check_call(
-                    f"cat {target_file_batch} >> {target_file}", shell=True
-                )
-            subprocess.run(["rm", target_file_batch])
-
-    logging.info(
-        f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
-    )
+            process_chunk(df, target_file_batch, target_file, (not chunk_number == 0))
+
     upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)
 
     logging.info("San Francisco - Film Locations process completed")
@@ -75,26 +60,52 @@ def main(
 
 def download_file(source_url: str, source_file: pathlib.Path) -> None:
     r = requests.get(source_url, stream=True)
-    if r.status_code == 200:
-        with open(source_file, "wb") as f:
-            for chunk in r:
-                f.write(chunk)
-    else:
-        logging.error(f"Couldn't download {source_url}: {r.text}")
+    with open(source_file, "wb") as f:
+        for chunk in r:
+            f.write(chunk)
 
 
-def processChunk(df: pd.DataFrame, target_file_batch: str) -> None:
+def process_chunk(
+    df: pd.DataFrame, target_file_batch: str, target_file: str, skip_header: bool
+) -> None:
+    df = rename_headers(df)
+    df = trim_whitespace(df)
+    df = reorder_headers(df)
+    save_to_new_file(df, file_path=str(target_file_batch))
+    append_batch_file(target_file_batch, target_file, skip_header, not(skip_header))
 
-    logging.info("Transformation Process Starting")
 
+def rename_headers(df: pd.DataFrame) -> None:
     logging.info("Renaming Headers")
-    rename_headers(df)
+    header_names = {
+        "Title": "title",
+        "Release Year": "release_year",
+        "Locations": "locations",
+        "Fun Facts": "fun_facts",
+        "Production Company": "production_company",
+        "Distributor": "distributor",
+        "Director": "director",
+        "Writer": "writer",
+        "Actor 1": "actor_1",
+        "Actor 2": "actor_2",
+        "Actor 3": "actor_3",
+    }
+
+    df = df.rename(columns=header_names)
+    return df
+
+
+def trim_whitespace(df: pd.DataFrame) -> None:
     logging.info("Trimming Whitespace")
     df["distributor"] = df["distributor"].apply(lambda x: str(x).strip())
     df["director"] = df["director"].apply(lambda x: str(x).strip())
     df["actor_2"] = df["actor_2"].apply(lambda x: str(x).strip())
+    return df
+
+
+def reorder_headers(df: pd.DataFrame) -> None:
     logging.info("Reordering headers..")
     df = df[
         [
@@ -112,47 +123,32 @@ def processChunk(df: pd.DataFrame, target_file_batch: str) -> None:
         ]
     ]
 
-    logging.info(f"Saving to target file.. {target_file_batch}")
+    return df
 
-    try:
-        save_to_new_file(df, file_path=str(target_file_batch))
-    except Exception as e:
-        logging.error(f"Error saving to target file: {e}.")
+
+def save_to_new_file(df: pd.DataFrame, file_path) -> None:
+    df.to_csv(file_path, index=False)
 
-    logging.info(f"Saved transformed source data to target file .. {target_file_batch}")
+
+def append_batch_file(
+    batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool
+) -> None:
+    data_file = open(batch_file_path, "r")
+    if truncate_file:
+        target_file = open(target_file_path, "w+").close()
+    target_file = open(target_file_path, "a+")
+    if skip_header:
+        logging.info(
+            f"Appending batch file {batch_file_path} to {target_file_path} with skip header"
+        )
-
-
-def convert_dt_format(dt_str: str) -> str:
-    if not dt_str or dt_str == "nan":
-        return str("")
-    else:
-        return str(
-            datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %H:%M:%S %p").strftime(
-                "%Y-%m-%d %H:%M:%S"
-            )
-        )
+        next(data_file)
+    else:
+        logging.info(f"Appending batch file {batch_file_path} to {target_file_path}")
-
-
-def rename_headers(df: pd.DataFrame) -> None:
-    header_names = {
-        "Title": "title",
-        "Release Year": "release_year",
-        "Locations": "locations",
-        "Fun Facts": "fun_facts",
-        "Production Company": "production_company",
-        "Distributor": "distributor",
-        "Director": "director",
-        "Writer": "writer",
-        "Actor 1": "actor_1",
-        "Actor 2": "actor_2",
-        "Actor 3": "actor_3",
-    }
-
-    df = df.rename(columns=header_names, inplace=True)
-
-
-def save_to_new_file(df: pd.DataFrame, file_path) -> None:
-    df.to_csv(file_path, index=False)
+    target_file.write(data_file.read())
+    data_file.close()
+    target_file.close()
+    if os.path.exists(batch_file_path):
+        os.remove(batch_file_path)
 
 
 def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:

diff --git a/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py b/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py
index 52360ab96..664ca24b3 100644
--- a/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py
+++ b/datasets/san_francisco_film_locations/film_locations/film_locations_dag.py
@@ -61,11 +61,11 @@
             "SOURCE_URL": "https://data.sfgov.org/api/views/yitu-d5am/rows.csv",
             "SOURCE_FILE": "files/data.csv",
             "TARGET_FILE": "files/data_output.csv",
-            "CHUNKSIZE": "1000000",
+            "CHUNKSIZE": "750000",
             "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
             "TARGET_GCS_PATH": "data/san_francisco_film_locations/film_locations/data_output.csv",
         },
-        resources={"limit_memory": "2G", "limit_cpu": "1"},
+        resources={"limit_memory": "8G", "limit_cpu": "3"},
     )
 
     # Task to load CSV data to a BigQuery table

diff --git a/datasets/san_francisco_film_locations/film_locations/pipeline.yaml b/datasets/san_francisco_film_locations/film_locations/pipeline.yaml
index 5a3a23996..d77b4829e 100644
--- a/datasets/san_francisco_film_locations/film_locations/pipeline.yaml
+++ b/datasets/san_francisco_film_locations/film_locations/pipeline.yaml
@@ -57,12 +57,12 @@ dag:
           SOURCE_URL: "https://data.sfgov.org/api/views/yitu-d5am/rows.csv"
           SOURCE_FILE: "files/data.csv"
           TARGET_FILE: "files/data_output.csv"
-          CHUNKSIZE: "1000000"
+          CHUNKSIZE: "750000"
           TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
           TARGET_GCS_PATH: "data/san_francisco_film_locations/film_locations/data_output.csv"
         resources:
-          limit_memory: "2G"
-          limit_cpu: "1"
+          limit_memory: "8G"
+          limit_cpu: "3"
 
     - operator: "GoogleCloudStorageToBigQueryOperator"
       description: "Task to load CSV data to a BigQuery table"

From 95dae9801c8dcdc85f47b9edb3a251359b2c86f3 Mon Sep 17 00:00:00 2001
From: nlarge-google
Date: Fri, 8 Oct 2021 17:54:00 +0000
Subject: [PATCH 5/5] fix: Resolved pre-commit issues

---
 .../_images/run_csv_transform_kub/csv_transform.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
index 2be1bcd75..5b2ad9057 100644
--- a/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
+++ b/datasets/san_francisco_film_locations/_images/run_csv_transform_kub/csv_transform.py
@@ -38,11 +38,7 @@ def main(
     chunksz = int(chunksize)
 
     with pd.read_csv(
-        source_file,
-        engine="python",
-        encoding="utf-8",
-        quotechar='"',
-        chunksize=chunksz
+        source_file, engine="python", encoding="utf-8", quotechar='"', chunksize=chunksz
     ) as reader:
         for chunk_number, chunk in enumerate(reader):
             logging.info(f"Processing batch {chunk_number}")
@@ -72,7 +68,7 @@ def process_chunk(
     df = trim_whitespace(df)
     df = reorder_headers(df)
     save_to_new_file(df, file_path=str(target_file_batch))
-    append_batch_file(target_file_batch, target_file, skip_header, not(skip_header))
+    append_batch_file(target_file_batch, target_file, skip_header, not (skip_header))
 
 
 def rename_headers(df: pd.DataFrame) -> None:
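
Editorial addendum (not part of the patch series): the chunked transform introduced in PATCH 4 works by writing each pandas chunk to its own batch CSV and appending the batches to a single target file, dropping the header row of every batch after the first. The sketch below is a minimal, self-contained illustration of that append technique, runnable without the Kubernetes image. The file names, the chunk size of 1, and the two-row sample frame are hypothetical stand-ins chosen for the demonstration, not values from the patches, and the helper is a simplified variant of the patch's append_batch_file (it folds the truncate_file flag into the file-open mode). It assumes pandas 1.2+, where pd.read_csv with chunksize can be used as a context manager, as the patched code itself does.

import os

import pandas as pd


def append_batch_file(batch_path: str, target_path: str, skip_header: bool) -> None:
    # The first batch truncates the target and keeps its header; every later
    # batch skips its header line, so the combined CSV has one header row.
    with open(batch_path) as batch:
        if skip_header:
            next(batch)
        with open(target_path, "a" if skip_header else "w") as target:
            target.write(batch.read())
    os.remove(batch_path)  # batch files are temporary, as in the patch


if __name__ == "__main__":
    # Hypothetical two-row sample standing in for the SF film-locations CSV.
    pd.DataFrame(
        {"title": ["Vertigo", "Bullitt"], "release_year": [1958, 1968]}
    ).to_csv("sample.csv", index=False)
    with pd.read_csv("sample.csv", chunksize=1) as reader:
        for chunk_number, chunk in enumerate(reader):
            batch_path = f"data_output-{chunk_number}.csv"
            chunk.to_csv(batch_path, index=False)  # each batch carries a header
            append_batch_file(batch_path, "data_output.csv", chunk_number != 0)

Run directly, this produces a data_output.csv with a single header row followed by both data rows — the invariant the BigQuery load task relies on, since skip_leading_rows: 1 assumes exactly one header line in the uploaded file.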