From 58c3fc1126580dc7b1ec023edab10df3a892e58c Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 2 Sep 2021 20:25:02 +0000 Subject: [PATCH 1/6] feat: sunroof implementation census_tract and postal_code --- .../Dockerfile | 38 +++ .../csv_transform.py | 169 ++++++++++++ .../requirements.txt | 3 + .../Dockerfile | 38 +++ .../csv_transform.py | 169 ++++++++++++ .../requirements.txt | 3 + datasets/sunroof/_terraform/provider.tf | 28 ++ ...olar_potential_by_census_tract_pipeline.tf | 39 +++ ...solar_potential_by_postal_code_pipeline.tf | 39 +++ .../sunroof/_terraform/sunroof_dataset.tf | 26 ++ datasets/sunroof/_terraform/variables.tf | 23 ++ datasets/sunroof/dataset.yaml | 58 ++++ .../pipeline.yaml | 199 ++++++++++++++ .../solar_potential_by_census_tract_dag.py | 258 ++++++++++++++++++ .../pipeline.yaml | 199 ++++++++++++++ .../solar_potential_by_postal_code_dag.py | 258 ++++++++++++++++++ datasets/sunroof/sunroof_variables.json | 14 + 17 files changed, 1561 insertions(+) create mode 100644 datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/Dockerfile create mode 100644 datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py create mode 100644 datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/requirements.txt create mode 100644 datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/Dockerfile create mode 100644 datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py create mode 100644 datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/requirements.txt create mode 100644 datasets/sunroof/_terraform/provider.tf create mode 100644 datasets/sunroof/_terraform/solar_potential_by_census_tract_pipeline.tf create mode 100644 datasets/sunroof/_terraform/solar_potential_by_postal_code_pipeline.tf create mode 100644 datasets/sunroof/_terraform/sunroof_dataset.tf create mode 100644 
datasets/sunroof/_terraform/variables.tf create mode 100644 datasets/sunroof/dataset.yaml create mode 100644 datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml create mode 100644 datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py create mode 100644 datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml create mode 100644 datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py create mode 100644 datasets/sunroof/sunroof_variables.json diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/Dockerfile b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/Dockerfile new file mode 100644 index 000000000..85af90570 --- /dev/null +++ b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/Dockerfile @@ -0,0 +1,38 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# The base image for this build +# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . + +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py new file mode 100644 index 000000000..f8df16d1f --- /dev/null +++ b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py @@ -0,0 +1,169 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# Transform pipeline for the Sunroof "Solar Potential By Census Tract" dataset:
# download the source CSV from GCS, rename/clean columns, derive a geography
# column, reorder to the BigQuery schema, save, and upload the result to GCS.

import logging
import os
import pathlib
from subprocess import PIPE, Popen

import pandas as pd


def main(
    source_url: str,
    source_file: pathlib.Path,
    target_file: pathlib.Path,
    target_gcs_bucket: str,
    target_gcs_path: str,
) -> None:
    """Run the full transform: download -> rename -> clean -> reorder -> upload.

    Args:
        source_url: gs:// URL of the source CSV.
        source_file: local path to download the source CSV to.
        target_file: local path for the transformed CSV.
        target_gcs_bucket: destination GCS bucket for the output.
        target_gcs_path: destination object path inside the bucket.
    """
    logging.info("Sunroof Solar Potential By Census Tract process started")

    logging.info("creating 'files' folder")
    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)

    logging.info(f"Downloading file from {source_url} to {source_file}")
    download_file_gs(source_url, source_file)

    logging.info(f"Opening file {source_file}")
    df = pd.read_csv(source_file)

    logging.info(f"Transformation Process Starting.. {source_file}")

    logging.info(f"Transform: Renaming Headers.. {source_file}")
    rename_headers(df)

    logging.info("Transform: Removing NULL text")
    remove_nan_cols(df)

    logging.info("Transform: Adding geography field")
    # WKT point is "POINT( lng lat )" — longitude first, per the WKT spec.
    df["center_point"] = (
        "POINT( " + df["lng_avg"].map(str) + " " + df["lat_avg"].map(str) + " )"
    )

    logging.info("Transform: Reordering headers..")
    df = df[
        [
            "region_name",
            "state_name",
            "lat_max",
            "lat_min",
            "lng_max",
            "lng_min",
            "lat_avg",
            "lng_avg",
            "yearly_sunlight_kwh_kw_threshold_avg",
            "count_qualified",
            "percent_covered",
            "percent_qualified",
            "number_of_panels_n",
            "number_of_panels_s",
            "number_of_panels_e",
            "number_of_panels_w",
            "number_of_panels_f",
            "number_of_panels_median",
            "number_of_panels_total",
            "kw_median",
            "kw_total",
            "yearly_sunlight_kwh_n",
            "yearly_sunlight_kwh_s",
            "yearly_sunlight_kwh_e",
            "yearly_sunlight_kwh_w",
            "yearly_sunlight_kwh_f",
            "yearly_sunlight_kwh_median",
            "yearly_sunlight_kwh_total",
            "install_size_kw_buckets",
            "carbon_offset_metric_tons",
            "existing_installs_count",
            # Bug fix: the original reorder silently dropped center_point,
            # but the pipeline's BigQuery schema expects it as the last field.
            "center_point",
        ]
    ]

    logging.info(f"Transformation Process complete .. {source_file}")

    logging.info(f"Saving to output file.. {target_file}")
    try:
        save_to_new_file(df, file_path=str(target_file))
    except Exception as e:
        logging.error(f"Error saving output file: {e}.")

    logging.info(
        f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
    )
    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

    logging.info("Sunroof Solar Potential By Census Tract process completed")


def remove_nan(dt_str) -> int:
    """Convert a possibly-missing numeric value to int, mapping NaN/empty to 0."""
    if dt_str is None or len(str(dt_str)) == 0 or str(dt_str) == "nan":
        return 0
    # Go through float first so float-typed values ("123.0") convert cleanly.
    return int(float(dt_str))


def remove_nan_cols(df: pd.DataFrame) -> None:
    """Coerce the count-style columns to integers in place, with NaN -> 0."""
    cols = {
        "count_qualified",
        "existing_installs_count",
        "number_of_panels_n",
        "number_of_panels_s",
        "number_of_panels_e",
        "number_of_panels_w",
        "number_of_panels_f",
        "number_of_panels_median",
        "number_of_panels_total",
    }
    for col in cols:
        df[col] = df[col].apply(remove_nan)


def rename_headers(df: pd.DataFrame) -> None:
    """Rename source columns to the target schema, in place.

    Note: rename(inplace=True) returns None, so the result must not be
    assigned back (the original code did, discarding a usable reference).
    """
    header_names = {"install_size_kw_buckets_json": "install_size_kw_buckets"}
    df.rename(columns=header_names, inplace=True)


def save_to_new_file(df: pd.DataFrame, file_path) -> None:
    """Write the DataFrame to file_path as CSV without the index column."""
    df.to_csv(file_path, index=False)


def download_file_gs(source_url: str, source_file: pathlib.Path) -> None:
    """Copy a gs:// object to a local file with gsutil; raise on failure."""
    process = Popen(
        ["gsutil", "cp", source_url, str(source_file)], stdout=PIPE, stderr=PIPE
    )
    _, stderr = process.communicate()
    if process.returncode != 0:
        # The original caught ValueError (never raised here) and continued,
        # so a failed download surfaced later as a confusing read_csv error.
        logging.error(f"Couldn't download {source_url}: {stderr.decode(errors='replace')}")
        raise RuntimeError(f"gsutil cp failed for {source_url}")


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
    """Upload the local file to gs://<gcs_bucket>/<gcs_path>."""
    # Imported lazily so the pure transform helpers above can be used and
    # tested without the google-cloud-storage package or GCS credentials.
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(file_path)


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    main(
        source_url=os.environ["SOURCE_URL"],
        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
        target_gcs_path=os.environ["TARGET_GCS_PATH"],
    )
target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + ) diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/requirements.txt b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/requirements.txt new file mode 100644 index 000000000..f36704793 --- /dev/null +++ b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/requirements.txt @@ -0,0 +1,3 @@ +requests +pandas +google-cloud-storage diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/Dockerfile b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/Dockerfile new file mode 100644 index 000000000..85af90570 --- /dev/null +++ b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/Dockerfile @@ -0,0 +1,38 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# The base image for this build +# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . + +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py new file mode 100644 index 000000000..a6d1540cf --- /dev/null +++ b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py @@ -0,0 +1,169 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# Transform pipeline for the Sunroof "Solar Potential By Postal Code" dataset:
# download the source CSV from GCS, rename/clean columns, derive a geography
# column, reorder to the BigQuery schema, save, and upload the result to GCS.

import logging
import os
import pathlib
from subprocess import PIPE, Popen

import pandas as pd


