From 27a2c8e356cac8c51aa7b65fae508ad1d7202e9e Mon Sep 17 00:00:00 2001 From: vasuc-google <84339146+vasuc-google@users.noreply.github.com> Date: Wed, 24 Nov 2021 20:58:27 +0530 Subject: [PATCH] feat: Onboard Cloud Storage Geo Index Dataset (#219) --- .../_images/run_csv_transform_kub/Dockerfile | 37 +++++ .../run_csv_transform_kub/csv_transform.py | 137 ++++++++++++++++++ .../run_csv_transform_kub/requirements.txt | 3 + .../cloud_storage_geo_index_dataset.tf | 26 ++++ .../_terraform/landsat_index_pipeline.tf | 39 +++++ .../_terraform/provider.tf | 28 ++++ .../_terraform/sentinel_2_index_pipeline.tf | 39 +++++ .../_terraform/variables.tf | 23 +++ datasets/cloud_storage_geo_index/dataset.yaml | 25 ++++ .../landsat_index/landsat_index_dag.py | 105 ++++++++++++++ .../landsat_index/pipeline.yaml | 137 ++++++++++++++++++ .../sentinel_2_index/pipeline.yaml | 125 ++++++++++++++++ .../sentinel_2_index/sentinel_2_index_dag.py | 103 +++++++++++++ 13 files changed, 827 insertions(+) create mode 100644 datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/Dockerfile create mode 100644 datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/csv_transform.py create mode 100644 datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/requirements.txt create mode 100644 datasets/cloud_storage_geo_index/_terraform/cloud_storage_geo_index_dataset.tf create mode 100644 datasets/cloud_storage_geo_index/_terraform/landsat_index_pipeline.tf create mode 100644 datasets/cloud_storage_geo_index/_terraform/provider.tf create mode 100644 datasets/cloud_storage_geo_index/_terraform/sentinel_2_index_pipeline.tf create mode 100644 datasets/cloud_storage_geo_index/_terraform/variables.tf create mode 100644 datasets/cloud_storage_geo_index/dataset.yaml create mode 100644 datasets/cloud_storage_geo_index/landsat_index/landsat_index_dag.py create mode 100644 datasets/cloud_storage_geo_index/landsat_index/pipeline.yaml create mode 100644 datasets/cloud_storage_geo_index/sentinel_2_index/pipeline.yaml create mode 100644 datasets/cloud_storage_geo_index/sentinel_2_index/sentinel_2_index_dag.py diff --git a/datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/Dockerfile b/datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..7265a1b71 --- /dev/null +++ b/datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,37 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. 
+# If the WORKDIR doesn’t exist, it will be created even if it’s not used in
+# any subsequent Dockerfile instruction
+WORKDIR /custom
+
+# Copy the specific data processing script/s in the image under /custom/*
+COPY ./csv_transform.py .
+
+# Command to run the data processing script when the container is run
+CMD ["python3", "csv_transform.py"]
diff --git a/datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/csv_transform.py b/datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/csv_transform.py
new file mode 100644
index 000000000..0489eacbc
--- /dev/null
+++ b/datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/csv_transform.py
@@ -0,0 +1,137 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import os
+import pathlib
+import subprocess
+import typing
+
+import pandas as pd
+import requests
+from google.cloud import storage
+
+
+def main(
+    source_url: str,
+    source_file: pathlib.Path,
+    target_file: pathlib.Path,
+    chunksize: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+    headers: typing.List[str],
+    rename_mappings: dict,
+    pipeline_name: str,
+) -> None:
+
+    logging.info("Creating 'files' folder")
+    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)
+
+    logging.info(f"Downloading file {source_url}")
+    download_file(source_url, source_file)
+    chunksz = int(chunksize)
+
+    logging.info(f"Reading csv file {source_url}")
+    with pd.read_csv(
+        source_file,
+        engine="python",
+        encoding="utf-8",
+        quotechar='"',
+        compression="gzip",
+        chunksize=chunksz,
+    ) as reader:
+        for chunk_number, chunk in enumerate(reader):
+            logging.info(f"Processing batch {chunk_number}")
+            target_file_batch = str(target_file).replace(
+                ".csv", "-" + str(chunk_number) + ".csv"
+            )
+            # Work on a copy of the current chunk
+            df = chunk.copy()
+
+            logging.info("Transform: Renaming headers...")
+            rename_headers(df, rename_mappings)
+            logging.info("Transform: Reordering headers...")
+            df = df[headers]
+            if pipeline_name == "sentinel_2_index":
+                df["total_size"] = df["total_size"].astype("Int64")
+
+            process_chunk(df, target_file_batch)
+
+            logging.info(f"Appending batch {chunk_number} to {target_file}")
+            if chunk_number == 0:
+                subprocess.run(["cp", target_file_batch, target_file])
+            else:
+                subprocess.check_call(f"sed -i '1d' {target_file_batch}", shell=True)
+                subprocess.check_call(
+                    f"cat {target_file_batch} >> {target_file}", shell=True
+                )
+            subprocess.run(["rm", target_file_batch])
+
+    logging.info(
+        f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}"
+    )
+    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)
+
+
+def process_chunk(df: pd.DataFrame, target_file_batch: str) -> None:
+
+    logging.info(f"Saving to output file {target_file_batch}")
+    try:
+        save_to_new_file(df, file_path=str(target_file_batch))
+    except Exception as e:
+        logging.error(f"Error saving output file: {e}.")
+    logging.info("..Done!")
+
+
+def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None:
+    df.rename(columns=rename_mappings, inplace=True)
+
+
+def save_to_new_file(df: pd.DataFrame, file_path) -> None:
+    df.to_csv(file_path, index=False)
+
+
+def download_file(source_url: str, source_file: pathlib.Path) -> None:
+    logging.info(f"Downloading {source_url} into {source_file}")
+    r = requests.get(source_url, stream=True)
+    if r.status_code == 200:
+        with open(source_file, "wb") as f:
+            for chunk in r.iter_content(chunk_size=8192):
+                f.write(chunk)
+    else:
+        logging.error(f"Couldn't download {source_url}: {r.text}")
+
+
+def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(gcs_bucket)
+    blob = bucket.blob(gcs_path)
+    blob.upload_from_filename(file_path)
+
+
+if __name__ == "__main__":
+    logging.getLogger().setLevel(logging.INFO)
+
+    main(
+        source_url=os.environ["SOURCE_URL"],
+        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
+        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
+        chunksize=os.environ["CHUNKSIZE"],
+        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
+        target_gcs_path=os.environ["TARGET_GCS_PATH"],
+        headers=json.loads(os.environ["CSV_HEADERS"]),
+        rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]),
+        pipeline_name=os.environ["PIPELINE_NAME"],
+    )
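Note: csv_transform.py is driven entirely by environment variables, so the container logic can be smoke-tested without Kubernetes. A minimal sketch, assuming the script is importable from the working directory, Application Default Credentials are available, and a hypothetical my-test-bucket is writable (the Landsat rename map happens to be just the upper-cased header names, so it can be derived):

    # Hypothetical local smoke test for csv_transform.py -- not part of this patch.
    import pathlib

    import csv_transform

    headers = [
        "scene_id", "product_id", "spacecraft_id", "sensor_id", "date_acquired",
        "sensing_time", "collection_number", "collection_category", "data_type",
        "wrs_path", "wrs_row", "cloud_cover", "north_lat", "south_lat",
        "west_lon", "east_lon", "total_size", "base_url",
    ]

    csv_transform.main(
        source_url="https://storage.googleapis.com/gcp-public-data-landsat/index.csv.gz",
        source_file=pathlib.Path("files/data.csv.gz"),
        target_file=pathlib.Path("files/data_output.csv"),
        chunksize="1000000",
        target_gcs_bucket="my-test-bucket",  # assumption: any bucket you can write to
        target_gcs_path="data/cloud_storage_geo_index/landsat_index/data_output.csv",
        headers=headers,
        # The source columns are the upper-cased header names, matching
        # RENAME_MAPPINGS in the DAG below.
        rename_mappings={h.upper(): h for h in headers},
        pipeline_name="landsat_index",
    )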
{target_file_batch}") + try: + save_to_new_file(df, file_path=str(target_file_batch)) + except Exception as e: + logging.error(f"Error saving output file: {e}.") + logging.info("..Done!") + + +def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None: + df = df.rename(columns=rename_mappings, inplace=True) + + +def save_to_new_file(df: pd.DataFrame, file_path) -> None: + df.to_csv(file_path, index=False) + + +def download_file(source_url: str, source_file: pathlib.Path) -> None: + logging.info(f"Downloading {source_url} into {source_file}") + r = requests.get(source_url, stream=True) + if r.status_code == 200: + with open(source_file, "wb") as f: + for chunk in r: + f.write(chunk) + else: + logging.error(f"Couldn't download {source_url}: {r.text}") + + +def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: + storage_client = storage.Client() + bucket = storage_client.bucket(gcs_bucket) + blob = bucket.blob(gcs_path) + blob.upload_from_filename(file_path) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_url=os.environ["SOURCE_URL"], + source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), + target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + chunksize=os.environ["CHUNKSIZE"], + target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + headers=json.loads(os.environ["CSV_HEADERS"]), + rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]), + pipeline_name=os.environ["PIPELINE_NAME"], + ) diff --git a/datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/requirements.txt b/datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/requirements.txt new file mode 100644 index 000000000..f36704793 --- /dev/null +++ b/datasets/cloud_storage_geo_index/_images/run_csv_transform_kub/requirements.txt @@ -0,0 +1,3 @@ +requests +pandas +google-cloud-storage diff --git a/datasets/cloud_storage_geo_index/_terraform/cloud_storage_geo_index_dataset.tf b/datasets/cloud_storage_geo_index/_terraform/cloud_storage_geo_index_dataset.tf new file mode 100644 index 000000000..af01e4938 --- /dev/null +++ b/datasets/cloud_storage_geo_index/_terraform/cloud_storage_geo_index_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_dataset" "cloud_storage_geo_index" { + dataset_id = "cloud_storage_geo_index" + project = var.project_id + description = "cloud_storage_geo_index data" +} + +output "bigquery_dataset-cloud_storage_geo_index-dataset_id" { + value = google_bigquery_dataset.cloud_storage_geo_index.dataset_id +} diff --git a/datasets/cloud_storage_geo_index/_terraform/landsat_index_pipeline.tf b/datasets/cloud_storage_geo_index/_terraform/landsat_index_pipeline.tf new file mode 100644 index 000000000..df73d94d1 --- /dev/null +++ b/datasets/cloud_storage_geo_index/_terraform/landsat_index_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "landsat_index" { + project = var.project_id + dataset_id = "cloud_storage_geo_index" + table_id = "landsat_index" + + description = "landsat_index dataset" + + + + + depends_on = [ + google_bigquery_dataset.cloud_storage_geo_index + ] +} + +output "bigquery_table-landsat_index-table_id" { + value = google_bigquery_table.landsat_index.table_id +} + +output "bigquery_table-landsat_index-id" { + value = google_bigquery_table.landsat_index.id +} diff --git a/datasets/cloud_storage_geo_index/_terraform/provider.tf b/datasets/cloud_storage_geo_index/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/cloud_storage_geo_index/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/cloud_storage_geo_index/_terraform/sentinel_2_index_pipeline.tf b/datasets/cloud_storage_geo_index/_terraform/sentinel_2_index_pipeline.tf new file mode 100644 index 000000000..8daea1ff4 --- /dev/null +++ b/datasets/cloud_storage_geo_index/_terraform/sentinel_2_index_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+resource "google_bigquery_table" "sentinel_2_index" {
+  project    = var.project_id
+  dataset_id = "cloud_storage_geo_index"
+  table_id   = "sentinel_2_index"
+
+  description = "sentinel_2_index dataset"
+
+
+
+
+  depends_on = [
+    google_bigquery_dataset.cloud_storage_geo_index
+  ]
+}
+
+output "bigquery_table-sentinel_2_index-table_id" {
+  value = google_bigquery_table.sentinel_2_index.table_id
+}
+
+output "bigquery_table-sentinel_2_index-id" {
+  value = google_bigquery_table.sentinel_2_index.id
+}
diff --git a/datasets/cloud_storage_geo_index/_terraform/variables.tf b/datasets/cloud_storage_geo_index/_terraform/variables.tf
new file mode 100644
index 000000000..c3ec7c506
--- /dev/null
+++ b/datasets/cloud_storage_geo_index/_terraform/variables.tf
@@ -0,0 +1,23 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+variable "project_id" {}
+variable "bucket_name_prefix" {}
+variable "impersonating_acct" {}
+variable "region" {}
+variable "env" {}
+
diff --git a/datasets/cloud_storage_geo_index/dataset.yaml b/datasets/cloud_storage_geo_index/dataset.yaml
new file mode 100644
index 000000000..6a11d8cc8
--- /dev/null
+++ b/datasets/cloud_storage_geo_index/dataset.yaml
@@ -0,0 +1,25 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dataset:
+  name: cloud_storage_geo_index
+  friendly_name: Cloud_Storage_Geo_Index
+  description: Cloud Storage Geo Index data (landsat_index and sentinel_2_index)
+  dataset_sources: ~
+  terms_of_use: ~
+
+resources:
+  - type: bigquery_dataset
+    dataset_id: cloud_storage_geo_index
+    description: Cloud_Storage_Geo_Index Data
diff --git a/datasets/cloud_storage_geo_index/landsat_index/landsat_index_dag.py b/datasets/cloud_storage_geo_index/landsat_index/landsat_index_dag.py
new file mode 100644
index 000000000..e929c6ea7
--- /dev/null
+++ b/datasets/cloud_storage_geo_index/landsat_index/landsat_index_dag.py
@@ -0,0 +1,105 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from airflow import DAG
+from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator
+
+default_args = {
+    "owner": "Google",
+    "depends_on_past": False,
+    "start_date": "2021-03-01",
+}
+
+
+with DAG(
+    dag_id="cloud_storage_geo_index.landsat_index",
+    default_args=default_args,
+    max_active_runs=1,
+    schedule_interval="@daily",
+    catchup=False,
+    default_view="graph",
+) as dag:
+
+    # Run CSV transform within kubernetes pod
+    landsat_index_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="landsat_index_transform_csv",
+        startup_timeout_seconds=600,
+        name="landsat_index",
+        namespace="default",
+        affinity={
+            "nodeAffinity": {
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    "values": ["pool-e2-standard-4"],
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        },
+        image_pull_policy="Always",
+        image="{{ var.json.cloud_storage_geo_index.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": "https://storage.googleapis.com/gcp-public-data-landsat/index.csv.gz",
+            "SOURCE_FILE": "files/data.csv.gz",
+            "TARGET_FILE": "files/data_output.csv",
+            "CHUNKSIZE": "1000000",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/cloud_storage_geo_index/landsat_index/data_output.csv",
+            "PIPELINE_NAME": "landsat_index",
+            "CSV_HEADERS": '["scene_id","product_id","spacecraft_id","sensor_id","date_acquired","sensing_time","collection_number","collection_category","data_type","wrs_path","wrs_row","cloud_cover","north_lat","south_lat","west_lon","east_lon","total_size","base_url"]',
+            "RENAME_MAPPINGS": '{"SCENE_ID" : "scene_id","SPACECRAFT_ID" : "spacecraft_id","SENSOR_ID" : "sensor_id","DATE_ACQUIRED" : "date_acquired","COLLECTION_NUMBER" : "collection_number","COLLECTION_CATEGORY" : "collection_category","DATA_TYPE" : "data_type","WRS_PATH" : "wrs_path","WRS_ROW" : "wrs_row","CLOUD_COVER" : "cloud_cover","NORTH_LAT" : "north_lat","SOUTH_LAT" : "south_lat","WEST_LON" : "west_lon","EAST_LON" : "east_lon","TOTAL_SIZE" : "total_size","BASE_URL" : "base_url","PRODUCT_ID" : "product_id","SENSING_TIME" : "sensing_time"}',
+        },
+        resources={"limit_memory": "8G", "limit_cpu": "3"},
+    )
+
+    # Task to load CSV data to a BigQuery table
+    load_landsat_index_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+        task_id="load_landsat_index_to_bq",
+        bucket="{{ var.value.composer_bucket }}",
+        source_objects=["data/cloud_storage_geo_index/landsat_index/data_output.csv"],
+        source_format="CSV",
+        destination_project_dataset_table="cloud_storage_geo_index.landsat_index",
+        skip_leading_rows=1,
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[
+            {"name": "scene_id", "type": "STRING", "mode": "REQUIRED"},
+            {"name": "product_id", "type": "STRING", "mode": "NULLABLE"},
+            {"name": "spacecraft_id", "type": "STRING", "mode": "NULLABLE"},
+            {"name": "sensor_id", "type": "STRING", "mode": "NULLABLE"},
+            {"name": "date_acquired", "type": "DATE", "mode": "NULLABLE"},
+            {"name": "sensing_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
+            {"name": "collection_number", "type": "STRING", "mode": "NULLABLE"},
+            {"name": "collection_category", "type": "STRING", "mode": "NULLABLE"},
+            {"name": "data_type", "type": "STRING", "mode": "NULLABLE"},
+            {"name": "wrs_path", "type": "INTEGER", "mode": "NULLABLE"},
+            {"name": "wrs_row", "type": "INTEGER", "mode": "NULLABLE"},
+            {"name": "cloud_cover", "type": "FLOAT", "mode": "NULLABLE"},
+            {"name": "north_lat", "type": "FLOAT", "mode": "NULLABLE"},
+            {"name": "south_lat", "type": "FLOAT", "mode": "NULLABLE"},
+            {"name": "west_lon", "type": "FLOAT", "mode": "NULLABLE"},
+            {"name": "east_lon", "type": "FLOAT", "mode": "NULLABLE"},
+            {"name": "total_size", "type": "INTEGER", "mode": "NULLABLE"},
+            {"name": "base_url", "type": "STRING", "mode": "NULLABLE"},
+        ],
+    )
+
+    landsat_index_transform_csv >> load_landsat_index_to_bq
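The schema_fields literals above and in the pipeline.yaml below are hand-maintained, which is exactly where a misspelled type or a lower-cased mode slips in. A small illustrative guard (not part of this patch) that could run in CI against those literals:

    # Illustrative schema-typo guard -- not part of this patch. BigQuery's
    # legacy type and mode names are fixed sets, so membership checks catch
    # errors like "FLOAt" or "required" before the load job runs.
    VALID_TYPES = {
        "STRING", "BYTES", "INTEGER", "FLOAT", "NUMERIC", "BOOLEAN",
        "TIMESTAMP", "DATE", "TIME", "DATETIME", "GEOGRAPHY", "RECORD",
    }
    VALID_MODES = {"NULLABLE", "REQUIRED", "REPEATED"}

    def check_schema(schema_fields: list) -> None:
        """Raise if any field uses an unknown BigQuery type or mode."""
        for field in schema_fields:
            if field["type"] not in VALID_TYPES:
                raise ValueError(f"Unknown type in field: {field}")
            if field.get("mode", "NULLABLE") not in VALID_MODES:
                raise ValueError(f"Unknown mode in field: {field}")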
"NULLABLE"}, + {"name": "collection_number", "type": "STRING", "mode": "NULLABLE"}, + {"name": "collection_category", "type": "STRING", "mode": "NULLABLE"}, + {"name": "data_type", "type": "STRING", "mode": "NULLABLE"}, + {"name": "wrs_path", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "wrs_row", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "cloud_cover", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "north_lat", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "south_lat", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "west_lon", "type": "FLOAt", "mode": "NULLABLE"}, + {"name": "east_lon", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "total_size", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "base_url", "type": "STRING", "mode": "NULLABLE"}, + ], + ) + + landsat_index_transform_csv >> load_landsat_index_to_bq diff --git a/datasets/cloud_storage_geo_index/landsat_index/pipeline.yaml b/datasets/cloud_storage_geo_index/landsat_index/pipeline.yaml new file mode 100644 index 000000000..eb61636cb --- /dev/null +++ b/datasets/cloud_storage_geo_index/landsat_index/pipeline.yaml @@ -0,0 +1,137 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + - type: bigquery_table + table_id: landsat_index + description: "Landsat_Index Dataset" + +dag: + airflow_version: 2 + initialize: + dag_id: landsat_index + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "landsat_index_transform_csv" + startup_timeout_seconds: 600 + name: "landsat_index" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.cloud_storage_geo_index.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "https://storage.googleapis.com/gcp-public-data-landsat/index.csv.gz" + SOURCE_FILE: "files/data.csv.gz" + TARGET_FILE: "files/data_output.csv" + CHUNKSIZE: "1000000" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cloud_storage_geo_index/landsat_index/data_output.csv" + PIPELINE_NAME: "landsat_index" + CSV_HEADERS: >- + ["scene_id","product_id","spacecraft_id","sensor_id","date_acquired","sensing_time","collection_number","collection_category","data_type","wrs_path","wrs_row","cloud_cover","north_lat","south_lat","west_lon","east_lon","total_size","base_url"] + RENAME_MAPPINGS: >- + {"SCENE_ID" : "scene_id","SPACECRAFT_ID" : "spacecraft_id","SENSOR_ID" : "sensor_id","DATE_ACQUIRED" : "date_acquired","COLLECTION_NUMBER" : "collection_number","COLLECTION_CATEGORY" : "collection_category","DATA_TYPE" : "data_type","WRS_PATH" : "wrs_path","WRS_ROW" : 
"wrs_row","CLOUD_COVER" : "cloud_cover","NORTH_LAT" : "north_lat","SOUTH_LAT" : "south_lat","WEST_LON" : "west_lon","EAST_LON" : "east_lon","TOTAL_SIZE" : "total_size","BASE_URL" : "base_url","PRODUCT_ID" : "product_id","SENSING_TIME" : "sensing_time"} + resources: + limit_memory: "4G" + limit_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_landsat_index_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/cloud_storage_geo_index/landsat_index/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cloud_storage_geo_index.landsat_index" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "scene_id" + type: "STRING" + mode: "required" + - name: "product_id" + type: "STRING" + mode: "NULLABLE" + - name: "spacecraft_id" + type: "STRING" + mode: "NULLABLE" + - name: "sensor_id" + type: "STRING" + mode: "NULLABLE" + - name: "date_acquired" + type: "DATE" + mode: "NULLABLE" + - name: "sensing_time" + type: "TIMESTAMP" + mode: "NULLABLE" + - name: "collection_number" + type: "STRING" + mode: "NULLABLE" + - name: "collection_category" + type: "STRING" + mode: "NULLABLE" + - name: "data_type" + type: "STRING" + mode: "NULLABLE" + - name: "wrs_path" + type: "INTEGER" + mode: "NULLABLE" + - name: "wrs_row" + type: "INTEGER" + mode: "NULLABLE" + - name: "cloud_cover" + type: "FLOAT" + mode: "NULLABLE" + - name: "north_lat" + type: "FLOAT" + mode: "NULLABLE" + - name: "south_lat" + type: "FLOAT" + mode: "NULLABLE" + - name: "west_lon" + type: "FLOAt" + mode: "NULLABLE" + - name: "east_lon" + type: "FLOAT" + mode: "NULLABLE" + - name: "total_size" + type: "INTEGER" + mode: "NULLABLE" + - name: "base_url" + type: "STRING" + mode: "NULLABLE" + + graph_paths: + - "landsat_index_transform_csv >> load_landsat_index_to_bq" diff --git a/datasets/cloud_storage_geo_index/sentinel_2_index/pipeline.yaml b/datasets/cloud_storage_geo_index/sentinel_2_index/pipeline.yaml new file mode 100644 index 000000000..c4a0494b0 --- /dev/null +++ b/datasets/cloud_storage_geo_index/sentinel_2_index/pipeline.yaml @@ -0,0 +1,125 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--- +resources: + - type: bigquery_table + table_id: sentinel_2_index + description: "Sentinel_2_Index Dataset" + +dag: + airflow_version: 2 + initialize: + dag_id: sentinel_2_index + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "sentinel_2_index_transform_csv" + startup_timeout_seconds: 600 + name: "sentinel_2_index" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.cloud_storage_geo_index.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "https://storage.googleapis.com/gcp-public-data-sentinel-2/index.csv.gz" + SOURCE_FILE: "files/data.csv.gz" + TARGET_FILE: "files/data_output.csv" + CHUNKSIZE: "1000000" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cloud_storage_geo_index/sentinel_2_index/data_output.csv" + PIPELINE_NAME: "sentinel_2_index" + CSV_HEADERS: >- + ["granule_id","product_id","datatake_identifier","mgrs_tile","sensing_time","geometric_quality_flag","generation_time","north_lat","south_lat","west_lon","east_lon","base_url","total_size","cloud_cover"] + RENAME_MAPPINGS: >- + {"GRANULE_ID": "granule_id","PRODUCT_ID": "product_id","DATATAKE_IDENTIFIER": "datatake_identifier","MGRS_TILE": "mgrs_tile","SENSING_TIME": "sensing_time","TOTAL_SIZE": "total_size","CLOUD_COVER": "cloud_cover","GEOMETRIC_QUALITY_FLAG": "geometric_quality_flag","GENERATION_TIME": "generation_time", "NORTH_LAT": "north_lat","SOUTH_LAT": "south_lat","WEST_LON": "west_lon","EAST_LON": "east_lon","BASE_URL": "base_url"} + resources: + limit_memory: "4G" + limit_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_sentinel_2_index_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/cloud_storage_geo_index/sentinel_2_index/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cloud_storage_geo_index.sentinel_2_index" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "granule_id" + type: "STRING" + mode: "REQUIRED" + - name: "product_id" + type: "STRING" + mode: "NULLABLE" + - name: "datatake_identifier" + type: "STRING" + mode: "NULLABLE" + - name: "mgrs_tile" + type: "STRING" + mode: "NULLABLE" + - name: "sensing_time" + type: "TIMESTAMP" + mode: "NULLABLE" + - name: "geometric_quality_flag" + type: "STRING" + mode: "NULLABLE" + - name: "generation_time" + type: "TIMESTAMP" + mode: "NULLABLE" + - name: "north_lat" + type: "FLOAT" + mode: "NULLABLE" + - name: "south_lat" + type: "FLOAT" + mode: "NULLABLE" + - name: "west_lon" + type: "FLOAT" + mode: "NULLABLE" + - name: "east_lon" + type: "FLOAT" + mode: "NULLABLE" + - name: "base_url" + type: "STRING" + mode: "NULLABLE" + - name: "total_size" + type: "INTEGER" + mode: "NULLABLE" + - name: "cloud_cover" + type: "FLOAT" + mode: "NULLABLE" + + graph_paths: + - "sentinel_2_index_transform_csv >> load_sentinel_2_index_to_bq" diff --git a/datasets/cloud_storage_geo_index/sentinel_2_index/sentinel_2_index_dag.py 
b/datasets/cloud_storage_geo_index/sentinel_2_index/sentinel_2_index_dag.py new file mode 100644 index 000000000..8bf5b138f --- /dev/null +++ b/datasets/cloud_storage_geo_index/sentinel_2_index/sentinel_2_index_dag.py @@ -0,0 +1,103 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="cloud_storage_geo_index.sentinel_2_index", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + sentinel_2_index_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="sentinel_2_index_transform_csv", + startup_timeout_seconds=600, + name="sentinel_2_index", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.cloud_storage_geo_index.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://storage.googleapis.com/gcp-public-data-sentinel-2/index.csv.gz", + "SOURCE_FILE": "files/data.csv.gz", + "TARGET_FILE": "files/data_output.csv", + "CHUNKSIZE": "1000000", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/cloud_storage_geo_index/sentinel_2_index/data_output.csv", + "PIPELINE_NAME": "sentinel_2_index", + "CSV_HEADERS": '["granule_id","product_id","datatake_identifier","mgrs_tile","sensing_time","geometric_quality_flag","generation_time","north_lat","south_lat","west_lon","east_lon","base_url","total_size","cloud_cover"]', + "RENAME_MAPPINGS": '{"GRANULE_ID": "granule_id","PRODUCT_ID": "product_id","DATATAKE_IDENTIFIER": "datatake_identifier","MGRS_TILE": "mgrs_tile","SENSING_TIME": "sensing_time","TOTAL_SIZE": "total_size","CLOUD_COVER": "cloud_cover","GEOMETRIC_QUALITY_FLAG": "geometric_quality_flag","GENERATION_TIME": "generation_time", "NORTH_LAT": "north_lat","SOUTH_LAT": "south_lat","WEST_LON": "west_lon","EAST_LON": "east_lon","BASE_URL": "base_url"}', + }, + resources={"limit_memory": "8G", "limit_cpu": "3"}, + ) + + # Task to load CSV data to a BigQuery table + load_sentinel_2_index_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_sentinel_2_index_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=[ + "data/cloud_storage_geo_index/sentinel_2_index/data_output.csv" + ], + source_format="CSV", + destination_project_dataset_table="cloud_storage_geo_index.sentinel_2_index", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "granule_id", "type": "STRING", "mode": "REQUIRED"}, + {"name": 
"product_id", "type": "STRING", "mode": "NULLABLE"}, + {"name": "datatake_identifier", "type": "STRING", "mode": "NULLABLE"}, + {"name": "mgrs_tile", "type": "STRING", "mode": "NULLABLE"}, + {"name": "sensing_time", "type": "TIMESTAMP", "mode": "NULLABLE"}, + {"name": "geometric_quality_flag", "type": "STRING", "mode": "NULLABLE"}, + {"name": "generation_time", "type": "TIMESTAMP", "mode": "NULLABLE"}, + {"name": "north_lat", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "south_lat", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "west_lon", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "east_lon", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "base_url", "type": "STRING", "mode": "NULLABLE"}, + {"name": "total_size", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "cloud_cover", "type": "FLOAT", "mode": "NULLABLE"}, + ], + ) + + sentinel_2_index_transform_csv >> load_sentinel_2_index_to_bq