From ce9de3a2a173093b59c2d928beb9c6cecd126bc1 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 17 Jun 2021 14:13:49 -0400 Subject: [PATCH 01/15] config files for rolling 10-day pipeline --- .../copy_files_rolling_basis/pipeline.yaml | 52 +++++++++++++++++++ datasets/geos_fp/dataset.yaml | 26 ++++++++++ 2 files changed, 78 insertions(+) create mode 100644 datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml create mode 100644 datasets/geos_fp/dataset.yaml diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml new file mode 100644 index 000000000..7f79d3d5e --- /dev/null +++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -0,0 +1,52 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: ~ + +dag: + initialize: + dag_id: copy_files_rolling_basis + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-06-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Copy files to GCS on a 10-day rolling basis" + args: + task_id: "copy_files_in_last_n_days" + name: "geosfp" + namespace: "default" + image: "{{ var.json.geos_fp.container_registry.crawl_and_download }}" + image_pull_policy: "Always" + env_vars: + BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" + TODAY: "{{ ds }}" + DOWNLOAD_DIR: "/geos_fp/data" + MANIFEST_PATH: "/geos_fp/manifest.txt" + TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" + DAYS_ROLLING: "10" + BATCH_SIZE: "50" + resources: + limit_memory: "512M" + limit_cpu: "2" + + graph_paths: + - "copy_files_in_last_n_days" diff --git a/datasets/geos_fp/dataset.yaml b/datasets/geos_fp/dataset.yaml new file mode 100644 index 000000000..0c4efb5fd --- /dev/null +++ b/datasets/geos_fp/dataset.yaml @@ -0,0 +1,26 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +dataset: + name: geos_fp + friendly_name: ~ + description: ~ + dataset_sources: ~ + terms_of_use: ~ + + +resources: + - type: storage_bucket + name: geos-fp + uniform_bucket_level_access: True From c2ff83eb93558bb3326112e56cd83e3e1a9625d3 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 17 Jun 2021 14:14:19 -0400 Subject: [PATCH 02/15] generated Terraform files --- .../geos_fp/_terraform/geos_fp_dataset.tf | 26 +++++++++++++++++ datasets/geos_fp/_terraform/provider.tf | 28 +++++++++++++++++++ datasets/geos_fp/_terraform/variables.tf | 23 +++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 datasets/geos_fp/_terraform/geos_fp_dataset.tf create mode 100644 datasets/geos_fp/_terraform/provider.tf create mode 100644 datasets/geos_fp/_terraform/variables.tf diff --git a/datasets/geos_fp/_terraform/geos_fp_dataset.tf b/datasets/geos_fp/_terraform/geos_fp_dataset.tf new file mode 100644 index 000000000..002fb2567 --- /dev/null +++ b/datasets/geos_fp/_terraform/geos_fp_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_storage_bucket" "geos-fp" { + name = "${var.bucket_name_prefix}-geos-fp" + force_destroy = true + uniform_bucket_level_access = true +} + +output "storage_bucket-geos-fp-name" { + value = google_storage_bucket.geos-fp.name +} diff --git a/datasets/geos_fp/_terraform/provider.tf b/datasets/geos_fp/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/geos_fp/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/geos_fp/_terraform/variables.tf b/datasets/geos_fp/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/geos_fp/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + From 0f4ee291cb1f67dbd0b090ea1c908eb44523a263 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 17 Jun 2021 14:15:12 -0400 Subject: [PATCH 03/15] generated DAG file --- .../copy_files_rolling_basis_dag.py | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py diff --git a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py new file mode 100644 index 000000000..099e55e1f --- /dev/null +++ b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py @@ -0,0 +1,54 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-06-01", +} + + +with DAG( + dag_id="geos_fp.copy_files_rolling_basis", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Copy files to GCS on a 10-day rolling basis + copy_files_in_last_n_days = kubernetes_pod_operator.KubernetesPodOperator( + task_id="copy_files_in_last_n_days", + name="geosfp", + namespace="default", + image="{{ var.json.geos_fp.container_registry.crawl_and_download }}", + image_pull_policy="Always", + env_vars={ + "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das", + "TODAY": "{{ ds }}", + "DOWNLOAD_DIR": "/geos_fp/data", + "MANIFEST_PATH": "/geos_fp/manifest.txt", + "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}", + "DAYS_ROLLING": "10", + "BATCH_SIZE": "50", + }, + resources={"limit_memory": "512M", "limit_cpu": "2"}, + ) + + copy_files_in_last_n_days From 451dfdf47eca7e72bdce80ea3e979da35794d2a8 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 17 Jun 2021 14:16:02 -0400 Subject: [PATCH 04/15] container image for GEOS-FP --- .../geos_fp/_images/rolling_copy/Dockerfile | 37 +++ datasets/geos_fp/_images/rolling_copy/Pipfile | 13 + .../_images/rolling_copy/requirements.txt | 3 + .../geos_fp/_images/rolling_copy/script.py | 231 ++++++++++++++++++ 4 files changed, 284 insertions(+) create mode 100644 datasets/geos_fp/_images/rolling_copy/Dockerfile create mode 100644 datasets/geos_fp/_images/rolling_copy/Pipfile create mode 100644 datasets/geos_fp/_images/rolling_copy/requirements.txt create mode 100644 datasets/geos_fp/_images/rolling_copy/script.py diff --git a/datasets/geos_fp/_images/rolling_copy/Dockerfile b/datasets/geos_fp/_images/rolling_copy/Dockerfile new file mode 100644 index 000000000..bd80bc71b --- /dev/null +++ b/datasets/geos_fp/_images/rolling_copy/Dockerfile @@ -0,0 +1,37 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./script.py . 
+ +# Command to run the data processing script when the container is run +CMD ["python3", "script.py"] diff --git a/datasets/geos_fp/_images/rolling_copy/Pipfile b/datasets/geos_fp/_images/rolling_copy/Pipfile new file mode 100644 index 000000000..798e84f1f --- /dev/null +++ b/datasets/geos_fp/_images/rolling_copy/Pipfile @@ -0,0 +1,13 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +beautifulsoup4 = "*" +requests = "*" + +[dev-packages] + +[requires] +python_version = "3.9" diff --git a/datasets/geos_fp/_images/rolling_copy/requirements.txt b/datasets/geos_fp/_images/rolling_copy/requirements.txt new file mode 100644 index 000000000..16a3aa210 --- /dev/null +++ b/datasets/geos_fp/_images/rolling_copy/requirements.txt @@ -0,0 +1,3 @@ +BeautifulSoup4 +requests +google-cloud-storage diff --git a/datasets/geos_fp/_images/rolling_copy/script.py b/datasets/geos_fp/_images/rolling_copy/script.py new file mode 100644 index 000000000..20dcfa310 --- /dev/null +++ b/datasets/geos_fp/_images/rolling_copy/script.py @@ -0,0 +1,231 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +import os +import pathlib +import subprocess +import tempfile +import typing +from datetime import date, datetime, timedelta + +import bs4 +import requests +from google.cloud import storage + + +def main( + base_url: str, + today: date, + download_dir: pathlib.Path, + target_bucket: str, + manifest_path: pathlib.Path, + days_rolling: int, + batch_size: int, +) -> None: + # Collects all paths that span the last N days rolling. 
+ # Example path: Y2021/M01/D01 + source_paths = folder_paths_last_n_days(today, n_days=days_rolling) + create_local_dirs(download_dir, source_paths) + + new_paths = set() + for source_path in source_paths: + url = f"{base_url}/{source_path}" + + response = requests.get(url) + if response.status_code == 200: + logging.info(f"Scraping .nc4 files in {url}") + webpage = bs4.BeautifulSoup(response.text, "html.parser") + new_paths.update(scrape(source_path, webpage)) + else: + logging.warning( + f"The following URL doesn't exist, will try again later: {url}" + ) + + old_paths = get_old_paths(target_bucket, manifest_path) + for_download = urls_to_download(old_paths, new_paths) + + copy_new_files(download_dir, for_download, manifest_path, batch_size, target_bucket) + remove_old_files_from_gcs( + download_dir, base_url, urls_to_delete(old_paths, new_paths) + ) + update_manifest_file(new_paths, manifest_path, target_bucket) + + +def folder_paths_last_n_days(today: date, n_days: int) -> typing.List[str]: + dates = [today - timedelta(days=n) for n in range(n_days)] + + # Generates URLs to folders containing the .nc4 files, for example + # https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das/Y2021/M01/D01/ + return [f"Y{dt.year}/M{dt.month:0>2}/D{dt.day:0>2}" for dt in dates] + + +def create_local_dirs( + download_dir: pathlib.Path, source_paths: typing.List[str] +) -> None: + for source_path in source_paths: + (download_dir / source_path).mkdir(parents=True, exist_ok=True) + + +def scrape(source_path: str, webpage: bs4.BeautifulSoup) -> typing.List[str]: + file_paths = [] + + # Finds all URLs that end with ".nc4" + for a_tag in webpage.find_all("a"): + + # The `href` property is the filename, + # e.g. GEOS.fp.asm.inst1_2d_smp_Nx.20210101_1700.V01.nc4 + if a_tag.get("href") and a_tag["href"].endswith(".nc4"): + file_paths.append(f"{source_path}/{a_tag['href']}") + + return file_paths + + +def get_old_paths(bucket_name: str, manifest_path: pathlib.Path) -> typing.Set[str]: + name = "manifest.txt" + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + + if storage.Blob(bucket=bucket, name=name).exists(storage_client): + logging.info(f"Manifest file found at gs://{bucket_name}/{name}") + blob = bucket.blob(name) + blob.download_to_filename(str(manifest_path)) + return set(manifest_path.read_text().split("\n")) + else: + manifest_path.touch() + return set() + + +def download_item(parent_dir: pathlib.Path, file_path: str) -> None: + r = requests.get(f"{os.environ['BASE_URL']}/{file_path}", stream=True) + if r.status_code == 200: + with open(parent_dir / file_path, "wb") as f: + for chunk in r: + f.write(chunk) + else: + logging.warning(f"Couldn't download {file_path}: {r.text}") + + +def copy_new_files( + download_dir: pathlib.Path, + file_paths: typing.List[str], + manifest_path: pathlib.Path, + batch_size: int, + target_bucket: str, +) -> None: + total_files = len(file_paths) + logging.info(f"Downloading {total_files} files.") + for n, batch in enumerate(batches(file_paths, batch_size=batch_size), 1): + logging.info( + f"Processing batch {n}: {(n - 1) * batch_size + 1} to {min(total_files, n * batch_size)}" + ) + copy_batch(batch, download_dir, target_bucket) + + manifest = set(manifest_path.read_text().split("\n")) + manifest.update(batch) + update_manifest_file( + paths=manifest, + manifest_path=manifest_path, + target_bucket=target_bucket, + ) + + +def copy_batch( + batch: typing.List[str], + download_dir: pathlib.Path, + target_bucket: str, +) -> None: + for file_path 
in batch: + download_item(pathlib.Path(download_dir), file_path) + move_contents_to_gcs(pathlib.Path(download_dir), target_bucket) + + +def move_contents_to_gcs(dir: pathlib.Path, target_bucket: str) -> None: + subprocess.check_call( + ["gsutil", "-m", "mv", "-r", f"{dir}/", f"gs://{target_bucket}"] + ) + + +def remove_old_files_from_gcs( + download_dir: pathlib.Path, base_url: str, file_paths: typing.List[str] +) -> None: + logging.info(f"Deleting {len(file_paths)} files.") + + if not file_paths: + return + + # Save the urls in a temporary text file, so we can pass it into gsutil + # with the `-m` option to perform parallel deletions. + with tempfile.NamedTemporaryFile(dir=download_dir, mode="w") as tmp_file: + tmp_file.write( + "\n".join([f"{base_url}/{file_path}" for file_path in file_paths]) + ) + + ps = subprocess.Popen(["cat", tmp_file], stdout=subprocess.PIPE) + subprocess.check_output(["gsutil", "-m", "rm", "-I"], stdin=ps.stdout) + ps.wait() + + +def update_manifest_file( + paths: typing.Set[str], manifest_path: pathlib.Path, target_bucket: str +) -> None: + with open(manifest_path, "w") as file_: + file_.write("\n".join(paths)) + subprocess.check_call(["gsutil", "cp", manifest_path, f"gs://{target_bucket}"]) + + +def urls_to_download( + old_urls: typing.Set[str], new_urls: typing.Set[str] +) -> typing.Set[str]: + urls = [] + for new_url in new_urls: + if new_url not in old_urls: + urls.append(new_url) + return urls + + +def urls_to_delete( + old_urls: typing.Set[str], new_urls: typing.Set[str] +) -> typing.Set[str]: + urls = [] + for old_url in old_urls: + if old_url not in new_urls: + urls.append(old_url) + return urls + + +def batches(file_paths: typing.List[str], batch_size: int): + for i in range(0, len(file_paths), batch_size): + yield file_paths[i : i + batch_size] + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + assert os.environ["TODAY"] + assert os.environ["BASE_URL"] + assert os.environ["DOWNLOAD_DIR"] + assert os.environ["TARGET_BUCKET"] + assert os.environ["MANIFEST_PATH"] + + main( + base_url=os.environ["BASE_URL"], + today=datetime.strptime(os.environ["TODAY"], "%Y-%m-%d").date(), + download_dir=pathlib.Path(os.environ["DOWNLOAD_DIR"]).expanduser(), + manifest_path=pathlib.Path(os.environ["MANIFEST_PATH"]).expanduser(), + target_bucket=os.environ["TARGET_BUCKET"], + days_rolling=int(os.getenv("DAYS_ROLLING", 10)), + batch_size=int(os.getenv("BATCH_SIZE", 100)), + ) From e98f9fcc03986ac0d0818fb634e87d4a81ee8f1a Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Fri, 18 Jun 2021 13:48:59 -0400 Subject: [PATCH 05/15] increased memory and cpu for container --- .../copy_files_rolling_basis/copy_files_rolling_basis_dag.py | 2 +- datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py index 099e55e1f..733d1d246 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py +++ b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py @@ -48,7 +48,7 @@ "DAYS_ROLLING": "10", "BATCH_SIZE": "50", }, - resources={"limit_memory": "512M", "limit_cpu": "2"}, + resources={"request_memory": "4G", "request_cpu": "4"}, ) copy_files_in_last_n_days diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml index 
7f79d3d5e..cf2ff38ca 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml +++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -45,8 +45,8 @@ dag: DAYS_ROLLING: "10" BATCH_SIZE: "50" resources: - limit_memory: "512M" - limit_cpu: "2" + request_memory: "4G" + request_cpu: "4" graph_paths: - "copy_files_in_last_n_days" From 6fc2c909c543dbc5d5d91ad41983c02166c4f8a4 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 29 Jul 2021 14:29:47 -0400 Subject: [PATCH 06/15] install wget in container --- datasets/geos_fp/_images/rolling_copy/Dockerfile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datasets/geos_fp/_images/rolling_copy/Dockerfile b/datasets/geos_fp/_images/rolling_copy/Dockerfile index bd80bc71b..ef95518c0 100644 --- a/datasets/geos_fp/_images/rolling_copy/Dockerfile +++ b/datasets/geos_fp/_images/rolling_copy/Dockerfile @@ -24,6 +24,9 @@ COPY requirements.txt ./ # Install the packages specified in the requirements file RUN python3 -m pip install --no-cache-dir -r requirements.txt +# Install wget +RUN apt-get install wget + # The WORKDIR instruction sets the working directory for any RUN, CMD, # ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. # If the WORKDIR doesn’t exist, it will be created even if it’s not used in From 95c303c0ab8f57a5293a5e9e24df66ba645d8679 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 29 Jul 2021 14:33:51 -0400 Subject: [PATCH 07/15] revised container script to use one manifest file per date --- .../geos_fp/_images/rolling_copy/script.py | 251 ++++++++---------- 1 file changed, 113 insertions(+), 138 deletions(-) diff --git a/datasets/geos_fp/_images/rolling_copy/script.py b/datasets/geos_fp/_images/rolling_copy/script.py index 20dcfa310..bbaa88396 100644 --- a/datasets/geos_fp/_images/rolling_copy/script.py +++ b/datasets/geos_fp/_images/rolling_copy/script.py @@ -17,72 +17,84 @@ import os import pathlib import subprocess -import tempfile import typing -from datetime import date, datetime, timedelta +from datetime import date, timedelta import bs4 import requests from google.cloud import storage +# The manifest file contains a list of files already downloaded for a given date +MANIFEST_FILE = "manifest.txt" + def main( base_url: str, - today: date, + dt: date, download_dir: pathlib.Path, target_bucket: str, - manifest_path: pathlib.Path, - days_rolling: int, batch_size: int, ) -> None: - # Collects all paths that span the last N days rolling. - # Example path: Y2021/M01/D01 - source_paths = folder_paths_last_n_days(today, n_days=days_rolling) - create_local_dirs(download_dir, source_paths) - - new_paths = set() - for source_path in source_paths: - url = f"{base_url}/{source_path}" - - response = requests.get(url) - if response.status_code == 200: - logging.info(f"Scraping .nc4 files in {url}") - webpage = bs4.BeautifulSoup(response.text, "html.parser") - new_paths.update(scrape(source_path, webpage)) - else: - logging.warning( - f"The following URL doesn't exist, will try again later: {url}" - ) - - old_paths = get_old_paths(target_bucket, manifest_path) - for_download = urls_to_download(old_paths, new_paths) - - copy_new_files(download_dir, for_download, manifest_path, batch_size, target_bucket) - remove_old_files_from_gcs( - download_dir, base_url, urls_to_delete(old_paths, new_paths) - ) - update_manifest_file(new_paths, manifest_path, target_bucket) + # Get date prefix, e.g. 
Y2021/M01/D01, and create directories for them + date_prefix = _date_prefix(dt) + (download_dir / date_prefix).mkdir(parents=True, exist_ok=True) + + # Generate a set of all .nc4 files from the specified url and date + all_files = get_all_files(base_url, date_prefix) + stored_files = get_stored_files(target_bucket, date_prefix, download_dir) + + # Files present in the source webpage but not yet stored on GCS + unstored_files = all_files - stored_files + + download_and_store_new_files( + download_dir, date_prefix, unstored_files, batch_size, target_bucket + ) -def folder_paths_last_n_days(today: date, n_days: int) -> typing.List[str]: - dates = [today - timedelta(days=n) for n in range(n_days)] - # Generates URLs to folders containing the .nc4 files, for example +def _date_prefix(dt: date) -> typing.List[str]: + # Generates URL paths to folders containing the .nc4 files, for example # https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das/Y2021/M01/D01/ - return [f"Y{dt.year}/M{dt.month:0>2}/D{dt.day:0>2}" for dt in dates] + # => Y2021/M01/D01 + return f"Y{dt.year}/M{dt.month:0>2}/D{dt.day:0>2}" + + +def get_all_files(base_url: str, date_prefix: str) -> typing.Set[str]: + all_files = set() + url = f"{base_url}/{date_prefix}" + response = requests.get(url) + if response.status_code == 200: + logging.info(f"Scraping .nc4 files in {url}") + webpage = bs4.BeautifulSoup(response.text, "html.parser") + all_files.update(scrape(date_prefix, webpage)) + else: + logging.warning(f"The following URL doesn't exist, will try again later: {url}") + return all_files -def create_local_dirs( - download_dir: pathlib.Path, source_paths: typing.List[str] -) -> None: - for source_path in source_paths: - (download_dir / source_path).mkdir(parents=True, exist_ok=True) +def get_stored_files( + bucket_name: str, date_prefix: str, download_dir: pathlib.Path +) -> typing.Set[str]: + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + gcs_object = f"{date_prefix}/{MANIFEST_FILE}" + local_object = download_dir / MANIFEST_FILE + + if storage.Blob(bucket=bucket, name=gcs_object).exists(storage_client): + logging.info(f"Manifest file found at gs://{bucket_name}/{gcs_object}") + blob = bucket.blob(gcs_object) + blob.download_to_filename(str(local_object)) + else: + local_object.touch() + + with open(local_object) as f: + return set(f.read().splitlines()) def scrape(source_path: str, webpage: bs4.BeautifulSoup) -> typing.List[str]: file_paths = [] - # Finds all URLs that end with ".nc4" + # Go through all the URLs in the page and collect the ones ending in ".nc4" for a_tag in webpage.find_all("a"): # The `href` property is the filename, @@ -93,117 +105,83 @@ def scrape(source_path: str, webpage: bs4.BeautifulSoup) -> typing.List[str]: return file_paths -def get_old_paths(bucket_name: str, manifest_path: pathlib.Path) -> typing.Set[str]: - name = "manifest.txt" - storage_client = storage.Client() - bucket = storage_client.bucket(bucket_name) - - if storage.Blob(bucket=bucket, name=name).exists(storage_client): - logging.info(f"Manifest file found at gs://{bucket_name}/{name}") - blob = bucket.blob(name) - blob.download_to_filename(str(manifest_path)) - return set(manifest_path.read_text().split("\n")) - else: - manifest_path.touch() - return set() - - -def download_item(parent_dir: pathlib.Path, file_path: str) -> None: - r = requests.get(f"{os.environ['BASE_URL']}/{file_path}", stream=True) - if r.status_code == 200: - with open(parent_dir / file_path, "wb") as f: - for chunk in r: - 
f.write(chunk) - else: - logging.warning(f"Couldn't download {file_path}: {r.text}") - - -def copy_new_files( +def download_and_store_new_files( download_dir: pathlib.Path, - file_paths: typing.List[str], - manifest_path: pathlib.Path, + date_prefix: str, + new_files: typing.Set[str], batch_size: int, target_bucket: str, ) -> None: - total_files = len(file_paths) + """In batches, download files from the source to the local filesystem + and upload them to the GCS target bucket + """ + total_files = len(new_files) logging.info(f"Downloading {total_files} files.") - for n, batch in enumerate(batches(file_paths, batch_size=batch_size), 1): + for n, batch in enumerate(batches(list(new_files), batch_size=batch_size), 1): logging.info( f"Processing batch {n}: {(n - 1) * batch_size + 1} to {min(total_files, n * batch_size)}" ) - copy_batch(batch, download_dir, target_bucket) - - manifest = set(manifest_path.read_text().split("\n")) - manifest.update(batch) - update_manifest_file( - paths=manifest, - manifest_path=manifest_path, - target_bucket=target_bucket, - ) + download_batch(batch, download_dir) + move_dir_contents_to_gcs(download_dir, target_bucket, date_prefix) + update_manifest_file(batch, download_dir, target_bucket, date_prefix) -def copy_batch( - batch: typing.List[str], - download_dir: pathlib.Path, - target_bucket: str, -) -> None: +def download_batch(batch: typing.List[str], download_dir: pathlib.Path) -> None: for file_path in batch: - download_item(pathlib.Path(download_dir), file_path) - move_contents_to_gcs(pathlib.Path(download_dir), target_bucket) + logging.info(f"Downloading file to {download_dir}/{file_path}") + subprocess.check_call( + [ + "wget", + f"{os.environ['BASE_URL']}/{file_path}", + "-O", + f"{download_dir}/{file_path}", + "-nv", + ] + ) -def move_contents_to_gcs(dir: pathlib.Path, target_bucket: str) -> None: +def move_dir_contents_to_gcs( + dir_: pathlib.Path, target_bucket: str, date_prefix: str +) -> None: subprocess.check_call( - ["gsutil", "-m", "mv", "-r", f"{dir}/", f"gs://{target_bucket}"] + [ + "gsutil", + "-m", + "-o", + "GSUtil:parallel_composite_upload_threshold=250M", + "cp", + f"{dir_}/{date_prefix}/*.nc4", + f"gs://{target_bucket}/{date_prefix}", + ] ) + delete_dir_contents(dir_ / date_prefix) -def remove_old_files_from_gcs( - download_dir: pathlib.Path, base_url: str, file_paths: typing.List[str] -) -> None: - logging.info(f"Deleting {len(file_paths)} files.") - - if not file_paths: - return - - # Save the urls in a temporary text file, so we can pass it into gsutil - # with the `-m` option to perform parallel deletions. - with tempfile.NamedTemporaryFile(dir=download_dir, mode="w") as tmp_file: - tmp_file.write( - "\n".join([f"{base_url}/{file_path}" for file_path in file_paths]) - ) - - ps = subprocess.Popen(["cat", tmp_file], stdout=subprocess.PIPE) - subprocess.check_output(["gsutil", "-m", "rm", "-I"], stdin=ps.stdout) - ps.wait() +def delete_dir_contents(dir_to_delete: pathlib.Path) -> None: + """Delete directory contents, but not the dir itself. This is useful for keeping + date dirs such as Y2021/M07/D12 intact for the next batch of files to use. 
+ """ + [f.unlink() for f in dir_to_delete.glob("*") if f.is_file()] def update_manifest_file( - paths: typing.Set[str], manifest_path: pathlib.Path, target_bucket: str + paths: typing.Set[str], + download_dir: pathlib.Path, + target_bucket: str, + date_prefix: str, ) -> None: - with open(manifest_path, "w") as file_: - file_.write("\n".join(paths)) - subprocess.check_call(["gsutil", "cp", manifest_path, f"gs://{target_bucket}"]) - - -def urls_to_download( - old_urls: typing.Set[str], new_urls: typing.Set[str] -) -> typing.Set[str]: - urls = [] - for new_url in new_urls: - if new_url not in old_urls: - urls.append(new_url) - return urls - - -def urls_to_delete( - old_urls: typing.Set[str], new_urls: typing.Set[str] -) -> typing.Set[str]: - urls = [] - for old_url in old_urls: - if old_url not in new_urls: - urls.append(old_url) - return urls + manifest_path = download_dir / MANIFEST_FILE + with open(manifest_path, "a") as f: + f.write("\n".join(paths)) + f.write("\n") + subprocess.check_call( + [ + "gsutil", + "cp", + str(manifest_path), + f"gs://{target_bucket}/{date_prefix}/{MANIFEST_FILE}", + ] + ) def batches(file_paths: typing.List[str], batch_size: int): @@ -214,18 +192,15 @@ def batches(file_paths: typing.List[str], batch_size: int): if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) - assert os.environ["TODAY"] assert os.environ["BASE_URL"] + assert os.environ["TODAY_DIFF"] assert os.environ["DOWNLOAD_DIR"] assert os.environ["TARGET_BUCKET"] - assert os.environ["MANIFEST_PATH"] main( base_url=os.environ["BASE_URL"], - today=datetime.strptime(os.environ["TODAY"], "%Y-%m-%d").date(), + dt=(date.today() - timedelta(days=int(os.environ["TODAY_DIFF"]))), download_dir=pathlib.Path(os.environ["DOWNLOAD_DIR"]).expanduser(), - manifest_path=pathlib.Path(os.environ["MANIFEST_PATH"]).expanduser(), target_bucket=os.environ["TARGET_BUCKET"], - days_rolling=int(os.getenv("DAYS_ROLLING", 10)), - batch_size=int(os.getenv("BATCH_SIZE", 100)), + batch_size=int(os.getenv("BATCH_SIZE", 10)), ) From ef241d7d683487348f6ff7658e6f8a6cd639eda5 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 29 Jul 2021 14:36:19 -0400 Subject: [PATCH 08/15] revised DAG to use one operator per date --- .../copy_files_rolling_basis/pipeline.yaml | 160 ++++++++++++++++-- 1 file changed, 150 insertions(+), 10 deletions(-) diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml index cf2ff38ca..4ffd58690 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml +++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -25,28 +25,168 @@ dag: max_active_runs: 1 schedule_interval: "@daily" catchup: False - default_view: graph + default_view: "graph" tasks: + - operator: "KubernetesPodOperator" + description: "Copy files to GCS on the specified date" + args: + task_id: "copy_files_dated_today" + name: "geosfp" + namespace: "default" + image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" + image_pull_policy: "Always" + env_vars: + BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" + TODAY_DIFF: "0" + DOWNLOAD_DIR: "/geos_fp/data" + TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" + BATCH_SIZE: "10" + resources: + request_memory: "1G" + request_cpu: "1" + + - operator: "KubernetesPodOperator" + description: "Copy files to GCS on the specified date" + args: + task_id: "copy_files_dated_today_minus_1_day" + name: "geosfp" + namespace: "default" + image: "{{ 
var.json.geos_fp.container_registry.rolling_copy }}" + image_pull_policy: "Always" + env_vars: + BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" + TODAY_DIFF: "1" + DOWNLOAD_DIR: "/geos_fp/data" + TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" + BATCH_SIZE: "10" + resources: + request_memory: "1G" + request_cpu: "1" + + - operator: "KubernetesPodOperator" + description: "Copy files to GCS on the specified date" + args: + task_id: "copy_files_dated_today_minus_2_days" + name: "geosfp" + namespace: "default" + image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" + image_pull_policy: "Always" + env_vars: + BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" + TODAY_DIFF: "2" + DOWNLOAD_DIR: "/geos_fp/data" + TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" + BATCH_SIZE: "10" + resources: + request_memory: "1G" + request_cpu: "1" + + - operator: "KubernetesPodOperator" + description: "Copy files to GCS on a 10-day rolling basis" + args: + task_id: "copy_files_dated_today_minus_3_days" + name: "geosfp" + namespace: "default" + image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" + image_pull_policy: "Always" + env_vars: + BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" + TODAY_DIFF: "3" + DOWNLOAD_DIR: "/geos_fp/data" + TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" + BATCH_SIZE: "10" + resources: + request_memory: "1G" + request_cpu: "1" + + - operator: "KubernetesPodOperator" + description: "Copy files to GCS on a 10-day rolling basis" + args: + task_id: "copy_files_dated_today_minus_4_days" + name: "geosfp" + namespace: "default" + image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" + image_pull_policy: "Always" + env_vars: + BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" + TODAY_DIFF: "4" + DOWNLOAD_DIR: "/geos_fp/data" + TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" + BATCH_SIZE: "10" + resources: + request_memory: "1G" + request_cpu: "1" + + - operator: "KubernetesPodOperator" + description: "Copy files to GCS on a 10-day rolling basis" + args: + task_id: "copy_files_dated_today_minus_5_days" + name: "geosfp" + namespace: "default" + image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" + image_pull_policy: "Always" + env_vars: + BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" + TODAY_DIFF: "5" + DOWNLOAD_DIR: "/geos_fp/data" + TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" + BATCH_SIZE: "10" + resources: + request_memory: "1G" + request_cpu: "1" + - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" args: - task_id: "copy_files_in_last_n_days" + task_id: "copy_files_dated_today_minus_6_days" name: "geosfp" namespace: "default" - image: "{{ var.json.geos_fp.container_registry.crawl_and_download }}" + image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" - TODAY: "{{ ds }}" + TODAY_DIFF: "6" DOWNLOAD_DIR: "/geos_fp/data" - MANIFEST_PATH: "/geos_fp/manifest.txt" TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" - DAYS_ROLLING: "10" - BATCH_SIZE: "50" + BATCH_SIZE: "10" resources: - request_memory: "4G" - request_cpu: "4" + request_memory: "1G" + request_cpu: "1" + - operator: "KubernetesPodOperator" + description: "Copy files to GCS on a 10-day rolling basis" + args: + task_id: "copy_files_dated_today_minus_7_days" + name: 
"geosfp" + namespace: "default" + image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" + image_pull_policy: "Always" + env_vars: + BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" + TODAY_DIFF: "7" + DOWNLOAD_DIR: "/geos_fp/data" + TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" + BATCH_SIZE: "10" + resources: + request_memory: "1G" + request_cpu: "1" + + - operator: "GoogleCloudStorageDeleteOperator" + description: "Deletes GCS data more than 7 days ago" + + args: + task_id: "delete_old_data" + bucket_name: "{{ var.json.geos_fp.destination_bucket }}" + prefix: Y{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%Y") }}/M{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%m") }}/D{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%d") }} +# graph_paths: - - "copy_files_in_last_n_days" + - "delete_old_data" + - "copy_files_dated_today" + - "copy_files_dated_today_minus_1_day" + - "copy_files_dated_today_minus_2_days" + - "copy_files_dated_today_minus_3_days" + - "copy_files_dated_today_minus_4_days" + - "copy_files_dated_today_minus_5_days" + - "copy_files_dated_today_minus_6_days" + - "copy_files_dated_today_minus_7_days" From dc98136eb672122b6b14f58b0a4379ad4148d9bb Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 29 Jul 2021 14:36:44 -0400 Subject: [PATCH 09/15] regenerate DAG .py file --- .../copy_files_rolling_basis_dag.py | 152 ++++++++++++++++-- 1 file changed, 142 insertions(+), 10 deletions(-) diff --git a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py index 733d1d246..6927a5908 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py +++ b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py @@ -14,7 +14,7 @@ from airflow import DAG -from airflow.contrib.operators import kubernetes_pod_operator +from airflow.contrib.operators import gcs_delete_operator, kubernetes_pod_operator default_args = { "owner": "Google", @@ -32,23 +32,155 @@ default_view="graph", ) as dag: + # Copy files to GCS on the specified date + copy_files_dated_today = kubernetes_pod_operator.KubernetesPodOperator( + task_id="copy_files_dated_today", + name="geosfp", + namespace="default", + image="{{ var.json.geos_fp.container_registry.rolling_copy }}", + image_pull_policy="Always", + env_vars={ + "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das", + "TODAY_DIFF": "0", + "DOWNLOAD_DIR": "/geos_fp/data", + "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}", + "BATCH_SIZE": "10", + }, + resources={"request_memory": "1G", "request_cpu": "1"}, + ) + + # Copy files to GCS on the specified date + copy_files_dated_today_minus_1_day = kubernetes_pod_operator.KubernetesPodOperator( + task_id="copy_files_dated_today_minus_1_day", + name="geosfp", + namespace="default", + image="{{ var.json.geos_fp.container_registry.rolling_copy }}", + image_pull_policy="Always", + env_vars={ + "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das", + "TODAY_DIFF": "1", + "DOWNLOAD_DIR": "/geos_fp/data", + "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}", + "BATCH_SIZE": "10", + }, + resources={"request_memory": "1G", "request_cpu": "1"}, + ) + + # Copy files to GCS on the specified date + copy_files_dated_today_minus_2_days = kubernetes_pod_operator.KubernetesPodOperator( + task_id="copy_files_dated_today_minus_2_days", + name="geosfp", + namespace="default", + image="{{ 
var.json.geos_fp.container_registry.rolling_copy }}", + image_pull_policy="Always", + env_vars={ + "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das", + "TODAY_DIFF": "2", + "DOWNLOAD_DIR": "/geos_fp/data", + "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}", + "BATCH_SIZE": "10", + }, + resources={"request_memory": "1G", "request_cpu": "1"}, + ) + + # Copy files to GCS on a 10-day rolling basis + copy_files_dated_today_minus_3_days = kubernetes_pod_operator.KubernetesPodOperator( + task_id="copy_files_dated_today_minus_3_days", + name="geosfp", + namespace="default", + image="{{ var.json.geos_fp.container_registry.rolling_copy }}", + image_pull_policy="Always", + env_vars={ + "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das", + "TODAY_DIFF": "3", + "DOWNLOAD_DIR": "/geos_fp/data", + "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}", + "BATCH_SIZE": "10", + }, + resources={"request_memory": "1G", "request_cpu": "1"}, + ) + + # Copy files to GCS on a 10-day rolling basis + copy_files_dated_today_minus_4_days = kubernetes_pod_operator.KubernetesPodOperator( + task_id="copy_files_dated_today_minus_4_days", + name="geosfp", + namespace="default", + image="{{ var.json.geos_fp.container_registry.rolling_copy }}", + image_pull_policy="Always", + env_vars={ + "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das", + "TODAY_DIFF": "4", + "DOWNLOAD_DIR": "/geos_fp/data", + "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}", + "BATCH_SIZE": "10", + }, + resources={"request_memory": "1G", "request_cpu": "1"}, + ) + + # Copy files to GCS on a 10-day rolling basis + copy_files_dated_today_minus_5_days = kubernetes_pod_operator.KubernetesPodOperator( + task_id="copy_files_dated_today_minus_5_days", + name="geosfp", + namespace="default", + image="{{ var.json.geos_fp.container_registry.rolling_copy }}", + image_pull_policy="Always", + env_vars={ + "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das", + "TODAY_DIFF": "5", + "DOWNLOAD_DIR": "/geos_fp/data", + "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}", + "BATCH_SIZE": "10", + }, + resources={"request_memory": "1G", "request_cpu": "1"}, + ) + # Copy files to GCS on a 10-day rolling basis - copy_files_in_last_n_days = kubernetes_pod_operator.KubernetesPodOperator( - task_id="copy_files_in_last_n_days", + copy_files_dated_today_minus_6_days = kubernetes_pod_operator.KubernetesPodOperator( + task_id="copy_files_dated_today_minus_6_days", name="geosfp", namespace="default", - image="{{ var.json.geos_fp.container_registry.crawl_and_download }}", + image="{{ var.json.geos_fp.container_registry.rolling_copy }}", image_pull_policy="Always", env_vars={ "BASE_URL": "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das", - "TODAY": "{{ ds }}", + "TODAY_DIFF": "6", "DOWNLOAD_DIR": "/geos_fp/data", - "MANIFEST_PATH": "/geos_fp/manifest.txt", "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}", - "DAYS_ROLLING": "10", - "BATCH_SIZE": "50", + "BATCH_SIZE": "10", }, - resources={"request_memory": "4G", "request_cpu": "4"}, + resources={"request_memory": "1G", "request_cpu": "1"}, + ) + + # Copy files to GCS on a 10-day rolling basis + copy_files_dated_today_minus_7_days = kubernetes_pod_operator.KubernetesPodOperator( + task_id="copy_files_dated_today_minus_7_days", + name="geosfp", + namespace="default", + image="{{ var.json.geos_fp.container_registry.rolling_copy }}", + image_pull_policy="Always", + env_vars={ + "BASE_URL": 
"https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das", + "TODAY_DIFF": "7", + "DOWNLOAD_DIR": "/geos_fp/data", + "TARGET_BUCKET": "{{ var.json.geos_fp.destination_bucket }}", + "BATCH_SIZE": "10", + }, + resources={"request_memory": "1G", "request_cpu": "1"}, + ) + + # Deletes GCS data more than 7 days ago + delete_old_data = gcs_delete_operator.GoogleCloudStorageDeleteOperator( + task_id="delete_old_data", + bucket_name="{{ var.json.geos_fp.destination_bucket }}", + prefix='Y{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%Y") }}/M{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%m") }}/D{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%d") }}', ) - copy_files_in_last_n_days + delete_old_data + copy_files_dated_today + copy_files_dated_today_minus_1_day + copy_files_dated_today_minus_2_days + copy_files_dated_today_minus_3_days + copy_files_dated_today_minus_4_days + copy_files_dated_today_minus_5_days + copy_files_dated_today_minus_6_days + copy_files_dated_today_minus_7_days From 8e25ee7f558edffc7a9efff23d04434d6a4e785f Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 29 Jul 2021 14:54:12 -0400 Subject: [PATCH 10/15] fixed YAML linting --- datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml index 4ffd58690..935654516 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml +++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -179,7 +179,7 @@ dag: task_id: "delete_old_data" bucket_name: "{{ var.json.geos_fp.destination_bucket }}" prefix: Y{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%Y") }}/M{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%m") }}/D{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%d") }} -# + graph_paths: - "delete_old_data" - "copy_files_dated_today" From 8bcecaf26680522357b1e2933548ad434b35d05f Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 29 Jul 2021 17:20:57 -0400 Subject: [PATCH 11/15] set schedule to be slightly after midnight UTC --- .../copy_files_rolling_basis/copy_files_rolling_basis_dag.py | 2 +- datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py index 6927a5908..f3ce570c1 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py +++ b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py @@ -27,7 +27,7 @@ dag_id="geos_fp.copy_files_rolling_basis", default_args=default_args, max_active_runs=1, - schedule_interval="@daily", + schedule_interval="0 2 * * *", catchup=False, default_view="graph", ) as dag: diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml index 935654516..0ad7916ae 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml +++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -23,7 +23,7 @@ dag: depends_on_past: False start_date: '2021-06-01' max_active_runs: 1 - schedule_interval: "@daily" + schedule_interval: "0 2 * * *" # Daily at 2am UTC catchup: False default_view: "graph" From a0c523e51c7b2d0a86aa548f7bfda5ed5b0ceea9 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 29 Jul 2021 18:26:42 -0400 Subject: 
[PATCH 12/15] add retries --- .../copy_files_rolling_basis_dag.py | 24 +++++++++++++++++++ .../copy_files_rolling_basis/pipeline.yaml | 24 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py index f3ce570c1..5f5cd629c 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py +++ b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py @@ -47,6 +47,9 @@ "BATCH_SIZE": "10", }, resources={"request_memory": "1G", "request_cpu": "1"}, + retries=3, + retry_delay=300, + retry_exponential_backoff=True, ) # Copy files to GCS on the specified date @@ -64,6 +67,9 @@ "BATCH_SIZE": "10", }, resources={"request_memory": "1G", "request_cpu": "1"}, + retries=3, + retry_delay=300, + retry_exponential_backoff=True, ) # Copy files to GCS on the specified date @@ -81,6 +87,9 @@ "BATCH_SIZE": "10", }, resources={"request_memory": "1G", "request_cpu": "1"}, + retries=3, + retry_delay=300, + retry_exponential_backoff=True, ) # Copy files to GCS on a 10-day rolling basis @@ -98,6 +107,9 @@ "BATCH_SIZE": "10", }, resources={"request_memory": "1G", "request_cpu": "1"}, + retries=3, + retry_delay=300, + retry_exponential_backoff=True, ) # Copy files to GCS on a 10-day rolling basis @@ -115,6 +127,9 @@ "BATCH_SIZE": "10", }, resources={"request_memory": "1G", "request_cpu": "1"}, + retries=3, + retry_delay=300, + retry_exponential_backoff=True, ) # Copy files to GCS on a 10-day rolling basis @@ -132,6 +147,9 @@ "BATCH_SIZE": "10", }, resources={"request_memory": "1G", "request_cpu": "1"}, + retries=3, + retry_delay=300, + retry_exponential_backoff=True, ) # Copy files to GCS on a 10-day rolling basis @@ -149,6 +167,9 @@ "BATCH_SIZE": "10", }, resources={"request_memory": "1G", "request_cpu": "1"}, + retries=3, + retry_delay=300, + retry_exponential_backoff=True, ) # Copy files to GCS on a 10-day rolling basis @@ -166,6 +187,9 @@ "BATCH_SIZE": "10", }, resources={"request_memory": "1G", "request_cpu": "1"}, + retries=3, + retry_delay=300, + retry_exponential_backoff=True, ) # Deletes GCS data more than 7 days ago diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml index 0ad7916ae..c45ffd022 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml +++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -45,6 +45,9 @@ dag: resources: request_memory: "1G" request_cpu: "1" + retries: 3 + retry_delay: 300 + retry_exponential_backoff: true - operator: "KubernetesPodOperator" description: "Copy files to GCS on the specified date" @@ -63,6 +66,9 @@ dag: resources: request_memory: "1G" request_cpu: "1" + retries: 3 + retry_delay: 300 + retry_exponential_backoff: true - operator: "KubernetesPodOperator" description: "Copy files to GCS on the specified date" @@ -81,6 +87,9 @@ dag: resources: request_memory: "1G" request_cpu: "1" + retries: 3 + retry_delay: 300 + retry_exponential_backoff: true - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -99,6 +108,9 @@ dag: resources: request_memory: "1G" request_cpu: "1" + retries: 3 + retry_delay: 300 + retry_exponential_backoff: true - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -117,6 +129,9 @@ dag: resources: request_memory: "1G" request_cpu: "1" + retries: 3 + retry_delay: 300 + 
retry_exponential_backoff: true - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -135,6 +150,9 @@ dag: resources: request_memory: "1G" request_cpu: "1" + retries: 3 + retry_delay: 300 + retry_exponential_backoff: true - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -153,6 +171,9 @@ dag: resources: request_memory: "1G" request_cpu: "1" + retries: 3 + retry_delay: 300 + retry_exponential_backoff: true - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -171,6 +192,9 @@ dag: resources: request_memory: "1G" request_cpu: "1" + retries: 3 + retry_delay: 300 + retry_exponential_backoff: true - operator: "GoogleCloudStorageDeleteOperator" description: "Deletes GCS data more than 7 days ago" From fd356d5931b7503064d69e880e44afff871f9ce4 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Fri, 30 Jul 2021 13:20:20 -0400 Subject: [PATCH 13/15] add kpod timeout and simplified old data prefix --- .../copy_files_rolling_basis_dag.py | 10 +++++++++- .../geos_fp/copy_files_rolling_basis/pipeline.yaml | 11 +++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py index 5f5cd629c..fbab6770b 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py +++ b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py @@ -50,6 +50,7 @@ retries=3, retry_delay=300, retry_exponential_backoff=True, + startup_timeout_seconds=600, ) # Copy files to GCS on the specified date @@ -70,6 +71,7 @@ retries=3, retry_delay=300, retry_exponential_backoff=True, + startup_timeout_seconds=600, ) # Copy files to GCS on the specified date @@ -90,6 +92,7 @@ retries=3, retry_delay=300, retry_exponential_backoff=True, + startup_timeout_seconds=600, ) # Copy files to GCS on a 10-day rolling basis @@ -110,6 +113,7 @@ retries=3, retry_delay=300, retry_exponential_backoff=True, + startup_timeout_seconds=600, ) # Copy files to GCS on a 10-day rolling basis @@ -130,6 +134,7 @@ retries=3, retry_delay=300, retry_exponential_backoff=True, + startup_timeout_seconds=600, ) # Copy files to GCS on a 10-day rolling basis @@ -150,6 +155,7 @@ retries=3, retry_delay=300, retry_exponential_backoff=True, + startup_timeout_seconds=600, ) # Copy files to GCS on a 10-day rolling basis @@ -170,6 +176,7 @@ retries=3, retry_delay=300, retry_exponential_backoff=True, + startup_timeout_seconds=600, ) # Copy files to GCS on a 10-day rolling basis @@ -190,13 +197,14 @@ retries=3, retry_delay=300, retry_exponential_backoff=True, + startup_timeout_seconds=600, ) # Deletes GCS data more than 7 days ago delete_old_data = gcs_delete_operator.GoogleCloudStorageDeleteOperator( task_id="delete_old_data", bucket_name="{{ var.json.geos_fp.destination_bucket }}", - prefix='Y{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%Y") }}/M{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%m") }}/D{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%d") }}', + prefix="{{ macros.ds_format(macros.ds_add(ds, -8), \u0027%Y-%m-%d\u0027, \u0027Y%Y/M%m/D%d\u0027) }}", ) delete_old_data diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml index c45ffd022..2e273c623 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml +++ 
b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -48,6 +48,7 @@ dag: retries: 3 retry_delay: 300 retry_exponential_backoff: true + startup_timeout_seconds: 600 - operator: "KubernetesPodOperator" description: "Copy files to GCS on the specified date" @@ -69,6 +70,7 @@ dag: retries: 3 retry_delay: 300 retry_exponential_backoff: true + startup_timeout_seconds: 600 - operator: "KubernetesPodOperator" description: "Copy files to GCS on the specified date" @@ -90,6 +92,7 @@ dag: retries: 3 retry_delay: 300 retry_exponential_backoff: true + startup_timeout_seconds: 600 - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -111,6 +114,7 @@ dag: retries: 3 retry_delay: 300 retry_exponential_backoff: true + startup_timeout_seconds: 600 - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -132,6 +136,7 @@ dag: retries: 3 retry_delay: 300 retry_exponential_backoff: true + startup_timeout_seconds: 600 - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -153,6 +158,7 @@ dag: retries: 3 retry_delay: 300 retry_exponential_backoff: true + startup_timeout_seconds: 600 - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -174,6 +180,7 @@ dag: retries: 3 retry_delay: 300 retry_exponential_backoff: true + startup_timeout_seconds: 600 - operator: "KubernetesPodOperator" description: "Copy files to GCS on a 10-day rolling basis" @@ -195,14 +202,14 @@ dag: retries: 3 retry_delay: 300 retry_exponential_backoff: true + startup_timeout_seconds: 600 - operator: "GoogleCloudStorageDeleteOperator" description: "Deletes GCS data more than 7 days ago" - args: task_id: "delete_old_data" bucket_name: "{{ var.json.geos_fp.destination_bucket }}" - prefix: Y{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%Y") }}/M{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%m") }}/D{{ macros.ds_format(macros.ds_add(ds, -8), "%Y-%m-%d", "%d") }} + prefix: "{{ macros.ds_format(macros.ds_add(ds, -8), '%Y-%m-%d', 'Y%Y/M%m/D%d') }}" graph_paths: - "delete_old_data" From 87cc9320c96e3cc8ea434cbe63083fcbe5a9b084 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Wed, 4 Aug 2021 18:31:55 -0400 Subject: [PATCH 14/15] removed trailing whitespaces --- datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml index 2e273c623..69abba8cd 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml +++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -178,7 +178,7 @@ dag: request_memory: "1G" request_cpu: "1" retries: 3 - retry_delay: 300 + retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 @@ -200,7 +200,7 @@ dag: request_memory: "1G" request_cpu: "1" retries: 3 - retry_delay: 300 + retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 From 6a0263486360b9f0eb39f8b81e421de0ccc125b1 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Fri, 13 Aug 2021 17:21:09 -0400 Subject: [PATCH 15/15] specify DAG to use Airflow 1 operators --- datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml index 69abba8cd..077603d43 100644 --- 
a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml +++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -16,6 +16,7 @@ resources: ~ dag: + airflow_version: 1 initialize: dag_id: copy_files_rolling_basis default_args:
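
Note: as a sanity check on the rolling window configured above, the following plain-Python sketch (not part of the patches) approximates which date folders the eight copy tasks and the delete_old_data task target for a given run date. It reuses the folder layout from script.py's _date_prefix and imitates the ds_add/ds_format macro chain with standard datetime calls; the example run date is arbitrary, and it glosses over the difference between the pods' date.today() and Airflow's templated ds.

from datetime import date, datetime, timedelta

def date_prefix(dt: date) -> str:
    # Same folder layout as script.py, e.g. Y2021/M07/D30
    return f"Y{dt.year}/M{dt.month:0>2}/D{dt.day:0>2}"

def delete_prefix(ds: str, days_back: int = 8) -> str:
    # Imitates {{ macros.ds_format(macros.ds_add(ds, -8), '%Y-%m-%d', 'Y%Y/M%m/D%d') }}
    shifted = datetime.strptime(ds, "%Y-%m-%d") - timedelta(days=days_back)
    return shifted.strftime("Y%Y/M%m/D%d")

run_date = date(2021, 7, 30)  # arbitrary example run date (ds = "2021-07-30")

# copy_files_dated_today .. copy_files_dated_today_minus_7_days (TODAY_DIFF = 0..7)
for diff in range(8):
    print(f"TODAY_DIFF={diff} -> {date_prefix(run_date - timedelta(days=diff))}")

# delete_old_data removes the folder that has just fallen out of the window
print(f"delete -> {delete_prefix(run_date.isoformat())}")  # Y2021/M07/D22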