def main(
    source_url: str,
    source_file: pathlib.Path,
    target_file: pathlib.Path,
    target_gcs_bucket: str,
    target_gcs_path: str,
) -> None:
    """Run the full transform: download -> rename -> clean -> reorder -> upload.

    Args:
        source_url: gs:// URL of the source CSV.
        source_file: local path to download the source CSV to.
        target_file: local path for the transformed CSV.
        target_gcs_bucket: destination GCS bucket for the output.
        target_gcs_path: destination object path inside the bucket.
    """
    logging.info("Sunroof Solar Potential By Postal Code process started")

    logging.info("creating 'files' folder")
    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)

    logging.info(f"Downloading file from {source_url} to {source_file}")
    download_file_gs(source_url, source_file)

    logging.info(f"Opening file {source_file}")
    df = pd.read_csv(source_file)

    logging.info(f"Transformation Process Starting.. {source_file}")

    logging.info(f"Transform: Renaming Headers.. {source_file}")
    rename_headers(df)

    logging.info("Transform: Removing NULL text")
    remove_nan_cols(df)

    logging.info("Transform: Adding geography field")
    # WKT point is "POINT( lng lat )" — longitude first, per the WKT spec.
    df["center_point"] = (
        "POINT( " + df["lng_avg"].map(str) + " " + df["lat_avg"].map(str) + " )"
    )

    logging.info("Transform: Reordering headers..")
    df = df[
        [
            "region_name",
            "state_name",
            "lat_max",
            "lat_min",
            "lng_max",
            "lng_min",
            "lat_avg",
            "lng_avg",
            "yearly_sunlight_kwh_kw_threshold_avg",
            "count_qualified",
            "percent_covered",
            "percent_qualified",
            "number_of_panels_n",
            "number_of_panels_s",
            "number_of_panels_e",
            "number_of_panels_w",
            "number_of_panels_f",
            "number_of_panels_median",
            "number_of_panels_total",
            "kw_median",
            "kw_total",
            "yearly_sunlight_kwh_n",
            "yearly_sunlight_kwh_s",
            "yearly_sunlight_kwh_e",
            "yearly_sunlight_kwh_w",
            "yearly_sunlight_kwh_f",
            "yearly_sunlight_kwh_median",
            "yearly_sunlight_kwh_total",
            "install_size_kw_buckets",
            "carbon_offset_metric_tons",
            "existing_installs_count",
            # Bug fix: the original reorder silently dropped center_point,
            # but the pipeline's BigQuery schema expects it as the last field.
            "center_point",
        ]
    ]

    logging.info(f"Transformation Process complete .. {source_file}")

    logging.info(f"Saving to output file.. {target_file}")
    try:
        save_to_new_file(df, file_path=str(target_file))
    except Exception as e:
        logging.error(f"Error saving output file: {e}.")

    logging.info(
        f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
    )
    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

    logging.info("Sunroof Solar Potential By Postal Code process completed")


def remove_nan(dt_str) -> int:
    """Convert a possibly-missing numeric value to int, mapping NaN/empty to 0."""
    if dt_str is None or len(str(dt_str)) == 0 or str(dt_str) == "nan":
        return 0
    # Go through float first so float-typed values ("123.0") convert cleanly.
    return int(float(dt_str))


def remove_nan_cols(df: pd.DataFrame) -> None:
    """Coerce the count-style columns to integers in place, with NaN -> 0."""
    cols = {
        "count_qualified",
        "existing_installs_count",
        "number_of_panels_n",
        "number_of_panels_s",
        "number_of_panels_e",
        "number_of_panels_w",
        "number_of_panels_f",
        "number_of_panels_median",
        "number_of_panels_total",
    }
    for col in cols:
        df[col] = df[col].apply(remove_nan)


def rename_headers(df: pd.DataFrame) -> None:
    """Rename source columns to the target schema, in place.

    Note: rename(inplace=True) returns None, so the result must not be
    assigned back (the original code did, discarding a usable reference).
    """
    header_names = {"install_size_kw_buckets_json": "install_size_kw_buckets"}
    df.rename(columns=header_names, inplace=True)


def save_to_new_file(df: pd.DataFrame, file_path) -> None:
    """Write the DataFrame to file_path as CSV without the index column."""
    df.to_csv(file_path, index=False)


def download_file_gs(source_url: str, source_file: pathlib.Path) -> None:
    """Copy a gs:// object to a local file with gsutil; raise on failure."""
    process = Popen(
        ["gsutil", "cp", source_url, str(source_file)], stdout=PIPE, stderr=PIPE
    )
    _, stderr = process.communicate()
    if process.returncode != 0:
        # The original caught ValueError (never raised here) and continued,
        # so a failed download surfaced later as a confusing read_csv error.
        logging.error(f"Couldn't download {source_url}: {stderr.decode(errors='replace')}")
        raise RuntimeError(f"gsutil cp failed for {source_url}")


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
    """Upload the local file to gs://<gcs_bucket>/<gcs_path>."""
    # Imported lazily so the pure transform helpers above can be used and
    # tested without the google-cloud-storage package or GCS credentials.
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(file_path)


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    main(
        source_url=os.environ["SOURCE_URL"],
        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
        target_gcs_path=os.environ["TARGET_GCS_PATH"],
    )
