From b4fbaad99e94a052d039c99bc8ca9ad842461e4a Mon Sep 17 00:00:00 2001 From: Dipannita Banerjee <84458018+dipannitab2392@users.noreply.github.com> Date: Wed, 6 Oct 2021 20:13:15 +0530 Subject: [PATCH] feat: Onboard Austin Crime dataset (#174) --- .../_images/run_csv_transform_kub/Dockerfile | 43 +++ .../run_csv_transform_kub/csv_transform.py | 260 ++++++++++++++++++ .../run_csv_transform_kub/requirements.txt | 2 + .../_terraform/austin_crime_dataset.tf | 26 ++ .../austin_crime/_terraform/crime_pipeline.tf | 39 +++ datasets/austin_crime/_terraform/provider.tf | 28 ++ datasets/austin_crime/_terraform/variables.tf | 23 ++ datasets/austin_crime/crime/crime_dag.py | 194 +++++++++++++ datasets/austin_crime/crime/pipeline.yaml | 158 +++++++++++ datasets/austin_crime/dataset.yaml | 27 ++ 10 files changed, 800 insertions(+) create mode 100644 datasets/austin_crime/_images/run_csv_transform_kub/Dockerfile create mode 100644 datasets/austin_crime/_images/run_csv_transform_kub/csv_transform.py create mode 100644 datasets/austin_crime/_images/run_csv_transform_kub/requirements.txt create mode 100644 datasets/austin_crime/_terraform/austin_crime_dataset.tf create mode 100644 datasets/austin_crime/_terraform/crime_pipeline.tf create mode 100644 datasets/austin_crime/_terraform/provider.tf create mode 100644 datasets/austin_crime/_terraform/variables.tf create mode 100644 datasets/austin_crime/crime/crime_dag.py create mode 100644 datasets/austin_crime/crime/pipeline.yaml create mode 100644 datasets/austin_crime/dataset.yaml diff --git a/datasets/austin_crime/_images/run_csv_transform_kub/Dockerfile b/datasets/austin_crime/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..a72f392ab --- /dev/null +++ b/datasets/austin_crime/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,43 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +RUN apt-get -y update && apt-get install -y apt-transport-https ca-certificates gnupg &&\ + echo "deb https://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list &&\ + curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - &&\ + apt-get -y update && apt-get install -y google-cloud-sdk + + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . 
+ +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] \ No newline at end of file diff --git a/datasets/austin_crime/_images/run_csv_transform_kub/csv_transform.py b/datasets/austin_crime/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..934d4fbd5 --- /dev/null +++ b/datasets/austin_crime/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,260 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import datetime
import glob
import json
import logging
import math
import os
import pathlib
import re
import subprocess
import typing

import pandas as pd


