From 6f95e811e9186367fb084f8526ced57efc507889 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Sat, 11 Sep 2021 03:47:32 +0000 Subject: [PATCH 1/5] feat: san-francisco bikeshare status --- .../Dockerfile | 38 ++++ .../csv_transform.py | 175 ++++++++++++++++++ .../requirements.txt | 3 + .../_terraform/bikeshare_status_pipeline.tf | 39 ++++ .../_terraform/provider.tf | 28 +++ .../san_francisco_bikeshare_status_dataset.tf | 26 +++ .../_terraform/variables.tf | 23 +++ .../bikeshare_status/pipeline.yaml | 83 +++++++++ .../dataset.yaml | 58 ++++++ 9 files changed, 473 insertions(+) create mode 100644 datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/Dockerfile create mode 100644 datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/csv_transform.py create mode 100644 datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/requirements.txt create mode 100644 datasets/san_francisco_bikeshare_status/_terraform/bikeshare_status_pipeline.tf create mode 100644 datasets/san_francisco_bikeshare_status/_terraform/provider.tf create mode 100644 datasets/san_francisco_bikeshare_status/_terraform/san_francisco_bikeshare_status_dataset.tf create mode 100644 datasets/san_francisco_bikeshare_status/_terraform/variables.tf create mode 100644 datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml create mode 100644 datasets/san_francisco_bikeshare_status/dataset.yaml diff --git a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/Dockerfile b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/Dockerfile new file mode 100644 index 000000000..85af90570 --- /dev/null +++ b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/Dockerfile @@ -0,0 +1,38 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . 
+ +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/csv_transform.py b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/csv_transform.py new file mode 100644 index 000000000..1ad8ffe1a --- /dev/null +++ b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/csv_transform.py @@ -0,0 +1,175 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import logging +import os +import pathlib +from shutil import copyfile + +import pandas as pd +import requests +from google.cloud import storage + + +def main( + source_url: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + target_gcs_bucket: str, + target_gcs_path: str, +) -> None: + + logging.info(f"San Francisco - Bikeshare Status process started") + + logging.info("creating 'files' folder") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + logging.info("creating 'templates' folder") + pathlib.Path("./templates").mkdir(parents=True, exist_ok=True) + + logging.info(f"Extracting URL for status: {source_url}") + source_file_status_csv = str(source_file).replace(".csv", "") + "_status.csv" + source_file_status_json = str(source_file).replace(".csv", "") + "_status.json" + + logging.info(f"Downloading states json file {source_url}") + download_file_json( + source_url, source_file_status_json, source_file_status_csv + ) + copyfile(source_file_status_json, "./templates/bikeshare_status.json") + + logging.info(f"Opening status file {source_file_status_csv}") + df = pd.read_csv(source_file_status_csv) + + logging.info(f"Transformation Process Starting.. {source_file}") + + logging.info(f"Renaming Columns {source_file_status_csv}") + rename_headers(df) + + df = df[ df["station_id"] != "" ] + df = df[ df["num_bikes_available"] != "" ] + df = df[ df["num_docks_available"] != "" ] + df = df[ df["is_installed"] != "" ] + df = df[ df["is_renting"] != "" ] + df = df[ df["is_returning"] != "" ] + df = df[ df["last_reported"] != "" ] + + logging.info("Re-ordering Headers") + df = df[ + [ + "station_id", + "num_bikes_available", + "num_bikes_disabled", + "num_docks_available", + "num_docks_disabled", + "is_installed", + "is_renting", + "is_returning", + "last_reported", + "num_ebikes_available", + "eightd_has_available_keys", + ] + ] + + logging.info(f"Transformation Process complete .. {source_file}") + + logging.info(f"Saving to output file.. {target_file}") + + try: + save_to_new_file(df, file_path=str(target_file)) + except Exception as e: + logging.error(f"Error saving output file: {e}.") + + logging.info( + f"Uploading output file to.. 
gs://{target_gcs_bucket}/{target_gcs_path}" + ) + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) + + logging.info(f"San Francisco - Bikeshare Status process completed") + + +def datetime_from_int(dt_int: int) -> str: + return datetime.datetime.fromtimestamp(dt_int).strftime("%Y-%m-%d %H:%M:%S") + + +def convert_dt_format(date_str: str, time_str: str) -> str: + return str(datetime.datetime.strptime(date_str, "%m/%d/%Y").date()) + " " + time_str + + +def rename_headers(df: pd.DataFrame) -> None: + header_names = { + "data.stations.eightd_has_available_keys": "eightd_has_available_keys", + "data.stations.is_installed": "is_installed", + "data.stations.is_renting": "is_renting", + "data.stations.is_returning": "is_returning", + "data.stations.last_reported": "last_reported", + "data.stations.num_bikes_available": "num_bikes_available", + "data.stations.num_bikes_disabled": "num_bikes_disabled", + "data.stations.num_docks_available": "num_docks_available", + "data.stations.num_docks_disabled": "num_docks_disabled", + "data.stations.num_ebikes_available": "num_ebikes_available", + "data.stations.station_id": "station_id", + } + + df.rename(columns=header_names, inplace=True) + +def save_to_new_file(df, file_path) -> None: + df.to_csv(file_path, index=False) + + +def download_file_json( + source_url: str, source_file_json: pathlib.Path, source_file_csv: pathlib.Path +) -> None: + + # this function extracts the json from a source url and creates + # a csv file from that data to be used as an input file + + # download json url into object r + try: + r = requests.get(source_url, stream=True) + if r.status_code != 200: + logging.error(f"Couldn't download {source_url}: {r.text}") + except ValueError: # includes simplejson.decoder.JSONDecodeError + print(f"Downloading JSON file {source_url} has failed {r.text}") + + # push object r (json) into json file + try: + with open(source_file_json, "wb") as f: + for chunk in r: + f.write(chunk) + except ValueError: + print(f"Writing JSON to {source_file_json} has failed") + + # read json file into object and write out to csv + df = pd.read_json(source_file_json)["data"]["stations"] + df = pd.DataFrame(df) + df.to_csv(source_file_csv, index=False) + + +def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: + storage_client = storage.Client() + bucket = storage_client.bucket(gcs_bucket) + blob = bucket.blob(gcs_path) + blob.upload_from_filename(file_path) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_url=os.environ["SOURCE_URL"], + source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), + target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + ) diff --git a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/requirements.txt b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/requirements.txt new file mode 100644 index 000000000..f36704793 --- /dev/null +++ b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/requirements.txt @@ -0,0 +1,3 @@ +requests +pandas +google-cloud-storage diff --git a/datasets/san_francisco_bikeshare_status/_terraform/bikeshare_status_pipeline.tf b/datasets/san_francisco_bikeshare_status/_terraform/bikeshare_status_pipeline.tf new file mode 100644 index 000000000..8df43325d --- /dev/null +++ 
b/datasets/san_francisco_bikeshare_status/_terraform/bikeshare_status_pipeline.tf
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+resource "google_bigquery_table" "bikeshare_status" {
+  project    = var.project_id
+  dataset_id = "san_francisco_bikeshare_status"
+  table_id   = "bikeshare_status"
+
+  description = "san_francisco_bikeshare_status"
+
+
+
+
+  depends_on = [
+    google_bigquery_dataset.san_francisco_bikeshare_status
+  ]
+}
+
+output "bigquery_table-bikeshare_status-table_id" {
+  value = google_bigquery_table.bikeshare_status.table_id
+}
+
+output "bigquery_table-bikeshare_status-id" {
+  value = google_bigquery_table.bikeshare_status.id
+}
diff --git a/datasets/san_francisco_bikeshare_status/_terraform/provider.tf b/datasets/san_francisco_bikeshare_status/_terraform/provider.tf
new file mode 100644
index 000000000..23ab87dcd
--- /dev/null
+++ b/datasets/san_francisco_bikeshare_status/_terraform/provider.tf
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+provider "google" {
+  project                     = var.project_id
+  impersonate_service_account = var.impersonating_acct
+  region                      = var.region
+}
+
+data "google_client_openid_userinfo" "me" {}
+
+output "impersonating-account" {
+  value = data.google_client_openid_userinfo.me.email
+}
diff --git a/datasets/san_francisco_bikeshare_status/_terraform/san_francisco_bikeshare_status_dataset.tf b/datasets/san_francisco_bikeshare_status/_terraform/san_francisco_bikeshare_status_dataset.tf
new file mode 100644
index 000000000..090667341
--- /dev/null
+++ b/datasets/san_francisco_bikeshare_status/_terraform/san_francisco_bikeshare_status_dataset.tf
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + + +resource "google_bigquery_dataset" "san_francisco_bikeshare_status" { + dataset_id = "san_francisco_bikeshare_status" + project = var.project_id + description = "san_francisco_bikeshare_status" +} + +output "bigquery_dataset-san_francisco_bikeshare_status-dataset_id" { + value = google_bigquery_dataset.san_francisco_bikeshare_status.dataset_id +} diff --git a/datasets/san_francisco_bikeshare_status/_terraform/variables.tf b/datasets/san_francisco_bikeshare_status/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/san_francisco_bikeshare_status/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml b/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml new file mode 100644 index 000000000..b2d6e035c --- /dev/null +++ b/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml @@ -0,0 +1,83 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
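The pipeline spec that follows points the transform container at the Bay Wheels GBFS station_status feed. GBFS nests station records under data.stations, which is why csv_transform.py flattens the JSON before writing CSV. A minimal sketch of that step, assuming only the requests and pandas packages from requirements.txt (output file name is illustrative):

```python
import pandas as pd
import requests

# GBFS station_status feed, as configured in SOURCE_URL below.
url = "https://gbfs.baywheels.com/gbfs/en/station_status.json"

payload = requests.get(url).json()

# Station records are nested under data.stations in the GBFS spec;
# flattening them yields one row per station for the CSV load.
df = pd.DataFrame(payload["data"]["stations"])
df.to_csv("files/data_status.csv", index=False)
print(df[["station_id", "num_bikes_available", "last_reported"]].head())
```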
+
+---
+resources:
+
+  - type: bigquery_table
+    table_id: "bikeshare_status"
+    description: "san_francisco_bikeshare_status"
+
+dag:
+  airflow_version: 2
+  initialize:
+    dag_id: bikeshare_status
+    default_args:
+      owner: "Google"
+      depends_on_past: False
+      start_date: '2021-03-01'
+    max_active_runs: 1
+    schedule_interval: "@daily"  # runs once a day at midnight
+    catchup: False
+    default_view: graph
+
+  tasks:
+
+    - operator: "KubernetesPodOperator"
+      description: "Run CSV transform within kubernetes pod"
+
+      args:
+
+        task_id: "transform_csv"
+        name: "bikeshare_status"
+        namespace: "default"
+        affinity:
+          nodeAffinity:
+            requiredDuringSchedulingIgnoredDuringExecution:
+              nodeSelectorTerms:
+                - matchExpressions:
+                    - key: cloud.google.com/gke-nodepool
+                      operator: In
+                      values:
+                        - "pool-e2-standard-4"
+        image_pull_policy: "Always"
+        image: "{{ var.json.san_francisco_bikeshare_status.container_registry.run_csv_transform_kub_bikeshare_status }}"
+        env_vars:
+          SOURCE_URL: "https://gbfs.baywheels.com/gbfs/en/station_status.json"
+          SOURCE_FILE: "files/data.csv"
+          TARGET_FILE: "files/data_output.csv"
+          TARGET_GCS_BUCKET: "{{ var.values.composer_bucket }}"
+          TARGET_GCS_PATH: "data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv"
+        resources:
+          limit_memory: "2G"
+          limit_cpu: "1"
+
+    - operator: "GoogleCloudStorageToBigQueryOperator"
+      description: "Task to load CSV data to a BigQuery table"
+
+      args:
+        task_id: "load_to_bq"
+        bucket: "{{ var.values.composer_bucket }}"
+        source_objects: ["data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv"]
+        source_format: "CSV"
+        destination_project_dataset_table: "san_francisco_bikeshare_status.bikeshare_status"
+        skip_leading_rows: 1
+        write_disposition: "WRITE_TRUNCATE"
+        schema_fields:
+          # - name: "trip_id"
+          #   type: "INTEGER"
+          #   mode: "NULLABLE"
+
+  graph_paths:
+    - "transform_csv >> load_to_bq"
diff --git a/datasets/san_francisco_bikeshare_status/dataset.yaml b/datasets/san_francisco_bikeshare_status/dataset.yaml
new file mode 100644
index 000000000..d32894d59
--- /dev/null
+++ b/datasets/san_francisco_bikeshare_status/dataset.yaml
@@ -0,0 +1,58 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dataset:
+  # The `dataset` block includes properties for your dataset that will be shown
+  # to users of your data on the Google Cloud website.
+
+  # Must be exactly the same name as the folder name your dataset.yaml is in.
+  name: san_francisco_bikeshare_status
+
+  # A friendly, human-readable name of the dataset
+  friendly_name: ~
+
+  # A short, descriptive summary of the dataset.
+  description: ~
+
+  # A list of sources the dataset is derived from, using the YAML list syntax.
+  dataset_sources: ~
+
+  # A list of terms and conditions that users of the dataset should agree on,
+  # using the YAML list syntax.
+  terms_of_use: ~
+
+
+resources:
+  # A list of Google Cloud resources needed by your dataset. In principle, all
+  # pipelines under a dataset should be able to share these resources.
+ # + # The currently supported resources are shown below. Use only the resources + # you need, and delete the rest as needed by your pipeline. + # + # We will keep adding to the list below to support more Google Cloud resources + # over time. If a resource you need isn't supported, please file an issue on + # the repository. + + - type: bigquery_dataset + # Google BigQuery dataset to namespace all tables managed by this folder + # + # Required Properties: + # dataset_id + # + # Optional Properties: + # friendly_name (A user-friendly name of the dataset) + # description (A user-friendly description of the dataset) + # location (The geographic location where the dataset should reside) + dataset_id: san_francisco_bikeshare_status + description: san_francisco_bikeshare_status From ac619d7a033907d662a3db2b0ed2a4fbeff01bad Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Mon, 20 Sep 2021 01:47:53 +0000 Subject: [PATCH 2/5] fix: resolved case-sensitivity --- .../Dockerfile | 0 .../csv_transform.py | 4 +- .../requirements.txt | 0 .../bikeshare_status/bikeshare_status_dag.py | 84 +++++++++++++++++++ 4 files changed, 86 insertions(+), 2 deletions(-) rename datasets/san_francisco_bikeshare_status/_images/{run_csv_transform_kub_bikeshare_status => run_csv_transform_kub}/Dockerfile (100%) rename datasets/san_francisco_bikeshare_status/_images/{run_csv_transform_kub_bikeshare_status => run_csv_transform_kub}/csv_transform.py (98%) rename datasets/san_francisco_bikeshare_status/_images/{run_csv_transform_kub_bikeshare_status => run_csv_transform_kub}/requirements.txt (100%) create mode 100644 datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py diff --git a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/Dockerfile b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/Dockerfile similarity index 100% rename from datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/Dockerfile rename to datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/Dockerfile diff --git a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/csv_transform.py b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py similarity index 98% rename from datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/csv_transform.py rename to datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py index 1ad8ffe1a..ba663d969 100644 --- a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/csv_transform.py +++ b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py @@ -33,9 +33,9 @@ def main( logging.info(f"San Francisco - Bikeshare Status process started") - logging.info("creating 'files' folder") + logging.info("Creating 'files' folder") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - logging.info("creating 'templates' folder") + logging.info("Creating 'templates' folder") pathlib.Path("./templates").mkdir(parents=True, exist_ok=True) logging.info(f"Extracting URL for status: {source_url}") diff --git a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/requirements.txt b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/requirements.txt similarity index 100% rename from 
datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub_bikeshare_status/requirements.txt rename to datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/requirements.txt diff --git a/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py b/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py new file mode 100644 index 000000000..b7c6362b4 --- /dev/null +++ b/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py @@ -0,0 +1,84 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.transfers import gcs_to_bigquery + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="san_francisco_bikeshare_status.bikeshare_status", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="transform_csv", + name="bikeshare_status", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.san_francisco_bikeshare_status.container_registry.run_csv_transform_kub_bikeshare_status }}", + env_vars={ + "SOURCE_URL": "https://gbfs.baywheels.com/gbfs/en/station_status.json", + "SOURCE_FILE": "files/data.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.values.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv", + }, + resources={"limit_memory": "2G", "limit_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.values.composer_bucket }}", + source_objects=[ + "data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv" + ], + source_format="CSV", + destination_project_dataset_table="san_francisco_bikeshare_status.bikeshare_status", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=None, + ) + + transform_csv >> load_to_bq From 97d1c9a41a68729a4b11b3d0e1f0da4bd9b6bdce Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Wed, 22 Sep 2021 18:57:08 +0000 Subject: [PATCH 3/5] feat: San Francisco Bikeshare Status - works in Airflow 2 --- .../run_csv_transform_kub/csv_transform.py | 59 +++++++------- .../bikeshare_status/bikeshare_status_dag.py | 78 +++++++++++++++++-- .../bikeshare_status/pipeline.yaml | 56 +++++++++++-- 3 files changed, 148 insertions(+), 45 deletions(-) diff --git 
a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py index ba663d969..20b3eebc5 100644 --- a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py @@ -13,10 +13,10 @@ # limitations under the License. import datetime +import json import logging import os import pathlib -from shutil import copyfile import pandas as pd import requests @@ -24,29 +24,24 @@ def main( - source_url: str, + source_url_json: str, source_file: pathlib.Path, target_file: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str, ) -> None: - logging.info(f"San Francisco - Bikeshare Status process started") + logging.info("San Francisco - Bikeshare Status process started") logging.info("Creating 'files' folder") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - logging.info("Creating 'templates' folder") - pathlib.Path("./templates").mkdir(parents=True, exist_ok=True) - logging.info(f"Extracting URL for status: {source_url}") + logging.info(f"Extracting URL for status: {source_url_json}") source_file_status_csv = str(source_file).replace(".csv", "") + "_status.csv" source_file_status_json = str(source_file).replace(".csv", "") + "_status.json" - logging.info(f"Downloading states json file {source_url}") - download_file_json( - source_url, source_file_status_json, source_file_status_csv - ) - copyfile(source_file_status_json, "./templates/bikeshare_status.json") + logging.info(f"Downloading states json file {source_url_json}") + download_file_json(source_url_json, source_file_status_json, source_file_status_csv) logging.info(f"Opening status file {source_file_status_csv}") df = pd.read_csv(source_file_status_csv) @@ -56,13 +51,13 @@ def main( logging.info(f"Renaming Columns {source_file_status_csv}") rename_headers(df) - df = df[ df["station_id"] != "" ] - df = df[ df["num_bikes_available"] != "" ] - df = df[ df["num_docks_available"] != "" ] - df = df[ df["is_installed"] != "" ] - df = df[ df["is_renting"] != "" ] - df = df[ df["is_returning"] != "" ] - df = df[ df["last_reported"] != "" ] + df = df[df["station_id"] != ""] + df = df[df["num_bikes_available"] != ""] + df = df[df["num_docks_available"] != ""] + df = df[df["is_installed"] != ""] + df = df[df["is_renting"] != ""] + df = df[df["is_returning"] != ""] + df = df[df["last_reported"] != ""] logging.info("Re-ordering Headers") df = df[ @@ -95,7 +90,7 @@ def main( ) upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - logging.info(f"San Francisco - Bikeshare Status process completed") + logging.info("San Francisco - Bikeshare Status process completed") def datetime_from_int(dt_int: int) -> str: @@ -123,26 +118,22 @@ def rename_headers(df: pd.DataFrame) -> None: df.rename(columns=header_names, inplace=True) + def save_to_new_file(df, file_path) -> None: df.to_csv(file_path, index=False) def download_file_json( - source_url: str, source_file_json: pathlib.Path, source_file_csv: pathlib.Path + source_url_json: str, source_file_json: str, source_file_csv: str ) -> None: - # this function extracts the json from a source url and creates + # This function extracts the json from a source url and creates # a csv file from that data to be used as an input file - # download json url into object r - try: - r = requests.get(source_url, stream=True) - if r.status_code != 200: - logging.error(f"Couldn't 
download {source_url}: {r.text}") - except ValueError: # includes simplejson.decoder.JSONDecodeError - print(f"Downloading JSON file {source_url} has failed {r.text}") + # Download json url into object r + r = requests.get(source_url_json + ".json", stream=True) - # push object r (json) into json file + # Push object r (json) into json file try: with open(source_file_json, "wb") as f: for chunk in r: @@ -150,9 +141,11 @@ def download_file_json( except ValueError: print(f"Writing JSON to {source_file_json} has failed") - # read json file into object and write out to csv - df = pd.read_json(source_file_json)["data"]["stations"] - df = pd.DataFrame(df) + f = open( + source_file_json.strip(), + ) + json_data = json.load(f) + df = pd.DataFrame(json_data["data"]["stations"]) df.to_csv(source_file_csv, index=False) @@ -167,7 +160,7 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) logging.getLogger().setLevel(logging.INFO) main( - source_url=os.environ["SOURCE_URL"], + source_url_json=os.environ["SOURCE_URL_JSON"], source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], diff --git a/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py b/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py index b7c6362b4..0453e6475 100644 --- a/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py +++ b/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py @@ -56,12 +56,12 @@ } }, image_pull_policy="Always", - image="{{ var.json.san_francisco_bikeshare_status.container_registry.run_csv_transform_kub_bikeshare_status }}", + image="{{ var.json.san_francisco_bikeshare_status.container_registry.run_csv_transform_kub }}", env_vars={ - "SOURCE_URL": "https://gbfs.baywheels.com/gbfs/en/station_status.json", + "SOURCE_URL_JSON": "https://gbfs.baywheels.com/gbfs/en/station_status", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", - "TARGET_GCS_BUCKET": "{{ var.values.composer_bucket }}", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv", }, resources={"limit_memory": "2G", "limit_cpu": "1"}, @@ -70,15 +70,83 @@ # Task to load CSV data to a BigQuery table load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( task_id="load_to_bq", - bucket="{{ var.values.composer_bucket }}", + bucket="{{ var.value.composer_bucket }}", source_objects=[ "data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv" ], source_format="CSV", destination_project_dataset_table="san_francisco_bikeshare_status.bikeshare_status", skip_leading_rows=1, + allow_quoted_newlines=True, write_disposition="WRITE_TRUNCATE", - schema_fields=None, + schema_fields=[ + { + "name": "station_id", + "type": "INTEGER", + "description": "Unique identifier of a station", + "mode": "REQUIRED", + }, + { + "name": "num_bikes_available", + "type": "INTEGER", + "description": "Number of bikes available for rental", + "mode": "REQUIRED", + }, + { + "name": "num_bikes_disabled", + "type": "INTEGER", + "description": "Number of disabled bikes at the station. Vendors who do not want to publicize the number of disabled bikes or docks in their system can opt to omit station capacity (in station_information), num_bikes_disabled and num_docks_disabled. 
If station capacity is published then broken docks/bikes can be inferred (though not specifically whether the decreased capacity is a broken bike or dock)", + "mode": "NULLABLE", + }, + { + "name": "num_docks_available", + "type": "INTEGER", + "description": "Number of docks accepting bike returns", + "mode": "REQUIRED", + }, + { + "name": "num_docks_disabled", + "type": "INTEGER", + "description": "Number of empty but disabled dock points at the station. This value remains as part of the spec as it is possibly useful during development", + "mode": "NULLABLE", + }, + { + "name": "is_installed", + "type": "BOOLEAN", + "description": "1/0 boolean - is the station currently on the street", + "mode": "REQUIRED", + }, + { + "name": "is_renting", + "type": "BOOLEAN", + "description": "1/0 boolean - is the station currently renting bikes (even if the station is empty, if it is set to allow rentals this value should be 1)", + "mode": "REQUIRED", + }, + { + "name": "is_returning", + "type": "BOOLEAN", + "description": "1/0 boolean - is the station accepting bike returns (if a station is full but would allow a return if it was not full then this value should be 1)", + "mode": "REQUIRED", + }, + { + "name": "last_reported", + "type": "INTEGER", + "description": "Integer POSIX timestamp indicating the last time this station reported its status to the backend", + "mode": "REQUIRED", + }, + { + "name": "num_ebikes_available", + "type": "INTEGER", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "eightd_has_available_keys", + "type": "BOOLEAN", + "description": "", + "mode": "NULLABLE", + }, + ], ) transform_csv >> load_to_bq diff --git a/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml b/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml index b2d6e035c..9207001ed 100644 --- a/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml +++ b/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml @@ -52,12 +52,12 @@ dag: values: - "pool-e2-standard-4" image_pull_policy: "Always" - image: "{{ var.json.san_francisco_bikeshare_status.container_registry.run_csv_transform_kub_bikeshare_status }}" + image: "{{ var.json.san_francisco_bikeshare_status.container_registry.run_csv_transform_kub }}" env_vars: - SOURCE_URL: "https://gbfs.baywheels.com/gbfs/en/station_status.json" + SOURCE_URL_JSON: "https://gbfs.baywheels.com/gbfs/en/station_status" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" - TARGET_GCS_BUCKET: "{{ var.values.composer_bucket }}" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv" resources: limit_memory: "2G" @@ -68,16 +68,58 @@ dag: args: task_id: "load_to_bq" - bucket: "{{ var.values.composer_bucket }}" + bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv"] source_format: "CSV" destination_project_dataset_table: "san_francisco_bikeshare_status.bikeshare_status" skip_leading_rows: 1 + allow_quoted_newlines: True write_disposition: "WRITE_TRUNCATE" schema_fields: - # - name: "trip_id" - # type: "INTEGER" - # mode: "NULLABLE" + - "name": "station_id" + "type": "INTEGER" + "description": "Unique identifier of a station" + "mode": "REQUIRED" + - "name": "num_bikes_available" + "type": "INTEGER" + "description": "Number of bikes available for rental" + "mode": "REQUIRED" + - "name": "num_bikes_disabled" + "type": "INTEGER" + 
"description": "Number of disabled bikes at the station. Vendors who do not want to publicize the number of disabled bikes or docks in their system can opt to omit station capacity (in station_information), num_bikes_disabled and num_docks_disabled. If station capacity is published then broken docks/bikes can be inferred (though not specifically whether the decreased capacity is a broken bike or dock)" + "mode": "NULLABLE" + - "name": "num_docks_available" + "type": "INTEGER" + "description": "Number of docks accepting bike returns" + "mode": "REQUIRED" + - "name": "num_docks_disabled" + "type": "INTEGER" + "description": "Number of empty but disabled dock points at the station. This value remains as part of the spec as it is possibly useful during development" + "mode": "NULLABLE" + - "name": "is_installed" + "type": "BOOLEAN" + "description": "1/0 boolean - is the station currently on the street" + "mode": "REQUIRED" + - "name": "is_renting" + "type": "BOOLEAN" + "description": "1/0 boolean - is the station currently renting bikes (even if the station is empty, if it is set to allow rentals this value should be 1)" + "mode": "REQUIRED" + - "name": "is_returning" + "type": "BOOLEAN" + "description": "1/0 boolean - is the station accepting bike returns (if a station is full but would allow a return if it was not full then this value should be 1)" + "mode": "REQUIRED" + - "name": "last_reported" + "type": "INTEGER" + "description": "Integer POSIX timestamp indicating the last time this station reported its status to the backend" + "mode": "REQUIRED" + - "name": "num_ebikes_available" + "type": "INTEGER" + "description": "" + "mode": "NULLABLE" + - "name": "eightd_has_available_keys" + "type": "BOOLEAN" + "description": "" + "mode": "NULLABLE" graph_paths: - "transform_csv >> load_to_bq" From b6a38ddf9c6d158c1653e4396a5f214c09ceff3e Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 7 Oct 2021 22:13:23 +0000 Subject: [PATCH 4/5] fix: Implemented chunksize --- .../_images/run_csv_transform_kub/Dockerfile | 17 --- .../run_csv_transform_kub/csv_transform.py | 132 +++++++++++------- .../bikeshare_status/pipeline.yaml | 5 +- 3 files changed, 85 insertions(+), 69 deletions(-) diff --git a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/Dockerfile b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/Dockerfile index 85af90570..748bc3bec 100644 --- a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/Dockerfile +++ b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/Dockerfile @@ -12,27 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -# The base image for this build -# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim FROM python:3.8 - -# Allow statements and log messages to appear in Cloud logs ENV PYTHONUNBUFFERED True - -# Copy the requirements file into the image COPY requirements.txt ./ - -# Install the packages specified in the requirements file RUN python3 -m pip install --no-cache-dir -r requirements.txt - -# The WORKDIR instruction sets the working directory for any RUN, CMD, -# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. -# If the WORKDIR doesn’t exist, it will be created even if it’s not used in -# any subsequent Dockerfile instruction WORKDIR /custom - -# Copy the specific data processing script/s in the image under /custom/* COPY ./csv_transform.py . 
- -# Command to run the data processing script when the container is run CMD ["python3", "csv_transform.py"] diff --git a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py index 20b3eebc5..22352fc26 100644 --- a/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/san_francisco_bikeshare_status/_images/run_csv_transform_kub/csv_transform.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime import json import logging import os @@ -27,30 +26,77 @@ def main( source_url_json: str, source_file: pathlib.Path, target_file: pathlib.Path, + chunksize: str, target_gcs_bucket: str, target_gcs_path: str, ) -> None: logging.info("San Francisco - Bikeshare Status process started") - logging.info("Creating 'files' folder") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) logging.info(f"Extracting URL for status: {source_url_json}") - source_file_status_csv = str(source_file).replace(".csv", "") + "_status.csv" source_file_status_json = str(source_file).replace(".csv", "") + "_status.json" logging.info(f"Downloading states json file {source_url_json}") - download_file_json(source_url_json, source_file_status_json, source_file_status_csv) + download_file_json(source_url_json, source_file_status_json, source_file) + + chunksz = int(chunksize) + + logging.info(f"Opening batch file {source_file}") + with pd.read_csv( + source_file, # path to main source file to load in batches + engine="python", + encoding="utf-8", + quotechar='"', # string separator, typically double-quotes + chunksize=chunksz, # size of batch data, in no. of records + sep=",", # data column separator, typically "," + ) as reader: + for chunk_number, chunk in enumerate(reader): + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk(df, target_file_batch, target_file, (not chunk_number == 0)) - logging.info(f"Opening status file {source_file_status_csv}") - df = pd.read_csv(source_file_status_csv) + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) + + logging.info("San Francisco - Bikeshare Status process completed") + + +def process_chunk( + df: pd.DataFrame, target_file_batch: str, target_file: str, skip_header: bool +) -> None: + df = rename_headers(df) + df = filter_empty_data(df) + df = reorder_headers(df) + save_to_new_file(df, file_path=str(target_file_batch)) + append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) + + +def rename_headers(df: pd.DataFrame) -> None: + header_names = { + "data.stations.eightd_has_available_keys": "eightd_has_available_keys", + "data.stations.is_installed": "is_installed", + "data.stations.is_renting": "is_renting", + "data.stations.is_returning": "is_returning", + "data.stations.last_reported": "last_reported", + "data.stations.num_bikes_available": "num_bikes_available", + "data.stations.num_bikes_disabled": "num_bikes_disabled", + "data.stations.num_docks_available": "num_docks_available", + "data.stations.num_docks_disabled": "num_docks_disabled", + "data.stations.num_ebikes_available": "num_ebikes_available", + "data.stations.station_id": "station_id", + } - logging.info(f"Transformation Process Starting.. 
{source_file}") + df.rename(columns=header_names, inplace=True) - logging.info(f"Renaming Columns {source_file_status_csv}") - rename_headers(df) + return df + +def filter_empty_data(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Filter rows with empty key data") df = df[df["station_id"] != ""] df = df[df["num_bikes_available"] != ""] df = df[df["num_docks_available"] != ""] @@ -59,6 +105,10 @@ def main( df = df[df["is_returning"] != ""] df = df[df["last_reported"] != ""] + return df + + +def reorder_headers(df: pd.DataFrame) -> pd.DataFrame: logging.info("Re-ordering Headers") df = df[ [ @@ -76,53 +126,34 @@ def main( ] ] - logging.info(f"Transformation Process complete .. {source_file}") - - logging.info(f"Saving to output file.. {target_file}") - - try: - save_to_new_file(df, file_path=str(target_file)) - except Exception as e: - logging.error(f"Error saving output file: {e}.") - - logging.info( - f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}" - ) - upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - - logging.info("San Francisco - Bikeshare Status process completed") - - -def datetime_from_int(dt_int: int) -> str: - return datetime.datetime.fromtimestamp(dt_int).strftime("%Y-%m-%d %H:%M:%S") - - -def convert_dt_format(date_str: str, time_str: str) -> str: - return str(datetime.datetime.strptime(date_str, "%m/%d/%Y").date()) + " " + time_str - - -def rename_headers(df: pd.DataFrame) -> None: - header_names = { - "data.stations.eightd_has_available_keys": "eightd_has_available_keys", - "data.stations.is_installed": "is_installed", - "data.stations.is_renting": "is_renting", - "data.stations.is_returning": "is_returning", - "data.stations.last_reported": "last_reported", - "data.stations.num_bikes_available": "num_bikes_available", - "data.stations.num_bikes_disabled": "num_bikes_disabled", - "data.stations.num_docks_available": "num_docks_available", - "data.stations.num_docks_disabled": "num_docks_disabled", - "data.stations.num_ebikes_available": "num_ebikes_available", - "data.stations.station_id": "station_id", - } - - df.rename(columns=header_names, inplace=True) + return df def save_to_new_file(df, file_path) -> None: df.to_csv(file_path, index=False) +def append_batch_file( + batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool +) -> None: + data_file = open(batch_file_path, "r") + if truncate_file: + target_file = open(target_file_path, "w+").close() + target_file = open(target_file_path, "a+") + if skip_header: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path} with skip header" + ) + next(data_file) + else: + logging.info(f"Appending batch file {batch_file_path} to {target_file_path}") + target_file.write(data_file.read()) + data_file.close() + target_file.close() + if os.path.exists(batch_file_path): + os.remove(batch_file_path) + + def download_file_json( source_url_json: str, source_file_json: str, source_file_csv: str ) -> None: @@ -163,6 +194,7 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) source_url_json=os.environ["SOURCE_URL_JSON"], source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + chunksize=os.environ["CHUNKSIZE"], target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], target_gcs_path=os.environ["TARGET_GCS_PATH"], ) diff --git a/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml 
b/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml
index 9207001ed..50be3647a 100644
--- a/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml
+++ b/datasets/san_francisco_bikeshare_status/bikeshare_status/pipeline.yaml
@@ -57,11 +57,12 @@ dag:
           SOURCE_URL_JSON: "https://gbfs.baywheels.com/gbfs/en/station_status"
           SOURCE_FILE: "files/data.csv"
           TARGET_FILE: "files/data_output.csv"
+          CHUNKSIZE: "750000"
           TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
           TARGET_GCS_PATH: "data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv"
         resources:
-          limit_memory: "2G"
-          limit_cpu: "1"
+          limit_memory: "8G"
+          limit_cpu: "3"

From 706dbd95a7e7e54584abac37cc38e55bbdd1f801 Mon Sep 17 00:00:00 2001
From: nlarge-google
Date: Fri, 8 Oct 2021 14:30:08 +0000
Subject: [PATCH 5/5] fix: update dag file

---
 .../bikeshare_status/bikeshare_status_dag.py  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py b/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py
index 0453e6475..6e8cb0982 100644
--- a/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py
+++ b/datasets/san_francisco_bikeshare_status/bikeshare_status/bikeshare_status_dag.py
@@ -61,10 +61,11 @@
             "SOURCE_URL_JSON": "https://gbfs.baywheels.com/gbfs/en/station_status",
             "SOURCE_FILE": "files/data.csv",
             "TARGET_FILE": "files/data_output.csv",
+            "CHUNKSIZE": "750000",
             "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
             "TARGET_GCS_PATH": "data/san_francisco_bikeshare_status/bikeshare_status/data_output.csv",
         },
-        resources={"limit_memory": "2G", "limit_cpu": "1"},
+        resources={"limit_memory": "8G", "limit_cpu": "3"},
     )

     # Task to load CSV data to a BigQuery table
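The chunksize fix above streams the source CSV in 750,000-row batches so the pod's memory stays bounded, appending each transformed batch to a single output file with the header written only once. A minimal sketch of that pattern, assuming pandas >= 1.2 for the context-manager reader (paths mirror the SOURCE_FILE/TARGET_FILE env vars; the transform body is elided):

```python
import pandas as pd

CHUNKSIZE = 750_000  # matches the CHUNKSIZE env var set in pipeline.yaml

with pd.read_csv("files/data.csv", chunksize=CHUNKSIZE) as reader:
    for chunk_number, chunk in enumerate(reader):
        # rename/filter/reorder columns here, as in process_chunk()
        first = chunk_number == 0
        chunk.to_csv(
            "files/data_output.csv",
            index=False,
            mode="w" if first else "a",  # truncate on the first batch, then append
            header=first,                # write the CSV header only once
        )
```

Writing straight to the target file this way would avoid the per-batch temporary files that append_batch_file() creates and deletes; the patch's approach trades that simplicity for keeping each batch inspectable on disk until it has been appended.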