target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + ) diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/requirements.txt b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/requirements.txt new file mode 100644 index 000000000..f36704793 --- /dev/null +++ b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/requirements.txt @@ -0,0 +1,3 @@ +requests +pandas +google-cloud-storage diff --git a/datasets/sunroof/_terraform/provider.tf b/datasets/sunroof/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/sunroof/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/sunroof/_terraform/solar_potential_by_census_tract_pipeline.tf b/datasets/sunroof/_terraform/solar_potential_by_census_tract_pipeline.tf new file mode 100644 index 000000000..b8eecce01 --- /dev/null +++ b/datasets/sunroof/_terraform/solar_potential_by_census_tract_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "solar_potential_by_census_tract" { + project = var.project_id + dataset_id = "sunroof" + table_id = "solar_potential_by_census_tract" + + description = "Sunroof Solar Potential By Census Tract" + + + + + depends_on = [ + google_bigquery_dataset.sunroof + ] +} + +output "bigquery_table-solar_potential_by_census_tract-table_id" { + value = google_bigquery_table.solar_potential_by_census_tract.table_id +} + +output "bigquery_table-solar_potential_by_census_tract-id" { + value = google_bigquery_table.solar_potential_by_census_tract.id +} diff --git a/datasets/sunroof/_terraform/solar_potential_by_postal_code_pipeline.tf b/datasets/sunroof/_terraform/solar_potential_by_postal_code_pipeline.tf new file mode 100644 index 000000000..82040a0b6 --- /dev/null +++ b/datasets/sunroof/_terraform/solar_potential_by_postal_code_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "solar_potential_by_postal_code" { + project = var.project_id + dataset_id = "sunroof" + table_id = "solar_potential_by_postal_code" + + description = "Sunroof Solar Potential By Postal Code" + + + + + depends_on = [ + google_bigquery_dataset.sunroof + ] +} + +output "bigquery_table-solar_potential_by_postal_code-table_id" { + value = google_bigquery_table.solar_potential_by_postal_code.table_id +} + +output "bigquery_table-solar_potential_by_postal_code-id" { + value = google_bigquery_table.solar_potential_by_postal_code.id +} diff --git a/datasets/sunroof/_terraform/sunroof_dataset.tf b/datasets/sunroof/_terraform/sunroof_dataset.tf new file mode 100644 index 000000000..68d241622 --- /dev/null +++ b/datasets/sunroof/_terraform/sunroof_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_dataset" "sunroof" { + dataset_id = "sunroof" + project = var.project_id + description = "sunroof" +} + +output "bigquery_dataset-sunroof-dataset_id" { + value = google_bigquery_dataset.sunroof.dataset_id +} diff --git a/datasets/sunroof/_terraform/variables.tf b/datasets/sunroof/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/sunroof/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/sunroof/dataset.yaml b/datasets/sunroof/dataset.yaml new file mode 100644 index 000000000..f1a20a80e --- /dev/null +++ b/datasets/sunroof/dataset.yaml @@ -0,0 +1,58 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +dataset: + # The `dataset` block includes properties for your dataset that will be shown + # to users of your data on the Google Cloud website. + + # Must be exactly the same name as the folder name your dataset.yaml is in. + name: sunroof + + # A friendly, human-readable name of the dataset + friendly_name: #friendlyname + + # A short, descriptive summary of the dataset. + description: #~friendlydescription + + # A list of sources the dataset is derived from, using the YAML list syntax. + dataset_sources: ~ + + # A list of terms and conditions that users of the dataset should agree on, + # using the YAML list syntax. + terms_of_use: ~ + + +resources: + # A list of Google Cloud resources needed by your dataset. In principle, all + # pipelines under a dataset should be able to share these resources. + # + # The currently supported resources are shown below. Use only the resources + # you need, and delete the rest as needed by your pipeline. + # + # We will keep adding to the list below to support more Google Cloud resources + # over time. If a resource you need isn't supported, please file an issue on + # the repository. 
+ + - type: bigquery_dataset + # Google BigQuery dataset to namespace all tables managed by this folder + # + # Required Properties: + # dataset_id + # + # Optional Properties: + # friendly_name (A user-friendly name of the dataset) + # description (A user-friendly description of the dataset) + # location (The geographic location where the dataset should reside) + dataset_id: sunroof + description: sunroof diff --git a/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml b/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml new file mode 100644 index 000000000..df3dc62a2 --- /dev/null +++ b/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml @@ -0,0 +1,199 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--- +resources: + + - type: bigquery_table + table_id: "solar_potential_by_census_tract" + description: "Sunroof Solar Potential By Census Tract" + +dag: + airflow_version: 2 + initialize: + dag_id: solar_potential_by_census_tract + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" # run once a week at Sunday 12am + catchup: False + default_view: graph + + tasks: + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "transform_csv" + name: "solar_potential_by_census_tract" + namespace: "default" + image_pull_policy: "Always" + image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub_solar_potential_by_census_tract }}" + env_vars: + SOURCE_URL: "gs://project-sunroof/csv/latest/project-sunroof-census_tract.csv" + SOURCE_FILE: "files/data.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/sunroof/solar_potential_by_census_tract/data_output.csv" + resources: + limit_memory: "4G" + limit_cpu: "2" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_to_bq" + bucket: "{{ var.json.shared.composer_bucket }}" + source_objects: ["data/sunroof/solar_potential_by_census_tract/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "sunroof.solar_potential_by_census_tract" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - "name": "region_name" + "type": "STRING" + "description": "Census Tract" + "mode": "NULLABLE" + - "name": "state_name" + "type": "STRING" + "description": "Name of the state containing that region" + "mode": "NULLABLE" + - "name": "lat_max" + "type": "FLOAT" + "description": "maximum latitude for that region" + "mode": "NULLABLE" + - "name": "lat_min" + "type": "FLOAT" + 
"description": "minimum latitude for that region" + "mode": "NULLABLE" + - "name": "lng_max" + "type": "FLOAT" + "description": "maximum longitude for that region" + "mode": "NULLABLE" + - "name": "lng_min" + "type": "FLOAT" + "description": "minimum longitude for that region" + "mode": "NULLABLE" + - "name": "lat_avg" + "type": "FLOAT" + "description": "average latitude for that region" + "mode": "NULLABLE" + - "name": "lng_avg" + "type": "FLOAT" + "description": "average longitude for that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_kw_threshold_avg" + "type": "FLOAT" + "description": "75% of the optimimum sunlight in the county containing that zip code" + "mode": "NULLABLE" + - "name": "count_qualified" + "type": "INTEGER" + "description": "# of buildings in Google Maps that are suitable for solar" + "mode": "NULLABLE" + - "name": "percent_covered" + "type": "FLOAT" + "description": "% of buildings in Google Maps covered by Project Sunroof" + "mode": "NULLABLE" + - "name": "percent_qualified" + "type": "FLOAT" + "description": "% of buildings covered by Project Sunroof that are suitable for solar" + "mode": "NULLABLE" + - "name": "number_of_panels_n" + "type": "INTEGER" + "description": "# of solar panels potential for north-facing roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_s" + "type": "INTEGER" + "description": "# of solar panels potential for south-facing roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_e" + "type": "INTEGER" + "description": "# of solar panels potential for east-facing roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_w" + "type": "INTEGER" + "description": "# of solar panels potential for west-facing roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_f" + "type": "INTEGER" + 
"description": "# of solar panels potential for flat roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_median" + "type": "INTEGER" + "description": "# of panels that fit on the median roof" + "mode": "NULLABLE" + - "name": "number_of_panels_total" + "type": "INTEGER" + "description": "# of solar panels potential for all roof space in that region, assuming 1.650m 0.992m panels" + "mode": "NULLABLE" + - "name": "kw_median" + "type": "FLOAT" + "description": "kW of solar potential for the median building in that region (assuming 250 watts per panel)" + "mode": "NULLABLE" + - "name": "kw_total" + "type": "FLOAT" + "description": "# of kW of solar potential for all roof types in that region (assuming 250 watts per panel)" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_n" + "type": "FLOAT" + "description": "total solar energy generation potential for north-facing roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_s" + "type": "FLOAT" + "description": "total solar energy generation potential for south-facing roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_e" + "type": "FLOAT" + "description": "total solar energy generation potential for east-facing roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_w" + "type": "FLOAT" + "description": "total solar energy generation potential for west-facing roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_f" + "type": "FLOAT" + "description": "total solar energy generation potential for flat roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_median" + "type": "FLOAT" + "description": "kWh/kw/yr for the median roof, in DC (not AC) terms" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_total" + "type": "FLOAT" + "description": "total solar energy generation potential for all roof space in that region" + "mode": 
"NULLABLE" + - "name": "install_size_kw_buckets" + "type": "STRING" + "description": "# of buildings with potential for various installation size buckets. Format is a JSON array, where each element is a tuple containing (1) lower bound of bucket, in kW, and (2) number of buildings in that bucket." + "mode": "NULLABLE" + - "name": "carbon_offset_metric_tons" + "type": "FLOAT" + "description": "The potential carbon dioxide abatement of the solar capacity that meets the technical potential criteria. The calculation uses eGRID subregion CO2 equivalent non-baseload output emission rates. https://www.epa.gov/sites/production/files/2015-10/documents/egrid2012_summarytables_0.pdf" + "mode": "NULLABLE" + - "name": "existing_installs_count" + "type": "INTEGER" + "description": "# of buildings estimated to have a solar installation, at time of data collection" + "mode": "NULLABLE" + - "name": "center_point" + "type": "GEOGRAPHY" + "description": "" + "mode": "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py b/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py new file mode 100644 index 000000000..74f7e8859 --- /dev/null +++ b/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py @@ -0,0 +1,258 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.transfers import gcs_to_bigquery + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="sunroof.solar_potential_by_census_tract", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="transform_csv", + name="solar_potential_by_census_tract", + namespace="default", + image_pull_policy="Always", + image="{{ var.json.sunroof.container_registry.run_csv_transform_kub_solar_potential_by_census_tract }}", + env_vars={ + "SOURCE_URL": "gs://project-sunroof/csv/latest/project-sunroof-census_tract.csv", + "SOURCE_FILE": "files/data.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/sunroof/solar_potential_by_census_tract/data_output.csv", + }, + resources={"limit_memory": "4G", "limit_cpu": "2"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=["data/sunroof/solar_potential_by_census_tract/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="sunroof.solar_potential_by_census_tract", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "region_name", + "type": "STRING", + "description": "Census Tract", + "mode": "NULLABLE", + }, + { + "name": "state_name", + "type": "STRING", + "description": "Name of the state containing that region", + "mode": "NULLABLE", + }, + { + "name": "lat_max", + "type": "FLOAT", + "description": "maximum latitude for that region", + "mode": "NULLABLE", + }, + 
{ + "name": "lat_min", + "type": "FLOAT", + "description": "minimum latitude for that region", + "mode": "NULLABLE", + }, + { + "name": "lng_max", + "type": "FLOAT", + "description": "maximum longitude for that region", + "mode": "NULLABLE", + }, + { + "name": "lng_min", + "type": "FLOAT", + "description": "minimum longitude for that region", + "mode": "NULLABLE", + }, + { + "name": "lat_avg", + "type": "FLOAT", + "description": "average latitude for that region", + "mode": "NULLABLE", + }, + { + "name": "lng_avg", + "type": "FLOAT", + "description": "average longitude for that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_kw_threshold_avg", + "type": "FLOAT", + "description": "75% of the optimimum sunlight in the county containing that zip code", + "mode": "NULLABLE", + }, + { + "name": "count_qualified", + "type": "INTEGER", + "description": "# of buildings in Google Maps that are suitable for solar", + "mode": "NULLABLE", + }, + { + "name": "percent_covered", + "type": "FLOAT", + "description": "% of buildings in Google Maps covered by Project Sunroof", + "mode": "NULLABLE", + }, + { + "name": "percent_qualified", + "type": "FLOAT", + "description": "% of buildings covered by Project Sunroof that are suitable for solar", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_n", + "type": "INTEGER", + "description": "# of solar panels potential for north-facing roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_s", + "type": "INTEGER", + "description": "# of solar panels potential for south-facing roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_e", + "type": "INTEGER", + "description": "# of solar panels potential for east-facing roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_w", + "type": "INTEGER", + "description": "# of 
solar panels potential for west-facing roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_f", + "type": "INTEGER", + "description": "# of solar panels potential for flat roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_median", + "type": "INTEGER", + "description": "# of panels that fit on the median roof", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_total", + "type": "INTEGER", + "description": "# of solar panels potential for all roof space in that region, assuming 1.650m 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "kw_median", + "type": "FLOAT", + "description": "kW of solar potential for the median building in that region (assuming 250 watts per panel)", + "mode": "NULLABLE", + }, + { + "name": "kw_total", + "type": "FLOAT", + "description": "# of kW of solar potential for all roof types in that region (assuming 250 watts per panel)", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_n", + "type": "FLOAT", + "description": "total solar energy generation potential for north-facing roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_s", + "type": "FLOAT", + "description": "total solar energy generation potential for south-facing roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_e", + "type": "FLOAT", + "description": "total solar energy generation potential for east-facing roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_w", + "type": "FLOAT", + "description": "total solar energy generation potential for west-facing roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_f", + "type": "FLOAT", + "description": "total solar energy generation potential for flat roof space in that region", + "mode": "NULLABLE", + }, + { + "name": 
"yearly_sunlight_kwh_median", + "type": "FLOAT", + "description": "kWh/kw/yr for the median roof, in DC (not AC) terms", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_total", + "type": "FLOAT", + "description": "total solar energy generation potential for all roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "install_size_kw_buckets", + "type": "STRING", + "description": "# of buildings with potential for various installation size buckets. Format is a JSON array, where each element is a tuple containing (1) lower bound of bucket, in kW, and (2) number of buildings in that bucket.", + "mode": "NULLABLE", + }, + { + "name": "carbon_offset_metric_tons", + "type": "FLOAT", + "description": "The potential carbon dioxide abatement of the solar capacity that meets the technical potential criteria. The calculation uses eGRID subregion CO2 equivalent non-baseload output emission rates. https://www.epa.gov/sites/production/files/2015-10/documents/egrid2012_summarytables_0.pdf", + "mode": "NULLABLE", + }, + { + "name": "existing_installs_count", + "type": "INTEGER", + "description": "# of buildings estimated to have a solar installation, at time of data collection", + "mode": "NULLABLE", + }, + { + "name": "center_point", + "type": "GEOGRAPHY", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml b/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml new file mode 100644 index 000000000..e3cfc84ac --- /dev/null +++ b/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml @@ -0,0 +1,199 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: "solar_potential_by_postal_code" + description: "Sunroof Solar Potential By Postal Code" + +dag: + airflow_version: 2 + initialize: + dag_id: solar_potential_by_postal_code + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" # run once a day at midnight + catchup: False + default_view: graph + + tasks: + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "transform_csv" + name: "solar_potential_by_postal_code" + namespace: "default" + image_pull_policy: "Always" + image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub_solar_potential_by_postal_code }}" + env_vars: + SOURCE_URL: "gs://project-sunroof/csv/latest/project-sunroof-postal_code.csv" + SOURCE_FILE: "files/data.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/sunroof/solar_potential_by_postal_code/data_output.csv" + resources: + limit_memory: "4G" + limit_cpu: "2" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_to_bq" + bucket: "{{ var.json.shared.composer_bucket }}" + source_objects: ["data/sunroof/solar_potential_by_postal_code/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "sunroof.solar_potential_by_postal_code" + skip_leading_rows: 1 + write_disposition: 
"WRITE_TRUNCATE" + schema_fields: + - "name": "region_name" + "type": "STRING" + "description": "Postal Code" + "mode": "NULLABLE" + - "name": "state_name" + "type": "STRING" + "description": "Name of the state containing that region" + "mode": "NULLABLE" + - "name": "lat_max" + "type": "FLOAT" + "description": "maximum latitude for that region" + "mode": "NULLABLE" + - "name": "lat_min" + "type": "FLOAT" + "description": "minimum latitude for that region" + "mode": "NULLABLE" + - "name": "lng_max" + "type": "FLOAT" + "description": "maximum longitude for that region" + "mode": "NULLABLE" + - "name": "lng_min" + "type": "FLOAT" + "description": "minimum longitude for that region" + "mode": "NULLABLE" + - "name": "lat_avg" + "type": "FLOAT" + "description": "average latitude for that region" + "mode": "NULLABLE" + - "name": "lng_avg" + "type": "FLOAT" + "description": "average longitude for that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_kw_threshold_avg" + "type": "FLOAT" + "description": "75% of the optimum sunlight in the county containing that zip code" + "mode": "NULLABLE" + - "name": "count_qualified" + "type": "INTEGER" + "description": "# of buildings in Google Maps that are suitable for solar" + "mode": "NULLABLE" + - "name": "percent_covered" + "type": "FLOAT" + "description": "% of buildings in Google Maps covered by Project Sunroof" + "mode": "NULLABLE" + - "name": "percent_qualified" + "type": "FLOAT" + "description": "% of buildings covered by Project Sunroof that are suitable for solar" + "mode": "NULLABLE" + - "name": "number_of_panels_n" + "type": "INTEGER" + "description": "# of solar panels potential for north-facing roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_s" + "type": "INTEGER" + "description": "# of solar panels potential for south-facing roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_e" + 
"type": "INTEGER" + "description": "# of solar panels potential for east-facing roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_w" + "type": "INTEGER" + "description": "# of solar panels potential for west-facing roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_f" + "type": "INTEGER" + "description": "# of solar panels potential for flat roof space in that region, assuming 1.650m x 0.992m panels" + "mode": "NULLABLE" + - "name": "number_of_panels_median" + "type": "INTEGER" + "description": "# of panels that fit on the median roof" + "mode": "NULLABLE" + - "name": "number_of_panels_total" + "type": "INTEGER" + "description": "# of solar panels potential for all roof space in that region, assuming 1.650m 0.992m panels" + "mode": "NULLABLE" + - "name": "kw_median" + "type": "FLOAT" + "description": "kW of solar potential for the median building in that region (assuming 250 watts per panel)" + "mode": "NULLABLE" + - "name": "kw_total" + "type": "FLOAT" + "description": "# of kW of solar potential for all roof types in that region (assuming 250 watts per panel)" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_n" + "type": "FLOAT" + "description": "total solar energy generation potential for north-facing roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_s" + "type": "FLOAT" + "description": "total solar energy generation potential for south-facing roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_e" + "type": "FLOAT" + "description": "total solar energy generation potential for east-facing roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_w" + "type": "FLOAT" + "description": "total solar energy generation potential for west-facing roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_f" + "type": "FLOAT" + "description": 
"total solar energy generation potential for flat roof space in that region" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_median" + "type": "FLOAT" + "description": "kWh/kw/yr for the median roof, in DC (not AC) terms" + "mode": "NULLABLE" + - "name": "yearly_sunlight_kwh_total" + "type": "FLOAT" + "description": "total solar energy generation potential for all roof space in that region" + "mode": "NULLABLE" + - "name": "install_size_kw_buckets" + "type": "STRING" + "description": "# of buildings with potential for various installation size buckets. Format is a JSON array, where each element is a tuple containing (1) lower bound of bucket, in kW, and (2) number of buildings in that bucket." + "mode": "NULLABLE" + - "name": "carbon_offset_metric_tons" + "type": "FLOAT" + "description": "The potential carbon dioxide abatement of the solar capacity that meets the technical potential criteria. The calculation uses eGRID subregion CO2 equivalent non-baseload output emission rates. https://www.epa.gov/sites/production/files/2015-10/documents/egrid2012_summarytables_0.pdf" + "mode": "NULLABLE" + - "name": "existing_installs_count" + "type": "INTEGER" + "description": "# of buildings estimated to have a solar installation, at time of data collection" + "mode": "NULLABLE" + - "name": "center_point" + "type": "GEOGRAPHY" + "description": "" + "mode": "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py b/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py new file mode 100644 index 000000000..3fd2b247f --- /dev/null +++ b/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py @@ -0,0 +1,258 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.transfers import gcs_to_bigquery + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="sunroof.solar_potential_by_postal_code", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="transform_csv", + name="solar_potential_by_postal_code", + namespace="default", + image_pull_policy="Always", + image="{{ var.json.sunroof.container_registry.run_csv_transform_kub_solar_potential_by_postal_code }}", + env_vars={ + "SOURCE_URL": "gs://project-sunroof/csv/latest/project-sunroof-postal_code.csv", + "SOURCE_FILE": "files/data.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/sunroof/solar_potential_by_postal_code/data_output.csv", + }, + resources={"limit_memory": "4G", "limit_cpu": "2"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=["data/sunroof/solar_potential_by_postal_code/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="sunroof.solar_potential_by_postal_code", + skip_leading_rows=1, + 
 write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "region_name", + "type": "STRING", + "description": "Postal Code", + "mode": "NULLABLE", + }, + { + "name": "state_name", + "type": "STRING", + "description": "Name of the state containing that region", + "mode": "NULLABLE", + }, + { + "name": "lat_max", + "type": "FLOAT", + "description": "maximum latitude for that region", + "mode": "NULLABLE", + }, + { + "name": "lat_min", + "type": "FLOAT", + "description": "minimum latitude for that region", + "mode": "NULLABLE", + }, + { + "name": "lng_max", + "type": "FLOAT", + "description": "maximum longitude for that region", + "mode": "NULLABLE", + }, + { + "name": "lng_min", + "type": "FLOAT", + "description": "minimum longitude for that region", + "mode": "NULLABLE", + }, + { + "name": "lat_avg", + "type": "FLOAT", + "description": "average latitude for that region", + "mode": "NULLABLE", + }, + { + "name": "lng_avg", + "type": "FLOAT", + "description": "average longitude for that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_kw_threshold_avg", + "type": "FLOAT", + "description": "75% of the optimum sunlight in the county containing that zip code", + "mode": "NULLABLE", + }, + { + "name": "count_qualified", + "type": "INTEGER", + "description": "# of buildings in Google Maps that are suitable for solar", + "mode": "NULLABLE", + }, + { + "name": "percent_covered", + "type": "FLOAT", + "description": "% of buildings in Google Maps covered by Project Sunroof", + "mode": "NULLABLE", + }, + { + "name": "percent_qualified", + "type": "FLOAT", + "description": "% of buildings covered by Project Sunroof that are suitable for solar", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_n", + "type": "INTEGER", + "description": "# of solar panels potential for north-facing roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_s", + "type": "INTEGER", + 
"description": "# of solar panels potential for south-facing roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_e", + "type": "INTEGER", + "description": "# of solar panels potential for east-facing roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_w", + "type": "INTEGER", + "description": "# of solar panels potential for west-facing roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_f", + "type": "INTEGER", + "description": "# of solar panels potential for flat roof space in that region, assuming 1.650m x 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_median", + "type": "INTEGER", + "description": "# of panels that fit on the median roof", + "mode": "NULLABLE", + }, + { + "name": "number_of_panels_total", + "type": "INTEGER", + "description": "# of solar panels potential for all roof space in that region, assuming 1.650m 0.992m panels", + "mode": "NULLABLE", + }, + { + "name": "kw_median", + "type": "FLOAT", + "description": "kW of solar potential for the median building in that region (assuming 250 watts per panel)", + "mode": "NULLABLE", + }, + { + "name": "kw_total", + "type": "FLOAT", + "description": "# of kW of solar potential for all roof types in that region (assuming 250 watts per panel)", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_n", + "type": "FLOAT", + "description": "total solar energy generation potential for north-facing roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_s", + "type": "FLOAT", + "description": "total solar energy generation potential for south-facing roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_e", + "type": "FLOAT", + "description": "total solar energy generation potential for east-facing roof 
space in that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_w", + "type": "FLOAT", + "description": "total solar energy generation potential for west-facing roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_f", + "type": "FLOAT", + "description": "total solar energy generation potential for flat roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_median", + "type": "FLOAT", + "description": "kWh/kw/yr for the median roof, in DC (not AC) terms", + "mode": "NULLABLE", + }, + { + "name": "yearly_sunlight_kwh_total", + "type": "FLOAT", + "description": "total solar energy generation potential for all roof space in that region", + "mode": "NULLABLE", + }, + { + "name": "install_size_kw_buckets", + "type": "STRING", + "description": "# of buildings with potential for various installation size buckets. Format is a JSON array, where each element is a tuple containing (1) lower bound of bucket, in kW, and (2) number of buildings in that bucket.", + "mode": "NULLABLE", + }, + { + "name": "carbon_offset_metric_tons", + "type": "FLOAT", + "description": "The potential carbon dioxide abatement of the solar capacity that meets the technical potential criteria. The calculation uses eGRID subregion CO2 equivalent non-baseload output emission rates. 
https://www.epa.gov/sites/production/files/2015-10/documents/egrid2012_summarytables_0.pdf", + "mode": "NULLABLE", + }, + { + "name": "existing_installs_count", + "type": "INTEGER", + "description": "# of buildings estimated to have a solar installation, at time of data collection", + "mode": "NULLABLE", + }, + { + "name": "center_point", + "type": "GEOGRAPHY", + "description": "", + "mode": "NULLABLE", + }, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/sunroof/sunroof_variables.json b/datasets/sunroof/sunroof_variables.json new file mode 100644 index 000000000..31bd0b10d --- /dev/null +++ b/datasets/sunroof/sunroof_variables.json @@ -0,0 +1,14 @@ +{ + "shared": { + "composer_bucket": "us-central1-pipelines-dev-fc245d58-bucket", + "airflow_data_folder": "/home/airflow/gcs/data", + "airflow_home": "/home/airflow/gcs" + }, + "sunroof": { + "destination_bucket": "public-datasets-dev-sunroof-project", + "container_registry": { + "run_csv_transform_kub_solar_potential_by_census_tract": "gcr.io/bigquery-public-data-dev/sunroof__run_csv_transform_kub_solar_potential_by_census_tract:latest", + "run_csv_transform_kub_solar_potential_by_postal_code": "gcr.io/bigquery-public-data-dev/sunroof__run_csv_transform_kub_solar_potential_by_postal_code:latest" + } + } +} From cbeb46cb1c05ff76a773f6a8e00c3caaa9c9f748 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 2 Sep 2021 20:31:40 +0000 Subject: [PATCH 2/6] fix: resolved flake8 --- .../csv_transform.py | 42 ++++++++++--------- .../csv_transform.py | 42 ++++++++++--------- 2 files changed, 44 insertions(+), 40 deletions(-) diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py index f8df16d1f..49fc116d1 100644 --- a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py +++ 
b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py @@ -13,11 +13,9 @@ # limitations under the License. # import modules -import datetime import logging import os import pathlib -import pdb from subprocess import PIPE, Popen import pandas as pd @@ -32,7 +30,7 @@ def main( target_gcs_path: str, ) -> None: - logging.info(f"Sunroof Solar Potential By Census Tract process started") + logging.info("Sunroof Solar Potential By Census Tract process started") logging.info("creating 'files' folder") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) @@ -52,7 +50,9 @@ def main( remove_nan_cols(df) logging.info("Transform: Adding geography field") - df["center_point"] = "POINT( " + df["lng_avg"].map(str) + " " + df["lat_avg"].map(str) + " )" + df["center_point"] = ( + "POINT( " + df["lng_avg"].map(str) + " " + df["lat_avg"].map(str) + " )" + ) logging.info("Transform: Reordering headers..") df = df[ @@ -87,7 +87,7 @@ def main( "yearly_sunlight_kwh_total", "install_size_kw_buckets", "carbon_offset_metric_tons", - "existing_installs_count" + "existing_installs_count", ] ] @@ -105,7 +105,8 @@ def main( ) upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - logging.info(f"Sunroof Solar Potential By Census Tract process completed") + logging.info("Sunroof Solar Potential By Census Tract process completed") + def remove_nan(dt_str: str) -> int: if dt_str is None or len(str(dt_str)) == 0 or str(dt_str) == "nan": @@ -113,27 +114,26 @@ def remove_nan(dt_str: str) -> int: else: return int(dt_str) + def remove_nan_cols(df: pd.DataFrame) -> None: cols = { - "count_qualified", - "existing_installs_count", - "number_of_panels_n", - "number_of_panels_s", - "number_of_panels_e", - "number_of_panels_w", - "number_of_panels_f", - "number_of_panels_median", - "number_of_panels_total" - } + "count_qualified", + "existing_installs_count", + "number_of_panels_n", + "number_of_panels_s", + "number_of_panels_e", + 
"number_of_panels_w", + "number_of_panels_f", + "number_of_panels_median", + "number_of_panels_total", + } for col in cols: df[col] = df[col].apply(remove_nan) def rename_headers(df: pd.DataFrame) -> None: - header_names = { - "install_size_kw_buckets_json": "install_size_kw_buckets" - } + header_names = {"install_size_kw_buckets_json": "install_size_kw_buckets"} df = df.rename(columns=header_names, inplace=True) @@ -144,7 +144,9 @@ def save_to_new_file(df: pd.DataFrame, file_path) -> None: def download_file_gs(source_url: str, source_file: pathlib.Path) -> None: try: - process = Popen(['gsutil', 'cp', source_url, source_file], stdout=PIPE, stderr=PIPE) + process = Popen( + ["gsutil", "cp", source_url, source_file], stdout=PIPE, stderr=PIPE + ) process.communicate() except ValueError: logging.error(f"Couldn't download {source_url}: {ValueError}") diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py index a6d1540cf..576ba3714 100644 --- a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py +++ b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py @@ -13,11 +13,9 @@ # limitations under the License. 
# import modules -import datetime import logging import os import pathlib -import pdb from subprocess import PIPE, Popen import pandas as pd @@ -32,7 +30,7 @@ def main( target_gcs_path: str, ) -> None: - logging.info(f"Sunroof Solar Potential By Postal Code process started") + logging.info("Sunroof Solar Potential By Postal Code process started") logging.info("creating 'files' folder") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) @@ -52,7 +50,9 @@ def main( remove_nan_cols(df) logging.info("Transform: Adding geography field") - df["center_point"] = "POINT( " + df["lng_avg"].map(str) + " " + df["lat_avg"].map(str) + " )" + df["center_point"] = ( + "POINT( " + df["lng_avg"].map(str) + " " + df["lat_avg"].map(str) + " )" + ) logging.info("Transform: Reordering headers..") df = df[ @@ -87,7 +87,7 @@ def main( "yearly_sunlight_kwh_total", "install_size_kw_buckets", "carbon_offset_metric_tons", - "existing_installs_count" + "existing_installs_count", ] ] @@ -105,7 +105,8 @@ def main( ) upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - logging.info(f"Sunroof Solar Potential By Postal Code process completed") + logging.info("Sunroof Solar Potential By Postal Code process completed") + def remove_nan(dt_str: str) -> int: if dt_str is None or len(str(dt_str)) == 0 or str(dt_str) == "nan": @@ -113,27 +114,26 @@ def remove_nan(dt_str: str) -> int: else: return int(dt_str) + def remove_nan_cols(df: pd.DataFrame) -> None: cols = { - "count_qualified", - "existing_installs_count", - "number_of_panels_n", - "number_of_panels_s", - "number_of_panels_e", - "number_of_panels_w", - "number_of_panels_f", - "number_of_panels_median", - "number_of_panels_total" - } + "count_qualified", + "existing_installs_count", + "number_of_panels_n", + "number_of_panels_s", + "number_of_panels_e", + "number_of_panels_w", + "number_of_panels_f", + "number_of_panels_median", + "number_of_panels_total", + } for col in cols: df[col] = df[col].apply(remove_nan) def 
rename_headers(df: pd.DataFrame) -> None: - header_names = { - "install_size_kw_buckets_json": "install_size_kw_buckets" - } + header_names = {"install_size_kw_buckets_json": "install_size_kw_buckets"} df = df.rename(columns=header_names, inplace=True) @@ -144,7 +144,9 @@ def save_to_new_file(df: pd.DataFrame, file_path) -> None: def download_file_gs(source_url: str, source_file: pathlib.Path) -> None: try: - process = Popen(['gsutil', 'cp', source_url, source_file], stdout=PIPE, stderr=PIPE) + process = Popen( + ["gsutil", "cp", source_url, source_file], stdout=PIPE, stderr=PIPE + ) process.communicate() except ValueError: logging.error(f"Couldn't download {source_url}: {ValueError}") From 0333bf14c615ef15cfa7ade0063eddc39be2ceda Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 7 Oct 2021 20:29:07 +0000 Subject: [PATCH 3/6] fix: Merged pipelines to use one image. --- .../Dockerfile | 0 .../csv_transform.py | 114 ++++++++---- .../requirements.txt | 1 + .../requirements.txt | 3 - .../Dockerfile | 38 ---- .../csv_transform.py | 171 ------------------ datasets/sunroof/dataset.yaml | 36 +--- .../pipeline.yaml | 8 +- .../solar_potential_by_census_tract_dag.py | 6 +- .../pipeline.yaml | 10 +- .../solar_potential_by_postal_code_dag.py | 8 +- 11 files changed, 98 insertions(+), 297 deletions(-) rename datasets/sunroof/_images/{run_csv_transform_kub_solar_potential_by_census_tract => run_csv_transform_kub}/Dockerfile (100%) rename datasets/sunroof/_images/{run_csv_transform_kub_solar_potential_by_census_tract => run_csv_transform_kub}/csv_transform.py (57%) rename datasets/sunroof/_images/{run_csv_transform_kub_solar_potential_by_postal_code => run_csv_transform_kub}/requirements.txt (84%) delete mode 100644 datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/requirements.txt delete mode 100644 datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/Dockerfile delete mode 100644 
datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/Dockerfile b/datasets/sunroof/_images/run_csv_transform_kub/Dockerfile similarity index 100% rename from datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/Dockerfile rename to datasets/sunroof/_images/run_csv_transform_kub/Dockerfile diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py b/datasets/sunroof/_images/run_csv_transform_kub/csv_transform.py similarity index 57% rename from datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py rename to datasets/sunroof/_images/run_csv_transform_kub/csv_transform.py index 49fc116d1..089782c4d 100644 --- a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/csv_transform.py +++ b/datasets/sunroof/_images/run_csv_transform_kub/csv_transform.py @@ -26,35 +26,83 @@ def main( source_url: str, source_file: pathlib.Path, target_file: pathlib.Path, + chunksize: str, target_gcs_bucket: str, target_gcs_path: str, ) -> None: - logging.info("Sunroof Solar Potential By Census Tract process started") + logging.info("Sunroof solar potential started") - logging.info("creating 'files' folder") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - - logging.info(f"Downloading file from {source_url} to {source_file}") download_file_gs(source_url, source_file) - logging.info(f"Opening file {source_file}") - df = pd.read_csv(source_file) + chunksz = int(chunksize) + + logging.info(f"Opening batch file {source_file}") + with pd.read_csv( + source_file, # path to main source file to load in batches + engine="python", + encoding="utf-8", + quotechar='"', # string separator, typically double-quotes + chunksize=chunksz, # size of batch data, in no. 
of records + sep=",", # data column separator, typically "," + ) as reader: + for chunk_number, chunk in enumerate(reader): + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk(df, target_file_batch, target_file, (not chunk_number == 0)) + + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) + + logging.info("Sunroof solar potential process completed") + + +def process_chunk( + df: pd.DataFrame, target_file_batch: str, target_file: str, skip_header: bool +) -> None: + df = rename_headers(df) + df = remove_nan_cols(df) + df = generate_location(df) + df = reorder_headers(df) + save_to_new_file(df, file_path=str(target_file_batch)) + append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) - logging.info(f"Transformation Process Starting.. {source_file}") - logging.info(f"Transform: Renaming Headers.. {source_file}") - rename_headers(df) +def append_batch_file( + batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool +) -> None: + data_file = open(batch_file_path, "r") + if truncate_file: + target_file = open(target_file_path, "w+").close() + target_file = open(target_file_path, "a+") + if skip_header: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path} with skip header" + ) + next(data_file) + else: + logging.info(f"Appending batch file {batch_file_path} to {target_file_path}") + target_file.write(data_file.read()) + data_file.close() + target_file.close() + if os.path.exists(batch_file_path): + os.remove(batch_file_path) - logging.info("Transform: Removing NULL text") - remove_nan_cols(df) - logging.info("Transform: Adding geography field") +def generate_location(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Generating location data") df["center_point"] = ( "POINT( " + df["lng_avg"].map(str) + " " + df["lat_avg"].map(str) + " )" ) - 
logging.info("Transform: Reordering headers..") + return df + + +def reorder_headers(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Reordering headers..") df = df[ [ "region_name", @@ -88,34 +136,22 @@ def main( "install_size_kw_buckets", "carbon_offset_metric_tons", "existing_installs_count", + "center_point", ] ] - logging.info(f"Transformation Process complete .. {source_file}") - - logging.info(f"Saving to output file.. {target_file}") - - try: - save_to_new_file(df, file_path=str(target_file)) - except Exception as e: - logging.error(f"Error saving output file: {e}.") - - logging.info( - f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}" - ) - upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - - logging.info("Sunroof Solar Potential By Census Tract process completed") + return df def remove_nan(dt_str: str) -> int: - if dt_str is None or len(str(dt_str)) == 0 or str(dt_str) == "nan": + if not dt_str or str(dt_str) == "nan": return int() else: return int(dt_str) -def remove_nan_cols(df: pd.DataFrame) -> None: +def remove_nan_cols(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Resolve NaN data") cols = { "count_qualified", "existing_installs_count", @@ -131,11 +167,15 @@ def remove_nan_cols(df: pd.DataFrame) -> None: for col in cols: df[col] = df[col].apply(remove_nan) + return df -def rename_headers(df: pd.DataFrame) -> None: + +def rename_headers(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Renaming columns") header_names = {"install_size_kw_buckets_json": "install_size_kw_buckets"} + df = df.rename(columns=header_names) - df = df.rename(columns=header_names, inplace=True) + return df def save_to_new_file(df: pd.DataFrame, file_path) -> None: @@ -143,13 +183,8 @@ def save_to_new_file(df: pd.DataFrame, file_path) -> None: def download_file_gs(source_url: str, source_file: pathlib.Path) -> None: - try: - process = Popen( - ["gsutil", "cp", source_url, source_file], stdout=PIPE, stderr=PIPE - ) - 
process.communicate() - except ValueError: - logging.error(f"Couldn't download {source_url}: {ValueError}") + process = Popen(["gsutil", "cp", source_url, source_file], stdout=PIPE, stderr=PIPE) + process.communicate() def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: @@ -166,6 +201,7 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) source_url=os.environ["SOURCE_URL"], source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + chunksize=os.environ["CHUNKSIZE"], target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], target_gcs_path=os.environ["TARGET_GCS_PATH"], ) diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/requirements.txt b/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt similarity index 84% rename from datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/requirements.txt rename to datasets/sunroof/_images/run_csv_transform_kub/requirements.txt index f36704793..57148d55b 100644 --- a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/requirements.txt +++ b/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt @@ -1,3 +1,4 @@ requests pandas google-cloud-storage +gsutil diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/requirements.txt b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/requirements.txt deleted file mode 100644 index f36704793..000000000 --- a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_census_tract/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -requests -pandas -google-cloud-storage diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/Dockerfile b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/Dockerfile deleted file mode 
100644 index 85af90570..000000000 --- a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/Dockerfile +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# The base image for this build -# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim -FROM python:3.8 - -# Allow statements and log messages to appear in Cloud logs -ENV PYTHONUNBUFFERED True - -# Copy the requirements file into the image -COPY requirements.txt ./ - -# Install the packages specified in the requirements file -RUN python3 -m pip install --no-cache-dir -r requirements.txt - -# The WORKDIR instruction sets the working directory for any RUN, CMD, -# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. -# If the WORKDIR doesn’t exist, it will be created even if it’s not used in -# any subsequent Dockerfile instruction -WORKDIR /custom - -# Copy the specific data processing script/s in the image under /custom/* -COPY ./csv_transform.py . 
- -# Command to run the data processing script when the container is run -CMD ["python3", "csv_transform.py"] diff --git a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py b/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py deleted file mode 100644 index 576ba3714..000000000 --- a/datasets/sunroof/_images/run_csv_transform_kub_solar_potential_by_postal_code/csv_transform.py +++ /dev/null @@ -1,171 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# import modules -import logging -import os -import pathlib -from subprocess import PIPE, Popen - -import pandas as pd -from google.cloud import storage - - -def main( - source_url: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - target_gcs_bucket: str, - target_gcs_path: str, -) -> None: - - logging.info("Sunroof Solar Potential By Postal Code process started") - - logging.info("creating 'files' folder") - pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - - logging.info(f"Downloading file from {source_url} to {source_file}") - download_file_gs(source_url, source_file) - - logging.info(f"Opening file {source_file}") - df = pd.read_csv(source_file) - - logging.info(f"Transformation Process Starting.. {source_file}") - - logging.info(f"Transform: Renaming Headers.. 
{source_file}") - rename_headers(df) - - logging.info("Transform: Removing NULL text") - remove_nan_cols(df) - - logging.info("Transform: Adding geography field") - df["center_point"] = ( - "POINT( " + df["lng_avg"].map(str) + " " + df["lat_avg"].map(str) + " )" - ) - - logging.info("Transform: Reordering headers..") - df = df[ - [ - "region_name", - "state_name", - "lat_max", - "lat_min", - "lng_max", - "lng_min", - "lat_avg", - "lng_avg", - "yearly_sunlight_kwh_kw_threshold_avg", - "count_qualified", - "percent_covered", - "percent_qualified", - "number_of_panels_n", - "number_of_panels_s", - "number_of_panels_e", - "number_of_panels_w", - "number_of_panels_f", - "number_of_panels_median", - "number_of_panels_total", - "kw_median", - "kw_total", - "yearly_sunlight_kwh_n", - "yearly_sunlight_kwh_s", - "yearly_sunlight_kwh_e", - "yearly_sunlight_kwh_w", - "yearly_sunlight_kwh_f", - "yearly_sunlight_kwh_median", - "yearly_sunlight_kwh_total", - "install_size_kw_buckets", - "carbon_offset_metric_tons", - "existing_installs_count", - ] - ] - - logging.info(f"Transformation Process complete .. {source_file}") - - logging.info(f"Saving to output file.. {target_file}") - - try: - save_to_new_file(df, file_path=str(target_file)) - except Exception as e: - logging.error(f"Error saving output file: {e}.") - - logging.info( - f"Uploading output file to.. 
gs://{target_gcs_bucket}/{target_gcs_path}" - ) - upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - - logging.info("Sunroof Solar Potential By Postal Code process completed") - - -def remove_nan(dt_str: str) -> int: - if dt_str is None or len(str(dt_str)) == 0 or str(dt_str) == "nan": - return int() - else: - return int(dt_str) - - -def remove_nan_cols(df: pd.DataFrame) -> None: - cols = { - "count_qualified", - "existing_installs_count", - "number_of_panels_n", - "number_of_panels_s", - "number_of_panels_e", - "number_of_panels_w", - "number_of_panels_f", - "number_of_panels_median", - "number_of_panels_total", - } - - for col in cols: - df[col] = df[col].apply(remove_nan) - - -def rename_headers(df: pd.DataFrame) -> None: - header_names = {"install_size_kw_buckets_json": "install_size_kw_buckets"} - - df = df.rename(columns=header_names, inplace=True) - - -def save_to_new_file(df: pd.DataFrame, file_path) -> None: - df.to_csv(file_path, index=False) - - -def download_file_gs(source_url: str, source_file: pathlib.Path) -> None: - try: - process = Popen( - ["gsutil", "cp", source_url, source_file], stdout=PIPE, stderr=PIPE - ) - process.communicate() - except ValueError: - logging.error(f"Couldn't download {source_url}: {ValueError}") - - -def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: - storage_client = storage.Client() - bucket = storage_client.bucket(gcs_bucket) - blob = bucket.blob(gcs_path) - blob.upload_from_filename(file_path) - - -if __name__ == "__main__": - logging.getLogger().setLevel(logging.INFO) - - main( - source_url=os.environ["SOURCE_URL"], - source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), - target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), - target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], - target_gcs_path=os.environ["TARGET_GCS_PATH"], - ) diff --git a/datasets/sunroof/dataset.yaml b/datasets/sunroof/dataset.yaml index f1a20a80e..58de08fae 100644 
--- a/datasets/sunroof/dataset.yaml +++ b/datasets/sunroof/dataset.yaml @@ -13,46 +13,14 @@ # limitations under the License. dataset: - # The `dataset` block includes properties for your dataset that will be shown - # to users of your data on the Google Cloud website. - - # Must be exactly the same name as the folder name your dataset.yaml is in. name: sunroof - - # A friendly, human-readable name of the dataset - friendly_name: #friendlyname - - # A short, descriptive summary of the dataset. - description: #~friendlydescription - - # A list of sources the dataset is derived from, using the YAML list syntax. + friendly_name: sunroof + description: sunroof dataset_sources: ~ - - # A list of terms and conditions that users of the dataset should agree on, - # using the YAML list syntax. terms_of_use: ~ resources: - # A list of Google Cloud resources needed by your dataset. In principle, all - # pipelines under a dataset should be able to share these resources. - # - # The currently supported resources are shown below. Use only the resources - # you need, and delete the rest as needed by your pipeline. - # - # We will keep adding to the list below to support more Google Cloud resources - # over time. If a resource you need isn't supported, please file an issue on - # the repository. 
- - type: bigquery_dataset - # Google BigQuery dataset to namespace all tables managed by this folder - # - # Required Properties: - # dataset_id - # - # Optional Properties: - # friendly_name (A user-friendly name of the dataset) - # description (A user-friendly description of the dataset) - # location (The geographic location where the dataset should reside) dataset_id: sunroof description: sunroof diff --git a/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml b/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml index df3dc62a2..646fcdf29 100644 --- a/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml +++ b/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml @@ -43,16 +43,17 @@ dag: name: "solar_potential_by_census_tract" namespace: "default" image_pull_policy: "Always" - image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub_solar_potential_by_census_tract }}" + image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: "gs://project-sunroof/csv/latest/project-sunroof-census_tract.csv" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" + CHUNKSIZE: "750000" TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" TARGET_GCS_PATH: "data/sunroof/solar_potential_by_census_tract/data_output.csv" resources: - limit_memory: "4G" - limit_cpu: "2" + limit_memory: "8G" + limit_cpu: "3" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -64,6 +65,7 @@ dag: source_format: "CSV" destination_project_dataset_table: "sunroof.solar_potential_by_census_tract" skip_leading_rows: 1 + allow_quoted_newlines: True write_disposition: "WRITE_TRUNCATE" schema_fields: - "name": "region_name" diff --git a/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py b/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py index 74f7e8859..36c773cd1 100644 
--- a/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py +++ b/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py @@ -39,15 +39,16 @@ name="solar_potential_by_census_tract", namespace="default", image_pull_policy="Always", - image="{{ var.json.sunroof.container_registry.run_csv_transform_kub_solar_potential_by_census_tract }}", + image="{{ var.json.sunroof.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "gs://project-sunroof/csv/latest/project-sunroof-census_tract.csv", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", + "CHUNKSIZE": "750000", "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", "TARGET_GCS_PATH": "data/sunroof/solar_potential_by_census_tract/data_output.csv", }, - resources={"limit_memory": "4G", "limit_cpu": "2"}, + resources={"limit_memory": "8G", "limit_cpu": "3"}, ) # Task to load CSV data to a BigQuery table @@ -58,6 +59,7 @@ source_format="CSV", destination_project_dataset_table="sunroof.solar_potential_by_census_tract", skip_leading_rows=1, + allow_quoted_newlines=True, write_disposition="WRITE_TRUNCATE", schema_fields=[ { diff --git a/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml b/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml index e3cfc84ac..8630cd165 100644 --- a/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml +++ b/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml @@ -43,16 +43,17 @@ dag: name: "solar_potential_by_postal_code" namespace: "default" image_pull_policy: "Always" - image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub_solar_potential_by_postal_code }}" + image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: "gs://project-sunroof/csv/latest/project-sunroof-postal_code.csv" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" + CHUNKSIZE: "750000" TARGET_GCS_BUCKET: "{{ 
var.json.shared.composer_bucket }}" TARGET_GCS_PATH: "data/sunroof/solar_potential_by_postal_code/data_output.csv" resources: - limit_memory: "4G" - limit_cpu: "2" + limit_memory: "8G" + limit_cpu: "3" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -64,6 +65,7 @@ dag: source_format: "CSV" destination_project_dataset_table: "sunroof.solar_potential_by_postal_code" skip_leading_rows: 1 + allow_quoted_newlines: True write_disposition: "WRITE_TRUNCATE" schema_fields: - "name": "region_name" @@ -178,7 +180,7 @@ dag: "type": "FLOAT" "description": "total solar energy generation potential for all roof space in that region" "mode": "NULLABLE" - - "name": "install_size_kw_buckets" + - "name": "install_size_kw_buckets_json" "type": "STRING" "description": "# of buildings with potential for various installation size buckets. Format is a JSON array, where each element is a tuple containing (1) lower bound of bucket, in kW, and (2) number of buildings in that bucket." 
"mode": "NULLABLE" diff --git a/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py b/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py index 3fd2b247f..0fc2f7f70 100644 --- a/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py +++ b/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py @@ -39,15 +39,16 @@ name="solar_potential_by_postal_code", namespace="default", image_pull_policy="Always", - image="{{ var.json.sunroof.container_registry.run_csv_transform_kub_solar_potential_by_postal_code }}", + image="{{ var.json.sunroof.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "gs://project-sunroof/csv/latest/project-sunroof-postal_code.csv", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", + "CHUNKSIZE": "750000", "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", "TARGET_GCS_PATH": "data/sunroof/solar_potential_by_postal_code/data_output.csv", }, - resources={"limit_memory": "4G", "limit_cpu": "2"}, + resources={"limit_memory": "8G", "limit_cpu": "3"}, ) # Task to load CSV data to a BigQuery table @@ -58,6 +59,7 @@ source_format="CSV", destination_project_dataset_table="sunroof.solar_potential_by_postal_code", skip_leading_rows=1, + allow_quoted_newlines=True, write_disposition="WRITE_TRUNCATE", schema_fields=[ { @@ -229,7 +231,7 @@ "mode": "NULLABLE", }, { - "name": "install_size_kw_buckets", + "name": "install_size_kw_buckets_json", "type": "STRING", "description": "# of buildings with potential for various installation size buckets. 
Format is a JSON array, where each element is a tuple containing (1) lower bound of bucket, in kW, and (2) number of buildings in that bucket.", "mode": "NULLABLE", From cbaa1047b53c8dafa0411f93dda49bf1083cfb75 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 7 Oct 2021 20:36:23 +0000 Subject: [PATCH 4/6] fix: Removed requests from requirements.txt --- datasets/sunroof/_images/run_csv_transform_kub/requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt b/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt index 57148d55b..7908505a4 100644 --- a/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt +++ b/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt @@ -1,4 +1,3 @@ -requests pandas google-cloud-storage gsutil From 7b2133daf42e881d6eb7a5b56f5bc2cd2b618dce Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Fri, 15 Oct 2021 18:05:55 +0000 Subject: [PATCH 5/6] fix: Resolved points raised in PR --- .../_images/run_csv_transform_kub/Dockerfile | 17 ----------------- .../run_csv_transform_kub/csv_transform.py | 6 ++---- .../run_csv_transform_kub/requirements.txt | 1 - .../pipeline.yaml | 6 +++--- .../solar_potential_by_census_tract_dag.py | 6 +++--- .../pipeline.yaml | 6 +++--- .../solar_potential_by_postal_code_dag.py | 6 +++--- datasets/sunroof/sunroof_variables.json | 14 -------------- 8 files changed, 14 insertions(+), 48 deletions(-) delete mode 100644 datasets/sunroof/sunroof_variables.json diff --git a/datasets/sunroof/_images/run_csv_transform_kub/Dockerfile b/datasets/sunroof/_images/run_csv_transform_kub/Dockerfile index 85af90570..748bc3bec 100644 --- a/datasets/sunroof/_images/run_csv_transform_kub/Dockerfile +++ b/datasets/sunroof/_images/run_csv_transform_kub/Dockerfile @@ -12,27 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-# The base image for this build -# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim FROM python:3.8 - -# Allow statements and log messages to appear in Cloud logs ENV PYTHONUNBUFFERED True - -# Copy the requirements file into the image COPY requirements.txt ./ - -# Install the packages specified in the requirements file RUN python3 -m pip install --no-cache-dir -r requirements.txt - -# The WORKDIR instruction sets the working directory for any RUN, CMD, -# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. -# If the WORKDIR doesn’t exist, it will be created even if it’s not used in -# any subsequent Dockerfile instruction WORKDIR /custom - -# Copy the specific data processing script/s in the image under /custom/* COPY ./csv_transform.py . - -# Command to run the data processing script when the container is run CMD ["python3", "csv_transform.py"] diff --git a/datasets/sunroof/_images/run_csv_transform_kub/csv_transform.py b/datasets/sunroof/_images/run_csv_transform_kub/csv_transform.py index 089782c4d..3b0be4884 100644 --- a/datasets/sunroof/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/sunroof/_images/run_csv_transform_kub/csv_transform.py @@ -12,11 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-# import modules import logging import os import pathlib -from subprocess import PIPE, Popen import pandas as pd from google.cloud import storage @@ -183,8 +181,8 @@ def save_to_new_file(df: pd.DataFrame, file_path) -> None: def download_file_gs(source_url: str, source_file: pathlib.Path) -> None: - process = Popen(["gsutil", "cp", source_url, source_file], stdout=PIPE, stderr=PIPE) - process.communicate() + with open(source_file, "wb+") as file_obj: + storage.Client().download_blob_to_file(source_url, file_obj) def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: diff --git a/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt b/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt index 7908505a4..a13f29317 100644 --- a/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt +++ b/datasets/sunroof/_images/run_csv_transform_kub/requirements.txt @@ -1,3 +1,2 @@ pandas google-cloud-storage -gsutil diff --git a/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml b/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml index 646fcdf29..aeee2ce8f 100644 --- a/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml +++ b/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml @@ -43,13 +43,13 @@ dag: name: "solar_potential_by_census_tract" namespace: "default" image_pull_policy: "Always" - image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub }}" + image: "{{ var.value.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: "gs://project-sunroof/csv/latest/project-sunroof-census_tract.csv" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" CHUNKSIZE: "750000" - TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/sunroof/solar_potential_by_census_tract/data_output.csv" resources: limit_memory: "8G" @@ -60,7 +60,7 @@ dag: args: task_id: "load_to_bq" 
- bucket: "{{ var.json.shared.composer_bucket }}" + bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/sunroof/solar_potential_by_census_tract/data_output.csv"] source_format: "CSV" destination_project_dataset_table: "sunroof.solar_potential_by_census_tract" diff --git a/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py b/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py index 36c773cd1..55b7d87bd 100644 --- a/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py +++ b/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py @@ -39,13 +39,13 @@ name="solar_potential_by_census_tract", namespace="default", image_pull_policy="Always", - image="{{ var.json.sunroof.container_registry.run_csv_transform_kub }}", + image="{{ var.value.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "gs://project-sunroof/csv/latest/project-sunroof-census_tract.csv", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", "CHUNKSIZE": "750000", - "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/sunroof/solar_potential_by_census_tract/data_output.csv", }, resources={"limit_memory": "8G", "limit_cpu": "3"}, @@ -54,7 +54,7 @@ # Task to load CSV data to a BigQuery table load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( task_id="load_to_bq", - bucket="{{ var.json.shared.composer_bucket }}", + bucket="{{ var.value.composer_bucket }}", source_objects=["data/sunroof/solar_potential_by_census_tract/data_output.csv"], source_format="CSV", destination_project_dataset_table="sunroof.solar_potential_by_census_tract", diff --git a/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml b/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml index 8630cd165..4290a8c68 100644 --- 
a/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml +++ b/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml @@ -43,13 +43,13 @@ dag: name: "solar_potential_by_postal_code" namespace: "default" image_pull_policy: "Always" - image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub }}" + image: "{{ var.value.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: "gs://project-sunroof/csv/latest/project-sunroof-postal_code.csv" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" CHUNKSIZE: "750000" - TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/sunroof/solar_potential_by_postal_code/data_output.csv" resources: limit_memory: "8G" @@ -60,7 +60,7 @@ dag: args: task_id: "load_to_bq" - bucket: "{{ var.json.shared.composer_bucket }}" + bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/sunroof/solar_potential_by_postal_code/data_output.csv"] source_format: "CSV" destination_project_dataset_table: "sunroof.solar_potential_by_postal_code" diff --git a/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py b/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py index 0fc2f7f70..bb1871bb0 100644 --- a/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py +++ b/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py @@ -39,13 +39,13 @@ name="solar_potential_by_postal_code", namespace="default", image_pull_policy="Always", - image="{{ var.json.sunroof.container_registry.run_csv_transform_kub }}", + image="{{ var.value.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "gs://project-sunroof/csv/latest/project-sunroof-postal_code.csv", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", "CHUNKSIZE": "750000", - "TARGET_GCS_BUCKET": "{{ 
var.json.shared.composer_bucket }}", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/sunroof/solar_potential_by_postal_code/data_output.csv", }, resources={"limit_memory": "8G", "limit_cpu": "3"}, @@ -54,7 +54,7 @@ # Task to load CSV data to a BigQuery table load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( task_id="load_to_bq", - bucket="{{ var.json.shared.composer_bucket }}", + bucket="{{ var.value.composer_bucket }}", source_objects=["data/sunroof/solar_potential_by_postal_code/data_output.csv"], source_format="CSV", destination_project_dataset_table="sunroof.solar_potential_by_postal_code", diff --git a/datasets/sunroof/sunroof_variables.json b/datasets/sunroof/sunroof_variables.json deleted file mode 100644 index 31bd0b10d..000000000 --- a/datasets/sunroof/sunroof_variables.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "shared": { - "composer_bucket": "us-central1-pipelines-dev-fc245d58-bucket", - "airflow_data_folder": "/home/airflow/gcs/data", - "airflow_home": "/home/airflow/gcs" - }, - "sunroof": { - "destination_bucket": "public-datasets-dev-sunroof-project", - "container_registry": { - "run_csv_transform_kub_solar_potential_by_census_tract": "gcr.io/bigquery-public-data-dev/sunroof__run_csv_transform_kub_solar_potential_by_census_tract:latest", - "run_csv_transform_kub_solar_potential_by_postal_code": "gcr.io/bigquery-public-data-dev/sunroof__run_csv_transform_kub_solar_potential_by_postal_code:latest" - } - } -} From f8df7c70026ddf8c8899ea7660209c9e1a52074c Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Thu, 21 Oct 2021 21:30:52 +0000 Subject: [PATCH 6/6] fix: Added affinity code to pipeline. Tested in AF. 
--- .../pipeline.yaml | 11 ++++++++++- .../solar_potential_by_census_tract_dag.py | 19 ++++++++++++++++++- .../pipeline.yaml | 11 ++++++++++- .../solar_potential_by_postal_code_dag.py | 19 ++++++++++++++++++- 4 files changed, 56 insertions(+), 4 deletions(-) diff --git a/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml b/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml index aeee2ce8f..2a552e2d3 100644 --- a/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml +++ b/datasets/sunroof/solar_potential_by_census_tract/pipeline.yaml @@ -42,8 +42,17 @@ dag: task_id: "transform_csv" name: "solar_potential_by_census_tract" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image_pull_policy: "Always" - image: "{{ var.value.container_registry.run_csv_transform_kub }}" + image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: "gs://project-sunroof/csv/latest/project-sunroof-census_tract.csv" SOURCE_FILE: "files/data.csv" diff --git a/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py b/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py index 55b7d87bd..7d211ef49 100644 --- a/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py +++ b/datasets/sunroof/solar_potential_by_census_tract/solar_potential_by_census_tract_dag.py @@ -38,8 +38,25 @@ task_id="transform_csv", name="solar_potential_by_census_tract", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image_pull_policy="Always", 
- image="{{ var.value.container_registry.run_csv_transform_kub }}", + image="{{ var.json.sunroof.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "gs://project-sunroof/csv/latest/project-sunroof-census_tract.csv", "SOURCE_FILE": "files/data.csv", diff --git a/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml b/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml index 4290a8c68..fcf62e0a9 100644 --- a/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml +++ b/datasets/sunroof/solar_potential_by_postal_code/pipeline.yaml @@ -42,8 +42,17 @@ dag: task_id: "transform_csv" name: "solar_potential_by_postal_code" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image_pull_policy: "Always" - image: "{{ var.value.container_registry.run_csv_transform_kub }}" + image: "{{ var.json.sunroof.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: "gs://project-sunroof/csv/latest/project-sunroof-postal_code.csv" SOURCE_FILE: "files/data.csv" diff --git a/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py b/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py index bb1871bb0..9fb1625b5 100644 --- a/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py +++ b/datasets/sunroof/solar_potential_by_postal_code/solar_potential_by_postal_code_dag.py @@ -38,8 +38,25 @@ task_id="transform_csv", name="solar_potential_by_postal_code", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image_pull_policy="Always", - 
image="{{ var.value.container_registry.run_csv_transform_kub }}", + image="{{ var.json.sunroof.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "gs://project-sunroof/csv/latest/project-sunroof-postal_code.csv", "SOURCE_FILE": "files/data.csv",