def main(
    source_url: typing.List[str],
    source_file: typing.List[pathlib.Path],
    source_files_path: str,
    target_file: pathlib.Path,
    target_gcs_bucket: str,
    target_gcs_path: str,
    headers: typing.List[str],
    rename_mappings: dict,
) -> None:
    """Run the Austin Crime ETL: download, clean, reshape, and upload.

    Args:
        source_url: GCS URLs of the raw annual crime CSV files.
        source_file: Local paths each downloaded file is written to
            (paired positionally with ``source_url``).
        source_files_path: Directory that is globbed for ``*.csv`` inputs.
        target_file: Local path of the combined, transformed output CSV.
        target_gcs_bucket: Destination GCS bucket for the output file.
        target_gcs_path: Destination object path inside the bucket.
        headers: Final column order of the output CSV.
        rename_mappings: Source-column -> output-column rename map.
    """
    logging.info(
        "Austin crime process started at "
        + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )

    logging.info("Creating 'files' folder")
    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)

    logging.info("Downloading file ...")
    download_file(source_url, source_file)

    logging.info("Opening files...")
    df = read_files(source_files_path)

    logging.info("Transform: Rename columns...")
    rename_headers(df, rename_mappings)

    logging.info("Transform: Cleaning up column location_description...")
    # Collapse runs of two or more whitespace characters.  The |S/utf-8
    # round-trip coerces non-string values (e.g. NaN) to plain strings first.
    df["location_description"] = (
        df["location_description"]
        .astype("|S")
        .str.decode("utf-8")
        .apply(reg_exp_tranformation, args=(r"\s{2,}", ""))
    )

    logging.info("Transform: Converting to integer string...")
    df["zipcode"] = df["zipcode"].apply(convert_to_integer_string)
    df["council_district_code"] = df["council_district_code"].apply(
        convert_to_integer_string
    )
    df["x_coordinate"] = df["x_coordinate"].apply(convert_to_integer_string)
    df["y_coordinate"] = df["y_coordinate"].apply(convert_to_integer_string)

    logging.info("Transform: Creating a new column - address...")
    # Prefer the source's own address; fall back to a synthesized
    # "<location_description> Austin, TX <zipcode>" when it is missing.
    df["address"] = df["temp_address"]
    df["address"] = (
        df["address"]
        .fillna(
            df["location_description"].replace("nan", "")
            + " Austin, TX "
            + df["zipcode"]
        )
        .str.strip()
    )

    logging.info("Transform: Converting date format...")
    df["timestamp"] = df["timestamp"].apply(convert_dt_format)
    df["clearance_date"] = df["clearance_date"].apply(convert_dt_format)

    logging.info("Transform: Creating a new column - year...")
    df["year"] = df["timestamp"].apply(extract_year)

    logging.info("Transform: Replacing values...")
    df["address"] = df["address"].apply(reg_exp_tranformation, args=(r"\n", " "))
    df = df.replace(
        to_replace={
            "clearance_status": {
                "C": "Cleared by Arrest",
                "O": "Cleared by Exception",
                "N": "Not cleared",
            },
            "address": {"sAustin": "Austin"},
        }
    )

    logging.info("Transform: Converting exponential values to integer...")
    df["unique_key"] = (
        df["unique_key"]
        .apply(convert_exp_to_float)
        .astype(float)
        .apply(convert_to_integer_string)
    )

    logging.info("Transform: Creating a new column - latitude...")
    # If address is 'Austin, TX (30.264979, -97.746598)' the code below
    # extracts 30.264979 from the address and assigns it to latitude.
    df["latitude"] = (
        df["address"]
        .apply(search_string)
        .apply(extract_lat_long, args=[r".*\((\d+\.\d+),.*"])
    )

    logging.info("Transform: Creating a new column - longitude...")
    # Same pattern: -97.746598 from the address becomes the longitude.
    df["longitude"] = (
        df["address"]
        .apply(search_string)
        .apply(extract_lat_long, args=[r".*(\-\d+\.\d+)\)"])
    )

    logging.info("Transform: Creating a new column - location...")
    df["location"] = "(" + df["latitude"] + "," + df["longitude"] + ")"
    # "(,)" is what rows with neither coordinate produce; blank those out.
    df["location"] = df["location"].replace("(,)", "")

    logging.info("Transform: Dropping column - temp_address...")
    delete_column(df, "temp_address")

    logging.info("Transform: Reordering headers...")
    df = df[headers]

    logging.info(f"Saving to output file.. {target_file}")
    try:
        save_to_new_file(df, file_path=str(target_file))
    except Exception as e:
        logging.error(f"Error saving output file: {e}.")

    logging.info(
        f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
    )
    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

    logging.info(
        "Austin crime process completed at "
        + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )


def download_file(
    source_url: typing.List[str], source_file: typing.List[pathlib.Path]
) -> None:
    """Copy each GCS source object to its paired local path via gsutil."""
    for url, file in zip(source_url, source_file):
        logging.info(f"Downloading file from {url} ...")
        subprocess.check_call(["gsutil", "cp", f"{url}", f"{file}"])


def read_files(path: str) -> pd.DataFrame:
    """Read every CSV directly under ``path`` and concatenate into one frame.

    Note: annotation fixed from ``pathlib.Path`` to ``str`` — the value is
    used in plain string concatenation below.
    """
    all_files = glob.glob(path + "/*.csv")
    df_temp = []
    for filename in all_files:
        frame = pd.read_csv(filename, index_col=None, header=0)
        df_temp.append(frame)
    df = pd.concat(df_temp, axis=0, ignore_index=True)
    return df


def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None:
    """Rename the DataFrame's columns in place."""
    df.rename(columns=rename_mappings, inplace=True)


def reg_exp_tranformation(str_value: str, search_pattern: str, replace_val: str) -> str:
    """Replace every match of ``search_pattern`` in ``str_value``."""
    return re.sub(search_pattern, replace_val, str_value)


def convert_to_integer_string(input: typing.Union[str, float, None]) -> str:
    """Render a numeric value as an integer string; '' for missing values.

    Fix: the previous ``not input`` check also blanked out a legitimate
    zero value, and ``math.isnan`` raised TypeError on string input.
    Missing values (None, '', NaN) still map to ''.
    """
    if input is None or input == "":
        return ""
    if isinstance(input, float) and math.isnan(input):
        return ""
    return str(int(round(float(input), 0)))


def convert_dt_format(dt_str: str) -> str:
    """Convert 'MM/DD/YYYY HH:MM:SS AM|PM' to 'YYYY-MM-DD HH:MM:SS' (24h).

    Fix: the source timestamps carry an AM/PM marker, so the hour must be
    parsed with %I.  strptime silently ignores %p when %H is used, which
    shifted every PM time back twelve hours.
    """
    if not dt_str or str(dt_str) == "nan":
        return ""
    return datetime.datetime.strptime(str(dt_str), "%m/%d/%Y %I:%M:%S %p").strftime(
        "%Y-%m-%d %H:%M:%S"
    )


def extract_year(string_val: str) -> str:
    """Return the leading 'YYYY' of a 'YYYY-MM-DD ...' timestamp string."""
    return string_val[0:4]


def convert_exp_to_float(exp_val: str) -> str:
    """Expand scientific notation (e.g. '2.015E9') to fixed-point notation."""
    return "{:f}".format(float(exp_val))


def search_string(str_value: str) -> str:
    """Return ``str_value`` when it embeds a '(...)' pair, else ''."""
    if re.search(r".*\(.*\)", str_value):
        return str(str_value)
    return ""


def extract_lat_long(str_val: str, pattern: str) -> str:
    """Return group 1 of ``pattern`` matched against ``str_val``, or ''."""
    m = re.match(pattern, str_val)
    return m.group(1) if m else ""


def delete_column(df: pd.DataFrame, column_name: str) -> None:
    """Drop ``column_name`` from the DataFrame in place.

    Fix: removed the pointless rebinding of the in-place drop's None result.
    """
    df.drop(columns=[column_name], inplace=True)


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
    """Upload the local output file to gs://{gcs_bucket}/{gcs_path}."""
    # Imported lazily so the pure transform helpers above remain usable
    # (and testable) on machines without the GCS client library installed.
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(file_path)


def save_to_new_file(df: pd.DataFrame, file_path: str) -> None:
    """Write the DataFrame to ``file_path`` as CSV without the index."""
    df.to_csv(file_path, index=False)


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    main(
        source_url=json.loads(os.environ["SOURCE_URL"]),
        source_file=json.loads(os.environ["SOURCE_FILE"]),
        source_files_path=os.environ["FILE_PATH"],
        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
        target_gcs_path=os.environ["TARGET_GCS_PATH"],
        headers=json.loads(os.environ["CSV_HEADERS"]),
        rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]),
    )
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_dataset" "austin_crime" { + dataset_id = "austin_crime" + project = var.project_id + description = "Austin Crime dataset" +} + +output "bigquery_dataset-austin_crime-dataset_id" { + value = google_bigquery_dataset.austin_crime.dataset_id +} diff --git a/datasets/austin_crime/_terraform/crime_pipeline.tf b/datasets/austin_crime/_terraform/crime_pipeline.tf new file mode 100644 index 000000000..a2d834e8c --- /dev/null +++ b/datasets/austin_crime/_terraform/crime_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "crime" { + project = var.project_id + dataset_id = "austin_crime" + table_id = "crime" + + description = "Austin Crime table" + + + + + depends_on = [ + google_bigquery_dataset.austin_crime + ] +} + +output "bigquery_table-crime-table_id" { + value = google_bigquery_table.crime.table_id +} + +output "bigquery_table-crime-id" { + value = google_bigquery_table.crime.id +} diff --git a/datasets/austin_crime/_terraform/provider.tf b/datasets/austin_crime/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/austin_crime/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/austin_crime/_terraform/variables.tf b/datasets/austin_crime/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/austin_crime/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/austin_crime/crime/crime_dag.py b/datasets/austin_crime/crime/crime_dag.py new file mode 100644 index 000000000..347f22304 --- /dev/null +++ b/datasets/austin_crime/crime/crime_dag.py @@ -0,0 +1,194 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Airflow DAG for the austin_crime.crime pipeline.

Two tasks: a KubernetesPodOperator that runs the csv_transform container,
then a GCS-to-BigQuery load of its output into austin_crime.crime.

