From f307ccefbe60895e8eced4c5040d1629c8486f9e Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Fri, 24 Sep 2021 11:41:46 -0400 Subject: [PATCH] fix: Delete temp GCS objects generated by gsutil's parallel composite upload for `geos_fp` dataset (#195) * fix: Delete temporary files generated by gsutil's parallel composite uploads * fix: fix type errors * fix: Add node pool affinities for the DAG tasks --- .../geos_fp/_images/rolling_copy/script.py | 24 +++- .../copy_files_rolling_basis_dag.py | 136 ++++++++++++++++++ .../copy_files_rolling_basis/pipeline.yaml | 72 ++++++++++ 3 files changed, 230 insertions(+), 2 deletions(-) diff --git a/datasets/geos_fp/_images/rolling_copy/script.py b/datasets/geos_fp/_images/rolling_copy/script.py index bbaa88396..9b2edaf8e 100644 --- a/datasets/geos_fp/_images/rolling_copy/script.py +++ b/datasets/geos_fp/_images/rolling_copy/script.py @@ -52,7 +52,7 @@ def main( ) -def _date_prefix(dt: date) -> typing.List[str]: +def _date_prefix(dt: date) -> str: # Generates URL paths to folders containing the .nc4 files, for example # https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das/Y2021/M01/D01/ # => Y2021/M01/D01 @@ -154,6 +154,7 @@ def move_dir_contents_to_gcs( f"gs://{target_bucket}/{date_prefix}", ] ) + delete_temp_pcu_objects(target_bucket) delete_dir_contents(dir_ / date_prefix) @@ -164,8 +165,27 @@ def delete_dir_contents(dir_to_delete: pathlib.Path) -> None: [f.unlink() for f in dir_to_delete.glob("*") if f.is_file()] +def delete_temp_pcu_objects(target_bucket: str) -> None: + """Delete temp GCS objects created by gsutil's parallel composite uploads. + See https://cloud.google.com/storage/docs/uploads-downloads#gsutil-pcu + """ + res = subprocess.run( + ["gsutil", "ls", f"gs://{target_bucket}"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) + uris = res.stdout.split() + for uri in uris: + object_name = uri.split(target_bucket + "/")[-1] + if not object_name.startswith("Y"): + subprocess.check_call( + ["gsutil", "rm", "-r", f"gs://{target_bucket}/{object_name}"], + ) + + def update_manifest_file( - paths: typing.Set[str], + paths: typing.List[str], download_dir: pathlib.Path, target_bucket: str, date_prefix: str, diff --git a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py index fbab6770b..950ab960a 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py +++ b/datasets/geos_fp/copy_files_rolling_basis/copy_files_rolling_basis_dag.py @@ -37,6 +37,23 @@ task_id="copy_files_dated_today", name="geosfp", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image="{{ var.json.geos_fp.container_registry.rolling_copy }}", image_pull_policy="Always", env_vars={ @@ -58,6 +75,23 @@ task_id="copy_files_dated_today_minus_1_day", name="geosfp", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image="{{ var.json.geos_fp.container_registry.rolling_copy }}", image_pull_policy="Always", env_vars={ @@ -79,6 +113,23 @@ task_id="copy_files_dated_today_minus_2_days", name="geosfp", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image="{{ var.json.geos_fp.container_registry.rolling_copy }}", image_pull_policy="Always", env_vars={ @@ -100,6 +151,23 @@ task_id="copy_files_dated_today_minus_3_days", name="geosfp", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image="{{ var.json.geos_fp.container_registry.rolling_copy }}", image_pull_policy="Always", env_vars={ @@ -121,6 +189,23 @@ task_id="copy_files_dated_today_minus_4_days", name="geosfp", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image="{{ var.json.geos_fp.container_registry.rolling_copy }}", image_pull_policy="Always", env_vars={ @@ -142,6 +227,23 @@ task_id="copy_files_dated_today_minus_5_days", name="geosfp", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image="{{ var.json.geos_fp.container_registry.rolling_copy }}", image_pull_policy="Always", env_vars={ @@ -163,6 +265,23 @@ task_id="copy_files_dated_today_minus_6_days", name="geosfp", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image="{{ var.json.geos_fp.container_registry.rolling_copy }}", image_pull_policy="Always", env_vars={ @@ -184,6 +303,23 @@ task_id="copy_files_dated_today_minus_7_days", name="geosfp", namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, image="{{ var.json.geos_fp.container_registry.rolling_copy }}", image_pull_policy="Always", env_vars={ diff --git a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml index 077603d43..7bfb61a4e 100644 --- a/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml +++ b/datasets/geos_fp/copy_files_rolling_basis/pipeline.yaml @@ -35,6 +35,15 @@ dag: task_id: "copy_files_dated_today" name: "geosfp" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: @@ -57,6 +66,15 @@ dag: task_id: "copy_files_dated_today_minus_1_day" name: "geosfp" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: @@ -79,6 +97,15 @@ dag: task_id: "copy_files_dated_today_minus_2_days" name: "geosfp" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: @@ -101,6 +128,15 @@ dag: task_id: "copy_files_dated_today_minus_3_days" name: "geosfp" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: @@ -123,6 +159,15 @@ dag: task_id: "copy_files_dated_today_minus_4_days" name: "geosfp" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: @@ -145,6 +190,15 @@ dag: task_id: "copy_files_dated_today_minus_5_days" name: "geosfp" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: @@ -167,6 +221,15 @@ dag: task_id: "copy_files_dated_today_minus_6_days" name: "geosfp" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: @@ -189,6 +252,15 @@ dag: task_id: "copy_files_dated_today_minus_7_days" name: "geosfp" namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: