diff --git a/datasets/geos_fp/_images/rolling_copy/Dockerfile b/datasets/geos_fp/_images/rolling_copy/Dockerfile
new file mode 100644
index 000000000..ef95518c0
--- /dev/null
+++ b/datasets/geos_fp/_images/rolling_copy/Dockerfile
@@ -0,0 +1,40 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The base image for this build
+FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim
+
+# Allow statements and log messages to appear in Cloud logs
+ENV PYTHONUNBUFFERED True
+
+# Copy the requirements file into the image
+COPY requirements.txt ./
+
+# Install the packages specified in the requirements file
+RUN python3 -m pip install --no-cache-dir -r requirements.txt
+
+# Install wget
+RUN apt-get update && apt-get install -y wget
+
+# The WORKDIR instruction sets the working directory for any RUN, CMD,
+# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile.
+# If the WORKDIR doesn't exist, it will be created even if it's not used in
+# any subsequent Dockerfile instruction
+WORKDIR /custom
+
+# Copy the specific data processing script/s in the image under /custom/*
+COPY ./script.py .
+
+# Command to run the data processing script when the container is run
+CMD ["python3", "script.py"]
diff --git a/datasets/geos_fp/_images/rolling_copy/Pipfile b/datasets/geos_fp/_images/rolling_copy/Pipfile
new file mode 100644
index 000000000..798e84f1f
--- /dev/null
+++ b/datasets/geos_fp/_images/rolling_copy/Pipfile
@@ -0,0 +1,14 @@
+[[source]]
+url = "https://pypi.org/simple"
+verify_ssl = true
+name = "pypi"
+
+[packages]
+beautifulsoup4 = "*"
+requests = "*"
+google-cloud-storage = "*"
+
+[dev-packages]
+
+[requires]
+python_version = "3.9"
diff --git a/datasets/geos_fp/_images/rolling_copy/requirements.txt b/datasets/geos_fp/_images/rolling_copy/requirements.txt
new file mode 100644
index 000000000..16a3aa210
--- /dev/null
+++ b/datasets/geos_fp/_images/rolling_copy/requirements.txt
@@ -0,0 +1,3 @@
+BeautifulSoup4
+requests
+google-cloud-storage
diff --git a/datasets/geos_fp/_images/rolling_copy/script.py b/datasets/geos_fp/_images/rolling_copy/script.py
new file mode 100644
index 000000000..bbaa88396
--- /dev/null
+++ b/datasets/geos_fp/_images/rolling_copy/script.py
@@ -0,0 +1,206 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import os
+import pathlib
+import subprocess
+import typing
+from datetime import date, timedelta
+
+import bs4
+import requests
+from google.cloud import storage
+
+# The manifest file contains a list of files already downloaded for a given date
+MANIFEST_FILE = "manifest.txt"
+
+
+def main(
+    base_url: str,
+    dt: date,
+    download_dir: pathlib.Path,
+    target_bucket: str,
+    batch_size: int,
+) -> None:
+    # Get the date prefix, e.g. Y2021/M01/D01, and create a local directory for it
+    date_prefix = _date_prefix(dt)
+    (download_dir / date_prefix).mkdir(parents=True, exist_ok=True)
+
+    # Generate a set of all .nc4 files from the specified url and date
+    all_files = get_all_files(base_url, date_prefix)
+
+    stored_files = get_stored_files(target_bucket, date_prefix, download_dir)
+
+    # Files present in the source webpage but not yet stored on GCS
+    unstored_files = all_files - stored_files
+
+    download_and_store_new_files(
+        download_dir, date_prefix, unstored_files, batch_size, target_bucket
+    )
+
+
+def _date_prefix(dt: date) -> str:
+    # Generates the URL path to the folder containing the .nc4 files, for example
+    # https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das/Y2021/M01/D01/
+    # => Y2021/M01/D01
+    return f"Y{dt.year}/M{dt.month:0>2}/D{dt.day:0>2}"
+
+
+def get_all_files(base_url: str, date_prefix: str) -> typing.Set[str]:
+    all_files = set()
+    url = f"{base_url}/{date_prefix}"
+    response = requests.get(url)
+    if response.status_code == 200:
+        logging.info(f"Scraping .nc4 files in {url}")
+        webpage = bs4.BeautifulSoup(response.text, "html.parser")
+        all_files.update(scrape(date_prefix, webpage))
+    else:
+        logging.warning(f"The following URL doesn't exist, will try again later: {url}")
+    return all_files
+
+
+def get_stored_files(
+    bucket_name: str, date_prefix: str, download_dir: pathlib.Path
+) -> typing.Set[str]:
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(bucket_name)
+    gcs_object = f"{date_prefix}/{MANIFEST_FILE}"
+    local_object = download_dir / MANIFEST_FILE
+
+    if storage.Blob(bucket=bucket, name=gcs_object).exists(storage_client):
+        logging.info(f"Manifest file found at gs://{bucket_name}/{gcs_object}")
+        blob = bucket.blob(gcs_object)
+        blob.download_to_filename(str(local_object))
+    else:
+        local_object.touch()
+
+    with open(local_object) as f:
+        return set(f.read().splitlines())
+
+
+def scrape(source_path: str, webpage: bs4.BeautifulSoup) -> typing.List[str]:
+    file_paths = []
+
+    # Go through all the URLs in the page and collect the ones ending in ".nc4"
+    for a_tag in webpage.find_all("a"):
+
+        # The `href` property is the filename,
+        # e.g. GEOS.fp.asm.inst1_2d_smp_Nx.20210101_1700.V01.nc4
+        if a_tag.get("href") and a_tag["href"].endswith(".nc4"):
+            file_paths.append(f"{source_path}/{a_tag['href']}")
+
+    return file_paths
+
+
+def download_and_store_new_files(
+    download_dir: pathlib.Path,
+    date_prefix: str,
+    new_files: typing.Set[str],
+    batch_size: int,
+    target_bucket: str,
+) -> None:
+    """In batches, download files from the source to the local filesystem
+    and upload them to the GCS target bucket
+    """
+    total_files = len(new_files)
+    logging.info(f"Downloading {total_files} files.")
+    for n, batch in enumerate(batches(list(new_files), batch_size=batch_size), 1):
+        logging.info(
+            f"Processing batch {n}: {(n - 1) * batch_size + 1} to {min(total_files, n * batch_size)}"
+        )
+        download_batch(batch, download_dir)
+        move_dir_contents_to_gcs(download_dir, target_bucket, date_prefix)
+        update_manifest_file(batch, download_dir, target_bucket, date_prefix)
+
+
+def download_batch(batch: typing.List[str], download_dir: pathlib.Path) -> None:
+    for file_path in batch:
+        logging.info(f"Downloading file to {download_dir}/{file_path}")
+        subprocess.check_call(
+            [
+                "wget",
+                f"{os.environ['BASE_URL']}/{file_path}",
+                "-O",
+                f"{download_dir}/{file_path}",
+                "-nv",
+            ]
+        )
+
+
+def move_dir_contents_to_gcs(
+    dir_: pathlib.Path, target_bucket: str, date_prefix: str
+) -> None:
+    subprocess.check_call(
+        [
+            "gsutil",
+            "-m",
+            "-o",
+            "GSUtil:parallel_composite_upload_threshold=250M",
+            "cp",
+            f"{dir_}/{date_prefix}/*.nc4",
+            f"gs://{target_bucket}/{date_prefix}",
+        ]
+    )
+    delete_dir_contents(dir_ / date_prefix)
+
+
+def delete_dir_contents(dir_to_delete: pathlib.Path) -> None:
+    """Delete directory contents, but not the dir itself. This is useful for keeping
+    date dirs such as Y2021/M07/D12 intact for the next batch of files to use.
+    """
+    [f.unlink() for f in dir_to_delete.glob("*") if f.is_file()]
+
+
+def update_manifest_file(
+    paths: typing.List[str],
+    download_dir: pathlib.Path,
+    target_bucket: str,
+    date_prefix: str,
+) -> None:
+    manifest_path = download_dir / MANIFEST_FILE
+    with open(manifest_path, "a") as f:
+        f.write("\n".join(paths))
+        f.write("\n")
+    subprocess.check_call(
+        [
+            "gsutil",
+            "cp",
+            str(manifest_path),
+            f"gs://{target_bucket}/{date_prefix}/{MANIFEST_FILE}",
+        ]
+    )
+
+
+def batches(file_paths: typing.List[str], batch_size: int):
+    for i in range(0, len(file_paths), batch_size):
+        yield file_paths[i : i + batch_size]
+
+
+if __name__ == "__main__":
+    logging.getLogger().setLevel(logging.INFO)
+
+    assert os.environ["BASE_URL"]
+    assert os.environ["TODAY_DIFF"]
+    assert os.environ["DOWNLOAD_DIR"]
+    assert os.environ["TARGET_BUCKET"]
+
+    main(
+        base_url=os.environ["BASE_URL"],
+        dt=(date.today() - timedelta(days=int(os.environ["TODAY_DIFF"]))),
+        download_dir=pathlib.Path(os.environ["DOWNLOAD_DIR"]).expanduser(),
+        target_bucket=os.environ["TARGET_BUCKET"],
+        batch_size=int(os.getenv("BATCH_SIZE", 10)),
+    )
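Each container run of script.py scrapes the NASA directory listing for a single date, diffs the listing against the manifest.txt stored under the same date prefix in GCS, and downloads and uploads only the missing files in batches. For reference, a minimal sketch of the two pure helpers, assuming the file is importable locally as a module named `script` (that module name, and importing it outside the container at all, are assumptions for illustration only):

# Hypothetical local sanity check of the pure helpers in script.py.
from datetime import date

import script  # assumed importable; inside the image the file simply runs as __main__

# _date_prefix() builds the Y{year}/M{month}/D{day} path used both as the
# source URL suffix and as the GCS object prefix.
assert script._date_prefix(date(2021, 1, 1)) == "Y2021/M01/D01"

# batches() yields fixed-size chunks of the pending file list; the last
# chunk may be smaller than batch_size.
assert list(script.batches(["a", "b", "c", "d", "e"], batch_size=2)) == [
    ["a", "b"],
    ["c", "d"],
    ["e"],
]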
diff --git a/datasets/geos_fp/_terraform/geos_fp_dataset.tf b/datasets/geos_fp/_terraform/geos_fp_dataset.tf
new file mode 100644
index 000000000..002fb2567
--- /dev/null
+++ b/datasets/geos_fp/_terraform/geos_fp_dataset.tf
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+resource "google_storage_bucket" "geos-fp" {
+  name                        = "${var.bucket_name_prefix}-geos-fp"
+  force_destroy               = true
+  uniform_bucket_level_access = true
+}
+
+output "storage_bucket-geos-fp-name" {
+  value = google_storage_bucket.geos-fp.name
+}
diff --git a/datasets/geos_fp/_terraform/provider.tf b/datasets/geos_fp/_terraform/provider.tf
new file mode 100644
index 000000000..23ab87dcd
--- /dev/null
+++ b/datasets/geos_fp/_terraform/provider.tf
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+provider "google" {
+  project                     = var.project_id
+  impersonate_service_account = var.impersonating_acct
+  region                      = var.region
+}
+
+data "google_client_openid_userinfo" "me" {}
+
+output "impersonating-account" {
+  value = data.google_client_openid_userinfo.me.email
+}
diff --git a/datasets/geos_fp/_terraform/variables.tf b/datasets/geos_fp/_terraform/variables.tf
new file mode 100644
index 000000000..c3ec7c506
--- /dev/null
+++ b/datasets/geos_fp/_terraform/variables.tf
@@ -0,0 +1,23 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+variable "project_id" {}
+variable "bucket_name_prefix" {}
+variable "impersonating_acct" {}
+variable "region" {}
+variable "env" {}
+
diff --git a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py
new file mode 100644
index 000000000..fbab6770b
--- /dev/null
+++ b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py
@@ -0,0 +1,218 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from airflow import DAG
+from airflow.contrib.operators import gcs_delete_operator, kubernetes_pod_operator
+
+default_args = {
+    "owner": "Google",
+    "depends_on_past": False,
+    "start_date": "2021-06-01",
+}
+
+
+with DAG(
+    dag_id="geos_fp.copy_files_rolling_basis",
+    default_args=default_args,
+    max_active_runs=1,
+    schedule_interval="0 2 * * *",
+    catchup=False,
+    default_view="graph",
+) as dag:
+
+    # Copy files to GCS on the specified date
+    copy_files_dated_today = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="copy_files_dated_today",
+        name="geosfp",
+        namespace="default",
+        image="{{ var.json.geos_fp.container_registry.rolling_copy }}",
+        image_pull_policy="Always",
+        env_vars={
+            "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das",
+            "TODAY_DIFF": "0",
+            "DOWNLOAD_DIR": "/geos_fp/data",
+            "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}",
+            "BATCH_SIZE": "10",
+        },
+        resources={"request_memory": "1G", "request_cpu": "1"},
+        retries=3,
+        retry_delay=300,
+        retry_exponential_backoff=True,
+        startup_timeout_seconds=600,
+    )
+
+    # Copy files to GCS on the specified date
+    copy_files_dated_today_minus_1_day = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="copy_files_dated_today_minus_1_day",
+        name="geosfp",
+        namespace="default",
+        image="{{ var.json.geos_fp.container_registry.rolling_copy }}",
+        image_pull_policy="Always",
+        env_vars={
+            "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das",
+            "TODAY_DIFF": "1",
+            "DOWNLOAD_DIR": "/geos_fp/data",
+            "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}",
+            "BATCH_SIZE": "10",
+        },
+        resources={"request_memory": "1G", "request_cpu": "1"},
+        retries=3,
+        retry_delay=300,
+        retry_exponential_backoff=True,
+        startup_timeout_seconds=600,
+    )
+
+    # Copy files to GCS on the specified date
+    copy_files_dated_today_minus_2_days = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="copy_files_dated_today_minus_2_days",
+        name="geosfp",
+        namespace="default",
+        image="{{ var.json.geos_fp.container_registry.rolling_copy }}",
+        image_pull_policy="Always",
+        env_vars={
+            "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das",
+            "TODAY_DIFF": "2",
+            "DOWNLOAD_DIR": "/geos_fp/data",
+            "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}",
+            "BATCH_SIZE": "10",
+        },
+        resources={"request_memory": "1G", "request_cpu": "1"},
+        retries=3,
+        retry_delay=300,
+        retry_exponential_backoff=True,
+        startup_timeout_seconds=600,
+    )
+
+    # Copy files to GCS on a 10-day rolling basis
+    copy_files_dated_today_minus_3_days = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="copy_files_dated_today_minus_3_days",
+        name="geosfp",
+        namespace="default",
+        image="{{ var.json.geos_fp.container_registry.rolling_copy }}",
+        image_pull_policy="Always",
+        env_vars={
+            "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das",
+            "TODAY_DIFF": "3",
+            "DOWNLOAD_DIR": "/geos_fp/data",
+            "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}",
+            "BATCH_SIZE": "10",
+        },
+        resources={"request_memory": "1G", "request_cpu": "1"},
+        retries=3,
+        retry_delay=300,
+        retry_exponential_backoff=True,
+        startup_timeout_seconds=600,
+    )
+
+    # Copy files to GCS on a 10-day rolling basis
+    copy_files_dated_today_minus_4_days = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="copy_files_dated_today_minus_4_days",
+        name="geosfp",
+        namespace="default",
+        image="{{ var.json.geos_fp.container_registry.rolling_copy }}",
+        image_pull_policy="Always",
+        env_vars={
+            "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das",
+            "TODAY_DIFF": "4",
+            "DOWNLOAD_DIR": "/geos_fp/data",
+            "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}",
+            "BATCH_SIZE": "10",
+        },
+        resources={"request_memory": "1G", "request_cpu": "1"},
+        retries=3,
+        retry_delay=300,
+        retry_exponential_backoff=True,
+        startup_timeout_seconds=600,
+    )
+
+    # Copy files to GCS on a 10-day rolling basis
+    copy_files_dated_today_minus_5_days = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="copy_files_dated_today_minus_5_days",
+        name="geosfp",
+        namespace="default",
+        image="{{ var.json.geos_fp.container_registry.rolling_copy }}",
+        image_pull_policy="Always",
+        env_vars={
+            "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das",
+            "TODAY_DIFF": "5",
+            "DOWNLOAD_DIR": "/geos_fp/data",
+            "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}",
+            "BATCH_SIZE": "10",
+        },
+        resources={"request_memory": "1G", "request_cpu": "1"},
+        retries=3,
+        retry_delay=300,
+        retry_exponential_backoff=True,
+        startup_timeout_seconds=600,
+    )
+
+    # Copy files to GCS on a 10-day rolling basis
+    copy_files_dated_today_minus_6_days = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="copy_files_dated_today_minus_6_days",
+        name="geosfp",
+        namespace="default",
+        image="{{ var.json.geos_fp.container_registry.rolling_copy }}",
+        image_pull_policy="Always",
+        env_vars={
+            "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das",
+            "TODAY_DIFF": "6",
+            "DOWNLOAD_DIR": "/geos_fp/data",
+            "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}",
+            "BATCH_SIZE": "10",
+        },
+        resources={"request_memory": "1G", "request_cpu": "1"},
+        retries=3,
+        retry_delay=300,
+        retry_exponential_backoff=True,
+        startup_timeout_seconds=600,
+    )
+
+    # Copy files to GCS on a 10-day rolling basis
+    copy_files_dated_today_minus_7_days = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="copy_files_dated_today_minus_7_days",
+        name="geosfp",
+        namespace="default",
+        image="{{ var.json.geos_fp.container_registry.rolling_copy }}",
+        image_pull_policy="Always",
+        env_vars={
+            "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das",
+            "TODAY_DIFF": "7",
+            "DOWNLOAD_DIR": "/geos_fp/data",
+            "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}",
+            "BATCH_SIZE": "10",
+        },
+        resources={"request_memory": "1G", "request_cpu": "1"},
+        retries=3,
+        retry_delay=300,
+        retry_exponential_backoff=True,
+        startup_timeout_seconds=600,
+    )
+
+    # Delete GCS data older than 7 days
+    delete_old_data = gcs_delete_operator.GoogleCloudStorageDeleteOperator(
+        task_id="delete_old_data",
+        bucket_name="{{ var.json.geos_fp.destination_bucket }}",
+        prefix="{{ macros.ds_format(macros.ds_add(ds, -8), '%Y-%m-%d', 'Y%Y/M%m/D%d') }}",
+    )
+
+    delete_old_data
+    copy_files_dated_today
+    copy_files_dated_today_minus_1_day
+    copy_files_dated_today_minus_2_days
+    copy_files_dated_today_minus_3_days
+    copy_files_dated_today_minus_4_days
+    copy_files_dated_today_minus_5_days
+    copy_files_dated_today_minus_6_days
+    copy_files_dated_today_minus_7_days
diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml
new file mode 100644
index 000000000..077603d43
--- /dev/null
+++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml
@@ -0,0 +1,224 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+resources: ~
+
+dag:
+  airflow_version: 1
+  initialize:
+    dag_id: copy_files_rolling_basis
+    default_args:
+      owner: "Google"
+      depends_on_past: False
+      start_date: '2021-06-01'
+    max_active_runs: 1
+    schedule_interval: "0 2 * * *"  # Daily at 2am UTC
+    catchup: False
+    default_view: "graph"
+
+  tasks:
+    - operator: "KubernetesPodOperator"
+      description: "Copy files to GCS on the specified date"
+      args:
+        task_id: "copy_files_dated_today"
+        name: "geosfp"
+        namespace: "default"
+        image: "{{ var.json.geos_fp.container_registry.rolling_copy }}"
+        image_pull_policy: "Always"
+        env_vars:
+          BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das"
+          TODAY_DIFF: "0"
+          DOWNLOAD_DIR: "/geos_fp/data"
+          TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}"
+          BATCH_SIZE: "10"
+        resources:
+          request_memory: "1G"
+          request_cpu: "1"
+        retries: 3
+        retry_delay: 300
+        retry_exponential_backoff: true
+        startup_timeout_seconds: 600
+
+    - operator: "KubernetesPodOperator"
+      description: "Copy files to GCS on the specified date"
+      args:
+        task_id: "copy_files_dated_today_minus_1_day"
+        name: "geosfp"
+        namespace: "default"
+        image: "{{ var.json.geos_fp.container_registry.rolling_copy }}"
+        image_pull_policy: "Always"
+        env_vars:
+          BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das"
+          TODAY_DIFF: "1"
+          DOWNLOAD_DIR: "/geos_fp/data"
+          TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}"
+          BATCH_SIZE: "10"
+        resources:
+          request_memory: "1G"
+          request_cpu: "1"
+        retries: 3
+        retry_delay: 300
+        retry_exponential_backoff: true
+        startup_timeout_seconds: 600
+
+    - operator: "KubernetesPodOperator"
+      description: "Copy files to GCS on the specified date"
+      args:
+        task_id: "copy_files_dated_today_minus_2_days"
+        name: "geosfp"
+        namespace: "default"
+        image: "{{ var.json.geos_fp.container_registry.rolling_copy }}"
+        image_pull_policy: "Always"
+        env_vars:
+          BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das"
+          TODAY_DIFF: "2"
+          DOWNLOAD_DIR: "/geos_fp/data"
+          TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}"
+          BATCH_SIZE: "10"
+        resources:
+          request_memory: "1G"
+          request_cpu: "1"
+        retries: 3
+        retry_delay: 300
+        retry_exponential_backoff: true
+        startup_timeout_seconds: 600
+
+    - operator: "KubernetesPodOperator"
+      description: "Copy files to GCS on a 10-day rolling basis"
+      args:
+        task_id: "copy_files_dated_today_minus_3_days"
+        name: "geosfp"
+        namespace: "default"
+        image: "{{ var.json.geos_fp.container_registry.rolling_copy }}"
+        image_pull_policy: "Always"
+        env_vars:
+          BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das"
+          TODAY_DIFF: "3"
+          DOWNLOAD_DIR: "/geos_fp/data"
+          TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}"
+          BATCH_SIZE: "10"
+        resources:
+          request_memory: "1G"
+          request_cpu: "1"
+        retries: 3
+        retry_delay: 300
+        retry_exponential_backoff: true
+        startup_timeout_seconds: 600
+
+    - operator: "KubernetesPodOperator"
+      description: "Copy files to GCS on a 10-day rolling basis"
+      args:
+        task_id: "copy_files_dated_today_minus_4_days"
+        name: "geosfp"
+        namespace: "default"
+        image: "{{ var.json.geos_fp.container_registry.rolling_copy }}"
+        image_pull_policy: "Always"
+        env_vars:
+          BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das"
+          TODAY_DIFF: "4"
+          DOWNLOAD_DIR: "/geos_fp/data"
+          TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}"
+          BATCH_SIZE: "10"
+        resources:
+          request_memory: "1G"
+          request_cpu: "1"
+        retries: 3
+        retry_delay: 300
+        retry_exponential_backoff: true
+        startup_timeout_seconds: 600
+
+    - operator: "KubernetesPodOperator"
+      description: "Copy files to GCS on a 10-day rolling basis"
+      args:
+        task_id: "copy_files_dated_today_minus_5_days"
+        name: "geosfp"
+        namespace: "default"
+        image: "{{ var.json.geos_fp.container_registry.rolling_copy }}"
+        image_pull_policy: "Always"
+        env_vars:
+          BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das"
+          TODAY_DIFF: "5"
+          DOWNLOAD_DIR: "/geos_fp/data"
+          TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}"
+          BATCH_SIZE: "10"
+        resources:
+          request_memory: "1G"
+          request_cpu: "1"
+        retries: 3
+        retry_delay: 300
+        retry_exponential_backoff: true
+        startup_timeout_seconds: 600
+
+    - operator: "KubernetesPodOperator"
+      description: "Copy files to GCS on a 10-day rolling basis"
+      args:
+        task_id: "copy_files_dated_today_minus_6_days"
+        name: "geosfp"
+        namespace: "default"
+        image: "{{ var.json.geos_fp.container_registry.rolling_copy }}"
+        image_pull_policy: "Always"
+        env_vars:
+          BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das"
+          TODAY_DIFF: "6"
+          DOWNLOAD_DIR: "/geos_fp/data"
+          TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}"
+          BATCH_SIZE: "10"
+        resources:
+          request_memory: "1G"
+          request_cpu: "1"
+        retries: 3
+        retry_delay: 300
+        retry_exponential_backoff: true
+        startup_timeout_seconds: 600
+
+    - operator: "KubernetesPodOperator"
+      description: "Copy files to GCS on a 10-day rolling basis"
+      args:
+        task_id: "copy_files_dated_today_minus_7_days"
+        name: "geosfp"
+        namespace: "default"
+        image: "{{ var.json.geos_fp.container_registry.rolling_copy }}"
+        image_pull_policy: "Always"
+        env_vars:
+          BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das"
+          TODAY_DIFF: "7"
+          DOWNLOAD_DIR: "/geos_fp/data"
+          TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}"
+          BATCH_SIZE: "10"
+        resources:
+          request_memory: "1G"
+          request_cpu: "1"
+        retries: 3
+        retry_delay: 300
+        retry_exponential_backoff: true
+        startup_timeout_seconds: 600
+
+    - operator: "GoogleCloudStorageDeleteOperator"
+      description: "Delete GCS data older than 7 days"
+      args:
+        task_id: "delete_old_data"
+        bucket_name: "{{ var.json.geos_fp.destination_bucket }}"
+        prefix: "{{ macros.ds_format(macros.ds_add(ds, -8), '%Y-%m-%d', 'Y%Y/M%m/D%d') }}"
+
+  graph_paths:
+    - "delete_old_data"
+    - "copy_files_dated_today"
+    - "copy_files_dated_today_minus_1_day"
+    - "copy_files_dated_today_minus_2_days"
+    - "copy_files_dated_today_minus_3_days"
+    - "copy_files_dated_today_minus_4_days"
+    - "copy_files_dated_today_minus_5_days"
+    - "copy_files_dated_today_minus_6_days"
+    - "copy_files_dated_today_minus_7_days"
diff --git a/datasets/geos_fp/dataset.yaml b/datasets/geos_fp/dataset.yaml
new file mode 100644
index 000000000..0c4efb5fd
--- /dev/null
+++ b/datasets/geos_fp/dataset.yaml
@@ -0,0 +1,26 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dataset:
+  name: geos_fp
+  friendly_name: ~
+  description: ~
+  dataset_sources: ~
+  terms_of_use: ~
+
+
+resources:
+  - type: storage_bucket
+    name: geos-fp
+    uniform_bucket_level_access: True
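The delete_old_data task's prefix template combines two Airflow macros: ds_add shifts the run's execution date back eight days, and ds_format re-renders the result in the bucket's Y/M/D object layout, so each run removes the date folder that has just fallen outside the window kept by the copy tasks. A rough plain-Python equivalent, with an illustrative ds value (the sample date below is an assumption, not taken from the pipeline):

# Approximation of:
#   {{ macros.ds_format(macros.ds_add(ds, -8), '%Y-%m-%d', 'Y%Y/M%m/D%d') }}
from datetime import datetime, timedelta

ds = "2021-06-10"  # Airflow's `ds` (execution date), sample value for illustration
shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=-8)  # ds_add(ds, -8)
prefix = shifted.strftime("Y%Y/M%m/D%d")  # ds_format(..., '%Y-%m-%d', 'Y%Y/M%m/D%d')
print(prefix)  # Y2021/M06/D02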