NOTE(review): this DAG mirrors pipeline.yaml in the same directory —
presumably generated from it; keep the two in sync.
"""

from airflow import DAG
from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator

# Defaults applied to every task in the DAG.
default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": "2021-03-01",
}


with DAG(
    dag_id="austin_crime.crime",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run CSV transform within kubernetes pod
    austin_crime_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="austin_crime_transform_csv",
        name="crime",
        namespace="default",
        # Pin the pod to the dedicated e2-standard-4 node pool.
        affinity={
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    "values": ["pool-e2-standard-4"],
                                }
                            ]
                        }
                    ]
                }
            }
        },
        image_pull_policy="Always",
        # Image location is resolved from an Airflow variable at runtime.
        image="{{ var.json.austin_crime.container_registry.run_csv_transform_kub }}",
        # The container's csv_transform.py reads all of its configuration
        # from these environment variables (JSON-encoded where list/dict).
        env_vars={
            "SOURCE_URL": '["gs://pdp-feeds-staging/Austin_Crime/Annual_Crime_2014.csv","gs://pdp-feeds-staging/Austin_Crime/Annual_Crime_Dataset_2015.csv","gs://pdp-feeds-staging/Austin_Crime/2016_Annual_Crime_Data.csv"]',
            "SOURCE_FILE": '["files/data1.csv","files/data2.csv","files/data3.csv"]',
            "TARGET_FILE": "files/data_output.csv",
            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
            "TARGET_GCS_PATH": "data/austin_crime/crime/data_output.csv",
            "FILE_PATH": "files/",
            "CSV_HEADERS": '["unique_key","address","census_tract","clearance_date","clearance_status","council_district_code","description","district","latitude","longitude","location","location_description","primary_type","timestamp","x_coordinate","y_coordinate","year","zipcode"]',
            "RENAME_MAPPINGS": '{"GO Primary Key" : "unique_key","Council District" : "council_district_code","GO Highest Offense Desc" : "description","Highest NIBRS/UCR Offense Description" : "primary_type","GO Report Date" : "timestamp","GO Location" : "location_description","Clearance Status" : "clearance_status","Clearance Date" : "clearance_date","GO District" : "district","GO Location Zip" : "zipcode","GO Census Tract" : "census_tract","GO X Coordinate" : "x_coordinate","GO Y Coordinate" : "y_coordinate","Location_1" : "temp_address"}',
        },
        resources={"request_memory": "4G", "request_cpu": "1"},
    )

    # Task to load CSV data to a BigQuery table
    load_austin_crime_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id="load_austin_crime_to_bq",
        bucket="{{ var.value.composer_bucket }}",
        source_objects=["data/austin_crime/crime/data_output.csv"],
        source_format="CSV",
        destination_project_dataset_table="austin_crime.crime",
        skip_leading_rows=1,
        allow_quoted_newlines=True,
        # Each run fully replaces the table contents.
        write_disposition="WRITE_TRUNCATE",
        # Column order must match CSV_HEADERS above.
        schema_fields=[
            {
                "name": "unique_key",
                "type": "integer",
                "description": "Unique identifier for the record.",
                "mode": "nullable",
            },
            {
                "name": "address",
                "type": "string",
                "description": "Full address where the incident occurred.",
                "mode": "nullable",
            },
            {
                "name": "census_tract",
                "type": "float",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "clearance_date",
                "type": "timestamp",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "clearance_status",
                "type": "string",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "council_district_code",
                "type": "integer",
                "description": "Indicates the council district code where the incident occurred.",
                "mode": "nullable",
            },
            {
                "name": "description",
                "type": "string",
                "description": "The subcategory of the primary description.",
                "mode": "nullable",
            },
            {
                "name": "district",
                "type": "string",
                "description": "Indicates the police district where the incident occurred.",
                "mode": "nullable",
            },
            {
                "name": "latitude",
                "type": "float",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "longitude",
                "type": "float",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "location",
                "type": "string",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "location_description",
                "type": "string",
                "description": "Description of the location where the incident occurred.",
                "mode": "nullable",
            },
            {
                "name": "primary_type",
                "type": "string",
                "description": "The primary description of the NIBRS/UCR code.",
                "mode": "nullable",
            },
            {
                "name": "timestamp",
                "type": "timestamp",
                "description": "Time when the incident occurred. This is sometimes a best estimate.",
                "mode": "nullable",
            },
            {
                "name": "x_coordinate",
                "type": "integer",
                "description": "The x coordinate of the location where the incident occurred",
                "mode": "nullable",
            },
            {
                "name": "y_coordinate",
                "type": "integer",
                "description": "The y coordinate of the location where the incident occurred",
                "mode": "nullable",
            },
            {
                "name": "year",
                "type": "integer",
                "description": "Indicates the year in which the incident occurred.",
                "mode": "nullable",
            },
            {
                "name": "zipcode",
                "type": "string",
                "description": "Indicates the zipcode where the incident occurred.",
                "mode": "nullable",
            },
        ],
    )

    # Transform must finish before the BigQuery load starts.
    austin_crime_transform_csv >> load_austin_crime_to_bq
+# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: crime + description: "Austin Crime table" + +dag: + airflow_version: 1 + initialize: + dag_id: crime + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "austin_crime_transform_csv" + name: "crime" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.austin_crime.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: >- + ["gs://pdp-feeds-staging/Austin_Crime/Annual_Crime_2014.csv","gs://pdp-feeds-staging/Austin_Crime/Annual_Crime_Dataset_2015.csv","gs://pdp-feeds-staging/Austin_Crime/2016_Annual_Crime_Data.csv"] + SOURCE_FILE: >- + ["files/data1.csv","files/data2.csv","files/data3.csv"] + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/austin_crime/crime/data_output.csv" + FILE_PATH: "files/" + CSV_HEADERS: >- + ["unique_key","address","census_tract","clearance_date","clearance_status","council_district_code","description","district","latitude","longitude","location","location_description","primary_type","timestamp","x_coordinate","y_coordinate","year","zipcode"] + RENAME_MAPPINGS: >- + {"GO Primary Key" : "unique_key","Council District" : "council_district_code","GO Highest Offense Desc" : "description","Highest NIBRS/UCR Offense Description" : "primary_type","GO Report Date" : "timestamp","GO Location" : "location_description","Clearance Status" : 
"clearance_status","Clearance Date" : "clearance_date","GO District" : "district","GO Location Zip" : "zipcode","GO Census Tract" : "census_tract","GO X Coordinate" : "x_coordinate","GO Y Coordinate" : "y_coordinate","Location_1" : "temp_address"} + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_austin_crime_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/austin_crime/crime/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "austin_crime.crime" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "unique_key" + type: "integer" + description: "Unique identifier for the record." + mode: "nullable" + - name: "address" + type: "string" + description: "Full address where the incident occurred." + mode: "nullable" + - name: "census_tract" + type: "float" + description: "" + mode: "nullable" + - name: "clearance_date" + type: "timestamp" + description: "" + mode: "nullable" + - name: "clearance_status" + type: "string" + description: "" + mode: "nullable" + - name: "council_district_code" + type: "integer" + description: "Indicates the council district code where the incident occurred." + mode: "nullable" + - name: "description" + type: "string" + description: "The subcategory of the primary description." + mode: "nullable" + - name: "district" + type: "string" + description: "Indicates the police district where the incident occurred." + mode: "nullable" + - name: "latitude" + type: "float" + description: "" + mode: "nullable" + - name: "longitude" + type: "float" + description: "" + mode: "nullable" + - name: "location" + type: "string" + description: "" + mode: "nullable" + - name: "location_description" + type: "string" + description: "Description of the location where the incident occurred." 
+ mode: "nullable" + - name: "primary_type" + type: "string" + description: "The primary description of the NIBRS/UCR code." + mode: "nullable" + - name: "timestamp" + type: "timestamp" + description: "Time when the incident occurred. This is sometimes a best estimate." + mode: "nullable" + - name: "x_coordinate" + type: "integer" + description: "The x coordinate of the location where the incident occurred" + mode: "nullable" + - name: "y_coordinate" + type: "integer" + description: "The y coordinate of the location where the incident occurred" + mode: "nullable" + - name: "year" + type: "integer" + description: "Indicates the year in which the incident occurred." + mode: "nullable" + - name: "zipcode" + type: "string" + description: "Indicates the zipcode where the incident occurred." + mode: "nullable" + + graph_paths: + - "austin_crime_transform_csv >> load_austin_crime_to_bq" diff --git a/datasets/austin_crime/dataset.yaml b/datasets/austin_crime/dataset.yaml new file mode 100644 index 000000000..cd157b9cf --- /dev/null +++ b/datasets/austin_crime/dataset.yaml @@ -0,0 +1,27 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + + name: austin_crime + friendly_name: austin_crime + description: Austin Crime dataset + dataset_sources: ~ + terms_of_use: ~ + + +resources: + - type: bigquery_dataset + dataset_id: austin_crime + description: Austin Crime dataset