From 724c82469310859d16463fd05c9aee02205ed5d3 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Sat, 11 Sep 2021 04:44:47 +0000 Subject: [PATCH 01/12] feat: san-francisco 311 Service Requests - Untested do not use --- .../311_service_requests/pipeline.yaml | 157 ++++++++++++++++++ .../Dockerfile | 38 +++++ .../csv_transform.py | 150 +++++++++++++++++ .../requirements.txt | 3 + .../311_service_requests_pipeline.tf | 39 +++++ .../_terraform/provider.tf | 28 ++++ ..._francisco_311_service_requests_dataset.tf | 26 +++ .../_terraform/variables.tf | 23 +++ .../dataset.yaml | 27 +++ 9 files changed, 491 insertions(+) create mode 100644 datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml create mode 100644 datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/Dockerfile create mode 100644 datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py create mode 100644 datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/requirements.txt create mode 100644 datasets/san_francisco_311_service_requests/_terraform/311_service_requests_pipeline.tf create mode 100644 datasets/san_francisco_311_service_requests/_terraform/provider.tf create mode 100644 datasets/san_francisco_311_service_requests/_terraform/san_francisco_311_service_requests_dataset.tf create mode 100644 datasets/san_francisco_311_service_requests/_terraform/variables.tf create mode 100644 datasets/san_francisco_311_service_requests/dataset.yaml diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml new file mode 100644 index 000000000..893dba01e --- /dev/null +++ b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml @@ -0,0 +1,157 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--- +resources: + + - type: bigquery_table + table_id: "311_service_requests" + description: "san_francisco_311_service_requests" + +dag: + airflow_version: 2 + initialize: + dag_id: 311_service_requests + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "transform_csv" + name: "311_service_requests" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco_311_service_requests.container_registry.run_csv_transform_kub_311_service_requests }}" + env_vars: + SOURCE_URL: "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv" + SOURCE_FILE: "files/data.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.values.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco_311_service_requests/311_service_requests/data_output.csv" + resources: + limit_memory: "2G" + limit_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_to_bq" + bucket: "{{ var.values.composer_bucket }}" + source_objects: ["data/san_francisco_311_service_requests/311_service_requests/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "san_francisco_311_service_requests.311_service_requests" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - name: "unique_key" + type: "INTEGER" + description: "Unique case id" + mode: "REQUIRED" + - name: "created_date" + type: "TIMESTAMP" + description: "The date and time when the service request was made" + mode: "NULLABLE" + - name: "closed_date" + type: "TIMESTAMP" + description: "The date and time when the service request was closed" + mode: "NULLABLE" + - name: "resolution_action_updated_date" + type: "TIMESTAMP" + description: "The date and time when the service request was last modified. For requests with status=closed, this will be the date the request was closed" + mode: "NULLABLE" + - name: "status" + type: "STRING" + description: "The current status of the service request." + mode: "NULLABLE" + - name: "status_notes" + type: "STRING" + description: "Explanation of why status was changed to current state or more details on current status than conveyed with status alone" + mode: "NULLABLE" + - name: "agency_name" + type: "STRING" + description: "The agency responsible for fulfilling or otherwise addressing the service request." 
+ mode: "NULLABLE" + - name: "category" + type: "STRING" + description: "The Human readable name of the specific service request type (service_name)" + mode: "NULLABLE" + - name: "complaint_type" + type: "STRING" + description: "More specific description of the problem related to the Category" + mode: "NULLABLE" + - name: "descriptor" + type: "STRING" + description: "More specific description of the problem related to the Request Type" + mode: "NULLABLE" + - name: "incident_address" + type: "STRING" + description: "Human readable address or description of location" + mode: "NULLABLE" + - name: "supervisor_district" + type: "INTEGER" + description: "" + mode: "NULLABLE" + - name: "neighborhood" + type: "STRING" + description: "" + mode: "NULLABLE" + - name: "location" + type: "STRING" + description: "Latitude and longitude using the (WGS84) projection." + mode: "NULLABLE" + - name: "source" + type: "STRING" + description: "How the service request was made" + mode: "NULLABLE" + - name: "media_url" + type: "STRING" + description: "Website URL" + mode: "NULLABLE" + - name: "latitude" + type: "FLOAT" + description: "Latitude using the (WGS84) projection." + mode: "NULLABLE" + - name: "longitude" + type: "FLOAT" + description: "Longitude using the (WGS84) projection." + mode: "NULLABLE" + - name: "police_district" + type: "STRING" + description: "" + mode: "NULLABLE" + + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/Dockerfile b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/Dockerfile new file mode 100644 index 000000000..85af90570 --- /dev/null +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/Dockerfile @@ -0,0 +1,38 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . 
+ +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py new file mode 100644 index 000000000..c472537a0 --- /dev/null +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py @@ -0,0 +1,150 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# import modules +import datetime +import logging +import os +import pathlib + +import pdb +import pandas as pd +import requests +from google.cloud import storage + + +def main( + source_url: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + target_gcs_bucket: str, + target_gcs_path: str, +) -> None: + + logging.info(f"San Francisco - 311 Service Requests process started") + + logging.info("creating 'files' folder") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + + logging.info(f"downloading file {source_url}") + download_file(source_url, source_file) + + logging.info(f"Opening file {source_file}") + df = pd.read_csv(source_file) + + logging.info(f"Transformation Process Starting.. {source_file}") + + logging.info(f"Transform: Renaming Headers.. {source_file}") + rename_headers(df) + + df = df[ df["unique_key"] != ""] + + df["incident_address"] = df["incident_address"].strip() + + pdb.set_trace() + + df["latitude"].str.replace("(", "").replace(")", "") + df["longitude"].str.replace("(", "").replace(")", "") + + df["created_date"] = df["created_date"].apply(convert_dt_format) + + logging.info("Transform: Reordering headers..") + df = df[ + [ + "field1", + "field2", + ] + ] + + logging.info(f"Transformation Process complete .. {source_file}") + + logging.info(f"Saving to output file.. {target_file}") + + try: + save_to_new_file(df, file_path=str(target_file), index=False) + except Exception as e: + logging.error(f"Error saving output file: {e}.") + + logging.info( + f"Uploading output file to.. 
gs://{target_gcs_bucket}/{target_gcs_path}" + ) + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) + + logging.info(f"San Francisco - 311 Service Requests process completed") + + +def convert_dt_format(dt_str: str) -> str: + if dt_str is None or len(str(dt_str)) == 0 or str(dt_str) == "nan": + return str(dt_str) + else: + return str(datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %H:%M:%S %p").strftime("%Y-%m-%d %H:%M:%S")) + + +def rename_headers(df: pd.DataFrame) -> None: + header_names = { + "CaseID": "unique_key", + "Opened": "created_date", + "Closed": "closed_date", + "Updated": "solution_action_updated_date", + "Status": "status", + "Status Notes": "status_notes", + "Responsible Agency": "agency_name", + "Category": "category", + "Request Type": "complaint_type", + "Request Details": "descriptor", + "Address": "incident_address", + "Supervisor District": "supervisor_district", + "Neighborhood": "neighborhood", + "Point": "location", + "Source": "source", + "Media URL": "media_url", + "Latitude": "latitude", + "Longitude": "longitude", + "Police District": "police_district", + } + + df = df.rename(columns=header_names, inplace=True) + + +def save_to_new_file(df: pd.DataFrame, file_path) -> None: + df.to_csv(file_path) + + +def download_file(source_url: str, source_file: pathlib.Path) -> None: + r = requests.get(source_url, stream=True) + if r.status_code == 200: + with open(source_file, "wb") as f: + for chunk in r: + f.write(chunk) + else: + logging.error(f"Couldn't download {source_url}: {r.text}") + + +def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: + storage_client = storage.Client() + bucket = storage_client.bucket(gcs_bucket) + blob = bucket.blob(gcs_path) + blob.upload_from_filename(file_path) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_url=os.environ["SOURCE_URL"], + source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), + target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + ) diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/requirements.txt b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/requirements.txt new file mode 100644 index 000000000..f36704793 --- /dev/null +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/requirements.txt @@ -0,0 +1,3 @@ +requests +pandas +google-cloud-storage diff --git a/datasets/san_francisco_311_service_requests/_terraform/311_service_requests_pipeline.tf b/datasets/san_francisco_311_service_requests/_terraform/311_service_requests_pipeline.tf new file mode 100644 index 000000000..91e6699a6 --- /dev/null +++ b/datasets/san_francisco_311_service_requests/_terraform/311_service_requests_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "bqt_311_service_requests" { + project = var.project_id + dataset_id = "san_francisco_311_service_requests" + table_id = "311_service_requests" + + description = "san_francisco_311_service_requestsspc" + + + + + depends_on = [ + google_bigquery_dataset.san_francisco_311_service_requests + ] +} + +output "bigquery_table-311_service_requests-table_id" { + value = google_bigquery_table.bqt_311_service_requests.table_id +} + +output "bigquery_table-311_service_requests-id" { + value = google_bigquery_table.bqt_311_service_requests.id +} diff --git a/datasets/san_francisco_311_service_requests/_terraform/provider.tf b/datasets/san_francisco_311_service_requests/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/san_francisco_311_service_requests/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/san_francisco_311_service_requests/_terraform/san_francisco_311_service_requests_dataset.tf b/datasets/san_francisco_311_service_requests/_terraform/san_francisco_311_service_requests_dataset.tf new file mode 100644 index 000000000..fac313b71 --- /dev/null +++ b/datasets/san_francisco_311_service_requests/_terraform/san_francisco_311_service_requests_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_dataset" "san_francisco_311_service_requests" { + dataset_id = "san_francisco_311_service_requests" + project = var.project_id + description = "san_francisco_311_service_requests" +} + +output "bigquery_dataset-san_francisco_311_service_requests-dataset_id" { + value = google_bigquery_dataset.san_francisco_311_service_requests.dataset_id +} diff --git a/datasets/san_francisco_311_service_requests/_terraform/variables.tf b/datasets/san_francisco_311_service_requests/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/san_francisco_311_service_requests/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/san_francisco_311_service_requests/dataset.yaml b/datasets/san_francisco_311_service_requests/dataset.yaml new file mode 100644 index 000000000..6008b87f4 --- /dev/null +++ b/datasets/san_francisco_311_service_requests/dataset.yaml @@ -0,0 +1,27 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +dataset: + name: san_francisco_311_service_requests + friendly_name: ~ + description: ~ + dataset_sources: ~ + terms_of_use: ~ + + +resources: + + - type: bigquery_dataset + dataset_id: san_francisco_311_service_requests + description: san_francisco_311_service_requests From bd8dbda1c940bc33979ff5d7e97c29e6b278a566 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Tue, 14 Sep 2021 17:37:15 +0000 Subject: [PATCH 02/12] fix: runs locally --- .../311_service_requests/pipeline.yaml | 2 +- .../csv_transform.py | 55 ++++++++++++++----- 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml index 893dba01e..b46885477 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml +++ b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml @@ -60,7 +60,7 @@ dag: TARGET_GCS_BUCKET: "{{ var.values.composer_bucket }}" TARGET_GCS_PATH: "data/san_francisco_311_service_requests/311_service_requests/data_output.csv" resources: - limit_memory: "2G" + limit_memory: "4G" limit_cpu: "1" - operator: "GoogleCloudStorageToBigQueryOperator" diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py index c472537a0..06197f297 100644 --- a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py @@ -12,13 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -# import modules import datetime import logging import os import pathlib -import pdb import pandas as pd import requests from google.cloud import storage @@ -48,22 +46,51 @@ def main( logging.info(f"Transform: Renaming Headers.. {source_file}") rename_headers(df) + logging.info(f"Transform: Remove rows with empty keys.. {source_file}") df = df[ df["unique_key"] != ""] - df["incident_address"] = df["incident_address"].strip() + logging.info(f"Transform: Strip whitespace from incident address.. {source_file}") + df['incident_address'] = df['incident_address'].apply(lambda x: str(x).strip()) - pdb.set_trace() - - df["latitude"].str.replace("(", "").replace(")", "") - df["longitude"].str.replace("(", "").replace(")", "") + logging.info(f"Transform: Remove parenthesis from latitude and longitude.. {source_file}") + df["latitude"].replace("(", "", regex=False, inplace=True) + df["latitude"].replace(")", "", regex=False, inplace=True) + df["longitude"].replace("(", "", regex=False, inplace=True) + df["longitude"].replace(")", "", regex=False, inplace=True) + logging.info(f"Transform: Convert Date Format.. {source_file}") df["created_date"] = df["created_date"].apply(convert_dt_format) + df["closed_date"] = df["closed_date"].apply(convert_dt_format) + df["resolution_action_updated_date"] = df["resolution_action_updated_date"].apply(convert_dt_format) + + logging.info(f"Transform: Remove newlines.. 
{source_file}") + df["incident_address"].replace({ r'\A\s+|\s+\Z': '', '\n' : ' '}, regex=True, inplace=True) + df["status_notes"].replace({ r'\A\s+|\s+\Z': '', '\n' : ' '}, regex=True, inplace=True) + df["descriptor"].replace({ r'\A\s+|\s+\Z': '', '\n' : ' '}, regex=True, inplace=True) logging.info("Transform: Reordering headers..") df = df[ [ - "field1", - "field2", + "unique_key", + "created_date", + "closed_date", + "resolution_action_updated_date", + "status", + "status_notes", + "agency_name", + "category", + "complaint_type", + "descriptor", + "incident_address", + "supervisor_district", + "neighborhood", + "location", + "source", + "media_url", + "latitude", + "longitude", + "police_district", + ] ] @@ -72,7 +99,7 @@ def main( logging.info(f"Saving to output file.. {target_file}") try: - save_to_new_file(df, file_path=str(target_file), index=False) + save_to_new_file(df, file_path=str(target_file)) except Exception as e: logging.error(f"Error saving output file: {e}.") @@ -85,8 +112,8 @@ def main( def convert_dt_format(dt_str: str) -> str: - if dt_str is None or len(str(dt_str)) == 0 or str(dt_str) == "nan": - return str(dt_str) + if dt_str is None or len(str(dt_str)) == 0 or str(dt_str).lower() == "nan" or str(dt_str) == "": + return str("") else: return str(datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %H:%M:%S %p").strftime("%Y-%m-%d %H:%M:%S")) @@ -96,7 +123,7 @@ def rename_headers(df: pd.DataFrame) -> None: "CaseID": "unique_key", "Opened": "created_date", "Closed": "closed_date", - "Updated": "solution_action_updated_date", + "Updated": "resolution_action_updated_date", "Status": "status", "Status Notes": "status_notes", "Responsible Agency": "agency_name", @@ -118,7 +145,7 @@ def rename_headers(df: pd.DataFrame) -> None: def save_to_new_file(df: pd.DataFrame, file_path) -> None: - df.to_csv(file_path) + df.to_csv(file_path, index=False) def download_file(source_url: str, source_file: pathlib.Path) -> None: From 690dfd1c7a37b48d34e4983beb98405fc8caca29 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Tue, 14 Sep 2021 17:41:54 +0000 Subject: [PATCH 03/12] fix: resolved flake8 issues --- .../311_service_requests_dag.py | 199 ++++++++++++++++++ .../csv_transform.py | 40 ++-- 2 files changed, 227 insertions(+), 12 deletions(-) create mode 100644 datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py new file mode 100644 index 000000000..a51a12681 --- /dev/null +++ b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py @@ -0,0 +1,199 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.transfers import gcs_to_bigquery + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="san_francisco_311_service_requests.311_service_requests", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="transform_csv", + name="311_service_requests", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.san_francisco_311_service_requests.container_registry.run_csv_transform_kub_311_service_requests }}", + env_vars={ + "SOURCE_URL": "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv", + "SOURCE_FILE": "files/data.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.values.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco_311_service_requests/311_service_requests/data_output.csv", + }, + resources={"limit_memory": "4G", "limit_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.values.composer_bucket }}", + source_objects=[ + "data/san_francisco_311_service_requests/311_service_requests/data_output.csv" + ], + source_format="CSV", + destination_project_dataset_table="san_francisco_311_service_requests.311_service_requests", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "unique_key", + "type": "INTEGER", + "description": "Unique case id", + "mode": "REQUIRED", + }, + { + "name": "created_date", + "type": "TIMESTAMP", + "description": "The date and time when the service request was made", + "mode": "NULLABLE", + }, + { + "name": "closed_date", + "type": "TIMESTAMP", + "description": "The date and time when the service request was closed", + "mode": "NULLABLE", + }, + { + "name": "resolution_action_updated_date", + "type": "TIMESTAMP", + "description": "The date and time when the service request was last modified. 
For requests with status=closed, this will be the date the request was closed", + "mode": "NULLABLE", + }, + { + "name": "status", + "type": "STRING", + "description": "The current status of the service request.", + "mode": "NULLABLE", + }, + { + "name": "status_notes", + "type": "STRING", + "description": "Explanation of why status was changed to current state or more details on current status than conveyed with status alone", + "mode": "NULLABLE", + }, + { + "name": "agency_name", + "type": "STRING", + "description": "The agency responsible for fulfilling or otherwise addressing the service request.", + "mode": "NULLABLE", + }, + { + "name": "category", + "type": "STRING", + "description": "The Human readable name of the specific service request type (service_name)", + "mode": "NULLABLE", + }, + { + "name": "complaint_type", + "type": "STRING", + "description": "More specific description of the problem related to the Category", + "mode": "NULLABLE", + }, + { + "name": "descriptor", + "type": "STRING", + "description": "More specific description of the problem related to the Request Type", + "mode": "NULLABLE", + }, + { + "name": "incident_address", + "type": "STRING", + "description": "Human readable address or description of location", + "mode": "NULLABLE", + }, + { + "name": "supervisor_district", + "type": "INTEGER", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "neighborhood", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "location", + "type": "STRING", + "description": "Latitude and longitude using the (WGS84) projection.", + "mode": "NULLABLE", + }, + { + "name": "source", + "type": "STRING", + "description": "How the service request was made", + "mode": "NULLABLE", + }, + { + "name": "media_url", + "type": "STRING", + "description": "Website URL", + "mode": "NULLABLE", + }, + { + "name": "latitude", + "type": "FLOAT", + "description": "Latitude using the (WGS84) projection.", + "mode": "NULLABLE", + }, + { + "name": "longitude", + "type": "FLOAT", + "description": "Longitude using the (WGS84) projection.", + "mode": "NULLABLE", + }, + { + "name": "police_district", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py index 06197f297..022ec29bb 100644 --- a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py @@ -30,7 +30,7 @@ def main( target_gcs_path: str, ) -> None: - logging.info(f"San Francisco - 311 Service Requests process started") + logging.info("San Francisco - 311 Service Requests process started") logging.info("creating 'files' folder") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) @@ -47,12 +47,14 @@ def main( rename_headers(df) logging.info(f"Transform: Remove rows with empty keys.. {source_file}") - df = df[ df["unique_key"] != ""] + df = df[df["unique_key"] != ""] logging.info(f"Transform: Strip whitespace from incident address.. 
{source_file}") - df['incident_address'] = df['incident_address'].apply(lambda x: str(x).strip()) + df["incident_address"] = df["incident_address"].apply(lambda x: str(x).strip()) - logging.info(f"Transform: Remove parenthesis from latitude and longitude.. {source_file}") + logging.info( + f"Transform: Remove parenthesis from latitude and longitude.. {source_file}" + ) df["latitude"].replace("(", "", regex=False, inplace=True) df["latitude"].replace(")", "", regex=False, inplace=True) df["longitude"].replace("(", "", regex=False, inplace=True) @@ -61,12 +63,18 @@ def main( logging.info(f"Transform: Convert Date Format.. {source_file}") df["created_date"] = df["created_date"].apply(convert_dt_format) df["closed_date"] = df["closed_date"].apply(convert_dt_format) - df["resolution_action_updated_date"] = df["resolution_action_updated_date"].apply(convert_dt_format) + df["resolution_action_updated_date"] = df["resolution_action_updated_date"].apply( + convert_dt_format + ) logging.info(f"Transform: Remove newlines.. {source_file}") - df["incident_address"].replace({ r'\A\s+|\s+\Z': '', '\n' : ' '}, regex=True, inplace=True) - df["status_notes"].replace({ r'\A\s+|\s+\Z': '', '\n' : ' '}, regex=True, inplace=True) - df["descriptor"].replace({ r'\A\s+|\s+\Z': '', '\n' : ' '}, regex=True, inplace=True) + df["incident_address"].replace( + {r"\A\s+|\s+\Z": "", "\n": " "}, regex=True, inplace=True + ) + df["status_notes"].replace( + {r"\A\s+|\s+\Z": "", "\n": " "}, regex=True, inplace=True + ) + df["descriptor"].replace({r"\A\s+|\s+\Z": "", "\n": " "}, regex=True, inplace=True) logging.info("Transform: Reordering headers..") df = df[ @@ -90,7 +98,6 @@ def main( "latitude", "longitude", "police_district", - ] ] @@ -108,14 +115,23 @@ def main( ) upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - logging.info(f"San Francisco - 311 Service Requests process completed") + logging.info("San Francisco - 311 Service Requests process completed") def convert_dt_format(dt_str: str) -> str: - if dt_str is None or len(str(dt_str)) == 0 or str(dt_str).lower() == "nan" or str(dt_str) == "": + if ( + dt_str is None + or len(str(dt_str)) == 0 + or str(dt_str).lower() == "nan" + or str(dt_str) == "" + ): return str("") else: - return str(datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %H:%M:%S %p").strftime("%Y-%m-%d %H:%M:%S")) + return str( + datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %H:%M:%S %p").strftime( + "%Y-%m-%d %H:%M:%S" + ) + ) def rename_headers(df: pd.DataFrame) -> None: From 43a4c9d2da55d42a01e40ff8f2206ed60e9e5112 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Tue, 14 Sep 2021 17:55:27 +0000 Subject: [PATCH 04/12] fix: yamllint errors fix to pipeline.yaml --- .../311_service_requests/pipeline.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml index b46885477..923c2129d 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml +++ b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml @@ -77,7 +77,7 @@ dag: schema_fields: - name: "unique_key" type: "INTEGER" - description: "Unique case id" + description: "Unique case id" mode: "REQUIRED" - name: "created_date" type: "TIMESTAMP" From c1e06dce29318aee97052ac94a4dfc95314d792b Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Tue, 14 Sep 2021 18:26:31 +0000 Subject: [PATCH 05/12] fix: 
attempted fix at resolving jinja error in airflow 2 when executing DAG --- .../311_service_requests/311_service_requests_dag.py | 4 ++-- .../311_service_requests/pipeline.yaml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py index a51a12681..14bd8efd8 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py +++ b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py @@ -61,7 +61,7 @@ "SOURCE_URL": "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", - "TARGET_GCS_BUCKET": "{{ var.values.composer_bucket }}", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/san_francisco_311_service_requests/311_service_requests/data_output.csv", }, resources={"limit_memory": "4G", "limit_cpu": "1"}, @@ -70,7 +70,7 @@ # Task to load CSV data to a BigQuery table load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( task_id="load_to_bq", - bucket="{{ var.values.composer_bucket }}", + bucket="{{ var.value.composer_bucket }}", source_objects=[ "data/san_francisco_311_service_requests/311_service_requests/data_output.csv" ], diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml index 923c2129d..1ed47fb28 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml +++ b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml @@ -57,7 +57,7 @@ dag: SOURCE_URL: "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" - TARGET_GCS_BUCKET: "{{ var.values.composer_bucket }}" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/san_francisco_311_service_requests/311_service_requests/data_output.csv" resources: limit_memory: "4G" @@ -68,7 +68,7 @@ dag: args: task_id: "load_to_bq" - bucket: "{{ var.values.composer_bucket }}" + bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/san_francisco_311_service_requests/311_service_requests/data_output.csv"] source_format: "CSV" destination_project_dataset_table: "san_francisco_311_service_requests.311_service_requests" From a4548806566b88beb4a1697ec5fae49502a9e909 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Tue, 14 Sep 2021 19:31:30 +0000 Subject: [PATCH 06/12] fix: fixed jinja error in airflow 2 when executing DAG --- .../311_service_requests/311_service_requests_dag.py | 2 +- .../311_service_requests/pipeline.yaml | 2 +- .../Dockerfile | 0 .../csv_transform.py | 0 .../requirements.txt | 0 5 files changed, 2 insertions(+), 2 deletions(-) rename datasets/san_francisco_311_service_requests/_images/{run_csv_transform_kub_311_service_requests => run_csv_transform_kub}/Dockerfile (100%) rename datasets/san_francisco_311_service_requests/_images/{run_csv_transform_kub_311_service_requests => run_csv_transform_kub}/csv_transform.py (100%) rename datasets/san_francisco_311_service_requests/_images/{run_csv_transform_kub_311_service_requests => run_csv_transform_kub}/requirements.txt (100%) diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py 
b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py index 14bd8efd8..c721de056 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py +++ b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py @@ -56,7 +56,7 @@ } }, image_pull_policy="Always", - image="{{ var.json.san_francisco_311_service_requests.container_registry.run_csv_transform_kub_311_service_requests }}", + image="{{ var.json.san_francisco_311_service_requests.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv", "SOURCE_FILE": "files/data.csv", diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml index 1ed47fb28..f3ab416b8 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml +++ b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml @@ -52,7 +52,7 @@ dag: values: - "pool-e2-standard-4" image_pull_policy: "Always" - image: "{{ var.json.san_francisco_311_service_requests.container_registry.run_csv_transform_kub_311_service_requests }}" + image: "{{ var.json.san_francisco_311_service_requests.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv" SOURCE_FILE: "files/data.csv" diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/Dockerfile b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/Dockerfile similarity index 100% rename from datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/Dockerfile rename to datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/Dockerfile diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py similarity index 100% rename from datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/csv_transform.py rename to datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/requirements.txt b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/requirements.txt similarity index 100% rename from datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub_311_service_requests/requirements.txt rename to datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/requirements.txt From 6336126b6e11833ac9124dff01bf3968c7c84ce7 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Wed, 15 Sep 2021 18:50:32 +0000 Subject: [PATCH 07/12] fix: works in airflow2 --- .../311_service_requests_dag.py | 3 +- .../311_service_requests/pipeline.yaml | 5 +- .../run_csv_transform_kub/csv_transform.py | 96 +++++++++++-------- 3 files changed, 60 insertions(+), 44 deletions(-) diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py index c721de056..4b302f6ba 100644 --- 
a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py +++ b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py @@ -61,10 +61,11 @@ "SOURCE_URL": "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", + "CHUNKSIZE": "500000", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/san_francisco_311_service_requests/311_service_requests/data_output.csv", }, - resources={"limit_memory": "4G", "limit_cpu": "1"}, + resources={"limit_memory": "8G", "limit_cpu": "2"}, ) # Task to load CSV data to a BigQuery table diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml index f3ab416b8..54b195c1f 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml +++ b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml @@ -57,11 +57,12 @@ dag: SOURCE_URL: "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" + CHUNKSIZE: "500000" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/san_francisco_311_service_requests/311_service_requests/data_output.csv" resources: - limit_memory: "4G" - limit_cpu: "1" + limit_memory: "8G" + limit_cpu: "2" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py index 022ec29bb..f242fe7aa 100644 --- a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py @@ -16,6 +16,7 @@ import logging import os import pathlib +import subprocess import pandas as pd import requests @@ -26,6 +27,7 @@ def main( source_url: str, source_file: pathlib.Path, target_file: pathlib.Path, + chunksize: str, target_gcs_bucket: str, target_gcs_path: str, ) -> None: @@ -38,45 +40,73 @@ def main( logging.info(f"downloading file {source_url}") download_file(source_url, source_file) - logging.info(f"Opening file {source_file}") - df = pd.read_csv(source_file) + chunksz = int(chunksize) - logging.info(f"Transformation Process Starting.. {source_file}") + logging.info(f"Opening batch file {source_file}") + with pd.read_csv( + source_file, engine="python", encoding="utf-8", quotechar='"', chunksize=chunksz + ) as reader: + for chunk_number, chunk in enumerate(reader): + logging.info(f"Processing batch {chunk_number}") + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + processChunk(df, target_file_batch) + logging.info(f"Appending batch {chunk_number} to {target_file}") + if chunk_number == 0: + subprocess.run(["cp", target_file_batch, target_file]) + else: + subprocess.check_call(f"sed -i '1d' {target_file_batch}", shell=True) + subprocess.check_call( + f"cat {target_file_batch} >> {target_file}", shell=True + ) + subprocess.run(["rm", target_file_batch]) + + logging.info( + f"Uploading output file to.. 
gs://{target_gcs_bucket}/{target_gcs_path}" + ) + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - logging.info(f"Transform: Renaming Headers.. {source_file}") + logging.info("San Francisco - 311 Service Requests process completed") + + +def download_file(source_url: str, source_file: pathlib.Path) -> None: + r = requests.get(source_url, stream=True) + if r.status_code == 200: + with open(source_file, "wb") as f: + for chunk in r: + f.write(chunk) + else: + logging.error(f"Couldn't download {source_url}: {r.text}") + + +def processChunk(df: pd.DataFrame, target_file_batch: str) -> None: + + logging.info("Renaming Headers") rename_headers(df) - logging.info(f"Transform: Remove rows with empty keys.. {source_file}") + logging.info("Remove rows with empty keys") df = df[df["unique_key"] != ""] - logging.info(f"Transform: Strip whitespace from incident address.. {source_file}") + logging.info("Strip whitespace from incident address") df["incident_address"] = df["incident_address"].apply(lambda x: str(x).strip()) - logging.info( - f"Transform: Remove parenthesis from latitude and longitude.. {source_file}" - ) + logging.info("Remove parenthesis from latitude and longitude") df["latitude"].replace("(", "", regex=False, inplace=True) df["latitude"].replace(")", "", regex=False, inplace=True) df["longitude"].replace("(", "", regex=False, inplace=True) df["longitude"].replace(")", "", regex=False, inplace=True) - logging.info(f"Transform: Convert Date Format.. {source_file}") + logging.info("Convert Date Format") df["created_date"] = df["created_date"].apply(convert_dt_format) df["closed_date"] = df["closed_date"].apply(convert_dt_format) df["resolution_action_updated_date"] = df["resolution_action_updated_date"].apply( convert_dt_format ) - logging.info(f"Transform: Remove newlines.. {source_file}") - df["incident_address"].replace( - {r"\A\s+|\s+\Z": "", "\n": " "}, regex=True, inplace=True - ) - df["status_notes"].replace( - {r"\A\s+|\s+\Z": "", "\n": " "}, regex=True, inplace=True - ) - df["descriptor"].replace({r"\A\s+|\s+\Z": "", "\n": " "}, regex=True, inplace=True) - - logging.info("Transform: Reordering headers..") + logging.info(" Transform: Reordering headers..") df = df[ [ "unique_key", @@ -101,21 +131,14 @@ def main( ] ] - logging.info(f"Transformation Process complete .. {source_file}") - - logging.info(f"Saving to output file.. {target_file}") + logging.info(f" Saving to target file.. {target_file_batch}") try: - save_to_new_file(df, file_path=str(target_file)) + save_to_new_file(df, file_path=str(target_file_batch)) except Exception as e: - logging.error(f"Error saving output file: {e}.") + logging.error(f"Error saving to target file: {e}.") - logging.info( - f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}" - ) - upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - - logging.info("San Francisco - 311 Service Requests process completed") + logging.info(f"Saved transformed source data to target file .. 
{target_file_batch}") def convert_dt_format(dt_str: str) -> str: @@ -164,16 +187,6 @@ def save_to_new_file(df: pd.DataFrame, file_path) -> None: df.to_csv(file_path, index=False) -def download_file(source_url: str, source_file: pathlib.Path) -> None: - r = requests.get(source_url, stream=True) - if r.status_code == 200: - with open(source_file, "wb") as f: - for chunk in r: - f.write(chunk) - else: - logging.error(f"Couldn't download {source_url}: {r.text}") - - def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: storage_client = storage.Client() bucket = storage_client.bucket(gcs_bucket) @@ -188,6 +201,7 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) source_url=os.environ["SOURCE_URL"], source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + chunksize=os.environ["CHUNKSIZE"], target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], target_gcs_path=os.environ["TARGET_GCS_PATH"], ) From 65f59cc91adc2e3ef752aa7c289b982b98cab111 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Wed, 22 Sep 2021 21:25:37 +0000 Subject: [PATCH 08/12] fix: added allow_quoted_newlines=True to pipeline.yaml to attempt to resolve broken quotes errors when inserting into bigquery --- .../311_service_requests/311_service_requests_dag.py | 1 + .../311_service_requests/pipeline.yaml | 1 + 2 files changed, 2 insertions(+) diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py index 4b302f6ba..5a1899d59 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py +++ b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py @@ -78,6 +78,7 @@ source_format="CSV", destination_project_dataset_table="san_francisco_311_service_requests.311_service_requests", skip_leading_rows=1, + allow_quoted_newlines=True, write_disposition="WRITE_TRUNCATE", schema_fields=[ { diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml index 54b195c1f..dd6ce958a 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml +++ b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml @@ -74,6 +74,7 @@ dag: source_format: "CSV" destination_project_dataset_table: "san_francisco_311_service_requests.311_service_requests" skip_leading_rows: 1 + allow_quoted_newlines: True write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "unique_key" From 7389d4996be6a5e722b88c02a833fb96dd598680 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 23 Sep 2021 04:26:59 +0000 Subject: [PATCH 09/12] fix: resolved casting issue. 
Now runs successfully in Airflow --- .../_images/run_csv_transform_kub/csv_transform.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py index f242fe7aa..dc0ab882d 100644 --- a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py @@ -90,6 +90,8 @@ def processChunk(df: pd.DataFrame, target_file_batch: str) -> None: logging.info("Remove rows with empty keys") df = df[df["unique_key"] != ""] + df['supervisor_district'] = df['supervisor_district'].astype('Int64') + logging.info("Strip whitespace from incident address") df["incident_address"] = df["incident_address"].apply(lambda x: str(x).strip()) From d81ad1d15375ab52e1aa4e3eeb9750f4359513cc Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 23 Sep 2021 04:29:49 +0000 Subject: [PATCH 10/12] fix: resolved flake8 issue. Now runs successfully in Airflow --- .../_images/run_csv_transform_kub/csv_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py index dc0ab882d..e6cf1d482 100644 --- a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py @@ -90,7 +90,7 @@ def processChunk(df: pd.DataFrame, target_file_batch: str) -> None: logging.info("Remove rows with empty keys") df = df[df["unique_key"] != ""] - df['supervisor_district'] = df['supervisor_district'].astype('Int64') + df["supervisor_district"] = df["supervisor_district"].astype("Int64") logging.info("Strip whitespace from incident address") df["incident_address"] = df["incident_address"].apply(lambda x: str(x).strip()) From 9fa6367fb058c66c19a664590b8c8a12d6c7098f Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 30 Sep 2021 20:31:22 +0000 Subject: [PATCH 11/12] fix: resolve code review points as per PR. 
Plus cleanup --- .../311_service_requests_dag.py | 4 +- .../311_service_requests/pipeline.yaml | 4 +- .../_images/run_csv_transform_kub/Dockerfile | 17 -- .../run_csv_transform_kub/csv_transform.py | 219 +++++++++++------- 4 files changed, 133 insertions(+), 111 deletions(-) diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py index 5a1899d59..d5014660f 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py +++ b/datasets/san_francisco_311_service_requests/311_service_requests/311_service_requests_dag.py @@ -61,11 +61,11 @@ "SOURCE_URL": "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", - "CHUNKSIZE": "500000", + "CHUNKSIZE": "750000", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/san_francisco_311_service_requests/311_service_requests/data_output.csv", }, - resources={"limit_memory": "8G", "limit_cpu": "2"}, + resources={"limit_memory": "8G", "limit_cpu": "3"}, ) # Task to load CSV data to a BigQuery table diff --git a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml index dd6ce958a..dde4fcb25 100644 --- a/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml +++ b/datasets/san_francisco_311_service_requests/311_service_requests/pipeline.yaml @@ -57,12 +57,12 @@ dag: SOURCE_URL: "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" - CHUNKSIZE: "500000" + CHUNKSIZE: "750000" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/san_francisco_311_service_requests/311_service_requests/data_output.csv" resources: limit_memory: "8G" - limit_cpu: "2" + limit_cpu: "3" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/Dockerfile b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/Dockerfile index 85af90570..748bc3bec 100644 --- a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/Dockerfile +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/Dockerfile @@ -12,27 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -# The base image for this build -# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim FROM python:3.8 - -# Allow statements and log messages to appear in Cloud logs ENV PYTHONUNBUFFERED True - -# Copy the requirements file into the image COPY requirements.txt ./ - -# Install the packages specified in the requirements file RUN python3 -m pip install --no-cache-dir -r requirements.txt - -# The WORKDIR instruction sets the working directory for any RUN, CMD, -# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. -# If the WORKDIR doesn’t exist, it will be created even if it’s not used in -# any subsequent Dockerfile instruction WORKDIR /custom - -# Copy the specific data processing script/s in the image under /custom/* COPY ./csv_transform.py . 
- -# Command to run the data processing script when the container is run CMD ["python3", "csv_transform.py"] diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py index e6cf1d482..7a6732cb7 100644 --- a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py @@ -16,7 +16,6 @@ import logging import os import pathlib -import subprocess import pandas as pd import requests @@ -34,81 +33,148 @@ def main( logging.info("San Francisco - 311 Service Requests process started") - logging.info("creating 'files' folder") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - - logging.info(f"downloading file {source_url}") download_file(source_url, source_file) chunksz = int(chunksize) - logging.info(f"Opening batch file {source_file}") + logging.info(f"Opening source file {source_file}") with pd.read_csv( - source_file, engine="python", encoding="utf-8", quotechar='"', chunksize=chunksz + source_file, + engine="python", + encoding="utf-8", + quotechar='"', + sep=",", + chunksize=chunksz, ) as reader: for chunk_number, chunk in enumerate(reader): - logging.info(f"Processing batch {chunk_number}") target_file_batch = str(target_file).replace( ".csv", "-" + str(chunk_number) + ".csv" ) df = pd.DataFrame() df = pd.concat([df, chunk]) - processChunk(df, target_file_batch) - logging.info(f"Appending batch {chunk_number} to {target_file}") - if chunk_number == 0: - subprocess.run(["cp", target_file_batch, target_file]) - else: - subprocess.check_call(f"sed -i '1d' {target_file_batch}", shell=True) - subprocess.check_call( - f"cat {target_file_batch} >> {target_file}", shell=True - ) - subprocess.run(["rm", target_file_batch]) - - logging.info( - f"Uploading output file to.. 
gs://{target_gcs_bucket}/{target_gcs_path}" - ) + process_chunk(df, target_file_batch, target_file, (not chunk_number == 0)) + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) logging.info("San Francisco - 311 Service Requests process completed") def download_file(source_url: str, source_file: pathlib.Path) -> None: + logging.info(f"downloading file {source_file} from {source_url}") r = requests.get(source_url, stream=True) - if r.status_code == 200: - with open(source_file, "wb") as f: - for chunk in r: - f.write(chunk) - else: - logging.error(f"Couldn't download {source_url}: {r.text}") + with open(source_file, "wb") as f: + for chunk in r: + f.write(chunk) + + +def process_chunk( + df: pd.DataFrame, target_file_batch: str, target_file: str, skip_header: bool +) -> None: + logging.info(f"Processing batch file {target_file_batch}") + df = rename_headers(df) + df = remove_empty_key_rows(df, 'unique_key') + df = resolve_datatypes(df) + df = remove_parenthesis_long_lat(df) + df = strip_whitespace(df) + df = resolve_date_format(df) + df = reorder_headers(df) + save_to_new_file(df, file_path=str(target_file_batch)) + append_batch_file(target_file_batch, target_file, skip_header) + logging.info(f"Processing batch file {target_file_batch} completed") + + +def rename_headers(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Renaming headers") + header_names = { + "CaseID": "unique_key", + "Opened": "created_date", + "Closed": "closed_date", + "Updated": "resolution_action_updated_date", + "Status": "status", + "Status Notes": "status_notes", + "Responsible Agency": "agency_name", + "Category": "category", + "Request Type": "complaint_type", + "Request Details": "descriptor", + "Address": "incident_address", + "Supervisor District": "supervisor_district", + "Neighborhood": "neighborhood", + "Point": "location", + "Source": "source", + "Media URL": "media_url", + "Latitude": "latitude", + "Longitude": "longitude", + "Police District": "police_district", + } + df = df.rename(columns=header_names) + return df -def processChunk(df: pd.DataFrame, target_file_batch: str) -> None: - logging.info("Renaming Headers") - rename_headers(df) +def remove_empty_key_rows(df: pd.DataFrame, key_field: str) -> pd.DataFrame: + logging.info("Removing rows with empty keys") + df = df[df[key_field] != ''] - logging.info("Remove rows with empty keys") - df = df[df["unique_key"] != ""] + return df + +def resolve_datatypes(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Resolving datatypes") df["supervisor_district"] = df["supervisor_district"].astype("Int64") - logging.info("Strip whitespace from incident address") - df["incident_address"] = df["incident_address"].apply(lambda x: str(x).strip()) + return df + - logging.info("Remove parenthesis from latitude and longitude") +def remove_parenthesis_long_lat(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Removing parenthesis from latitude and longitude") df["latitude"].replace("(", "", regex=False, inplace=True) df["latitude"].replace(")", "", regex=False, inplace=True) df["longitude"].replace("(", "", regex=False, inplace=True) df["longitude"].replace(")", "", regex=False, inplace=True) - logging.info("Convert Date Format") - df["created_date"] = df["created_date"].apply(convert_dt_format) - df["closed_date"] = df["closed_date"].apply(convert_dt_format) - df["resolution_action_updated_date"] = df["resolution_action_updated_date"].apply( - convert_dt_format - ) + return df + + +def strip_whitespace(df: pd.DataFrame) -> pd.DataFrame: + 
logging.info("Stripping whitespace") + ws_fields = ["incident_address"] + + for ws_fld in ws_fields: + df[ws_fld] = df[ws_fld].apply(lambda x: str(x).strip()) + + return df + + +def resolve_date_format(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Resolving date formats") + date_fields = [ + "created_date", + "closed_date", + "resolution_action_updated_date", + ] + + for dt_fld in date_fields: + df[dt_fld] = df[dt_fld].apply(convert_dt_format) + + return df - logging.info(" Transform: Reordering headers..") + +def convert_dt_format(dt_str: str) -> str: + if not dt_str or str(dt_str).lower() == "nan" or str(dt_str).lower() == "nat": + return "" + elif ( + dt_str.strip()[2] == "/" + ): # if there is a '/' in 3rd position, then we have a date format mm/dd/yyyy + return datetime.datetime.strptime(dt_str, "%m/%d/%Y %H:%M:%S %p").strftime( + "%Y-%m-%d %H:%M:%S" + ) + else: + return str(dt_str) + + +def reorder_headers(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Reordering headers") df = df[ [ "unique_key", @@ -133,60 +199,33 @@ def processChunk(df: pd.DataFrame, target_file_batch: str) -> None: ] ] - logging.info(f" Saving to target file.. {target_file_batch}") + return df - try: - save_to_new_file(df, file_path=str(target_file_batch)) - except Exception as e: - logging.error(f"Error saving to target file: {e}.") - logging.info(f"Saved transformed source data to target file .. {target_file_batch}") +def save_to_new_file(df: pd.DataFrame, file_path) -> None: + df.to_csv(file_path, index=False) -def convert_dt_format(dt_str: str) -> str: - if ( - dt_str is None - or len(str(dt_str)) == 0 - or str(dt_str).lower() == "nan" - or str(dt_str) == "" - ): - return str("") +def append_batch_file( + batch_file_path: str, target_file_path: str, skip_header: bool +) -> None: + data_file = open(batch_file_path, "r") + if os.path.exists(target_file_path): + target_file = open(target_file_path, "a+") else: - return str( - datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %H:%M:%S %p").strftime( - "%Y-%m-%d %H:%M:%S" - ) + target_file = open(target_file_path, "w") + if skip_header: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path} with skip header" ) - - -def rename_headers(df: pd.DataFrame) -> None: - header_names = { - "CaseID": "unique_key", - "Opened": "created_date", - "Closed": "closed_date", - "Updated": "resolution_action_updated_date", - "Status": "status", - "Status Notes": "status_notes", - "Responsible Agency": "agency_name", - "Category": "category", - "Request Type": "complaint_type", - "Request Details": "descriptor", - "Address": "incident_address", - "Supervisor District": "supervisor_district", - "Neighborhood": "neighborhood", - "Point": "location", - "Source": "source", - "Media URL": "media_url", - "Latitude": "latitude", - "Longitude": "longitude", - "Police District": "police_district", - } - - df = df.rename(columns=header_names, inplace=True) - - -def save_to_new_file(df: pd.DataFrame, file_path) -> None: - df.to_csv(file_path, index=False) + next(data_file) + else: + logging.info(f"Appending batch file {batch_file_path} to {target_file_path}") + target_file.write(data_file.read()) + data_file.close() + target_file.close() + if os.path.exists(batch_file_path): + os.remove(batch_file_path) def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: From 8b0f4bac93648026d178e67a7ff308d74abc8998 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 30 Sep 2021 20:33:19 +0000 Subject: [PATCH 12/12] fix: resolve flake8 
issues --- .../_images/run_csv_transform_kub/csv_transform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py index 7a6732cb7..063c9da7d 100644 --- a/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/san_francisco_311_service_requests/_images/run_csv_transform_kub/csv_transform.py @@ -73,7 +73,7 @@ def process_chunk( ) -> None: logging.info(f"Processing batch file {target_file_batch}") df = rename_headers(df) - df = remove_empty_key_rows(df, 'unique_key') + df = remove_empty_key_rows(df, "unique_key") df = resolve_datatypes(df) df = remove_parenthesis_long_lat(df) df = strip_whitespace(df) @@ -114,7 +114,7 @@ def rename_headers(df: pd.DataFrame) -> pd.DataFrame: def remove_empty_key_rows(df: pd.DataFrame, key_field: str) -> pd.DataFrame: logging.info("Removing rows with empty keys") - df = df[df[key_field] != ''] + df = df[df[key_field] != ""] return df
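A minimal, self-contained sketch of the chunked read/transform/append pattern that the reworked csv_transform.py above follows; the simplified transform step, function names, and default chunk size here are illustrative only and are not part of the committed code:

import logging
import os

import pandas as pd


def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Stand-in for the real rename/clean/reorder steps done in process_chunk().
    df = df.rename(columns={"CaseID": "unique_key"})
    return df[df["unique_key"] != ""]


def append_batch_file(batch_path: str, target_path: str, skip_header: bool) -> None:
    # Append a processed batch onto the combined output. The first batch
    # creates/truncates the target and keeps its header; later batches drop
    # their header row so the target ends up with a single header line.
    mode = "a" if skip_header else "w"
    with open(batch_path, "r") as batch, open(target_path, mode) as target:
        if skip_header:
            next(batch)
        target.write(batch.read())
    os.remove(batch_path)


def run(source_file: str, target_file: str, chunksize: int = 750000) -> None:
    # Read the source CSV in fixed-size chunks, transform each chunk, write it
    # to a per-batch file, then concatenate it onto the final output file.
    with pd.read_csv(source_file, chunksize=chunksize) as reader:
        for chunk_number, chunk in enumerate(reader):
            batch_path = target_file.replace(".csv", f"-{chunk_number}.csv")
            transform(chunk).to_csv(batch_path, index=False)
            append_batch_file(batch_path, target_file, skip_header=(chunk_number != 0))
            logging.info("appended batch %s to %s", chunk_number, target_file)

Writing each processed batch to its own file and concatenating it onto the target keeps peak memory roughly proportional to the chunk size rather than the full source file, which is why CHUNKSIZE and the pod resource limits are the knobs adjusted together in patch 11 above.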