From 37f8b66b8933c2b8fafc67ed0b6af76054d07234 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20K=C3=B6ster?=
Date: Tue, 29 Mar 2022 15:42:07 +0200
Subject: [PATCH 1/7] chore: add testcase for issue #1460

---
 tests/test_github_issue1460/Snakefile | 29 ++++++++++++
 .../expected-results/blob.txt          |  1 +
 .../expected-results/test.txt          |  1 +
 tests/test_google_lifesciences.py      | 45 +++++++++++++++++--
 4 files changed, 72 insertions(+), 4 deletions(-)
 create mode 100644 tests/test_github_issue1460/Snakefile
 create mode 100644 tests/test_github_issue1460/expected-results/blob.txt
 create mode 100644 tests/test_github_issue1460/expected-results/test.txt

diff --git a/tests/test_github_issue1460/Snakefile b/tests/test_github_issue1460/Snakefile
new file mode 100644
index 000000000..f0921895b
--- /dev/null
+++ b/tests/test_github_issue1460/Snakefile
@@ -0,0 +1,29 @@
+rule all:
+    input:
+        "blob.txt",
+        "test.txt"
+
+
+rule intermediate:
+    input:
+        "preblob.txt",
+        "pretest.txt",
+    output:
+        "blob.txt",
+        "test.txt",
+    shell:
+        """
+        cp {input[0]} {output[0]}
+        cp {input[1]} {output[1]}
+        """
+
+
+rule create:
+    output:
+        "preblob.txt",
+        "pretest.txt"
+    shell:
+        '''
+        echo "test file" > {output[0]}
+        echo "test file" > {output[1]}
+        '''
diff --git a/tests/test_github_issue1460/expected-results/blob.txt b/tests/test_github_issue1460/expected-results/blob.txt
new file mode 100644
index 000000000..16b14f5da
--- /dev/null
+++ b/tests/test_github_issue1460/expected-results/blob.txt
@@ -0,0 +1 @@
+test file
diff --git a/tests/test_github_issue1460/expected-results/test.txt b/tests/test_github_issue1460/expected-results/test.txt
new file mode 100644
index 000000000..16b14f5da
--- /dev/null
+++ b/tests/test_github_issue1460/expected-results/test.txt
@@ -0,0 +1 @@
+test file
diff --git a/tests/test_google_lifesciences.py b/tests/test_google_lifesciences.py
index dc735f8fc..71fe2cc78 100644
--- a/tests/test_google_lifesciences.py
+++ b/tests/test_google_lifesciences.py
@@ -20,7 +20,7 @@ def has_google_credentials():
     )
 
 
-def cleanup_google_storage(prefix, bucket_name="snakemake-testing"):
+def cleanup_google_storage(prefix, bucket_name="snakemake-testing", restrict_to=None):
     """Given a storage prefix and a bucket, recursively delete files there
 
     This is intended to run after testing to ensure that the bucket is cleaned up.
@@ -34,9 +34,14 @@ def cleanup_google_storage(prefix, bucket_name="snakemake-testing"):
     blobs = bucket.list_blobs(prefix="source")
     for blob in blobs:
         blob.delete()
-    # Using API we get an exception about bucket deletion
-    shell("gsutil -m rm -r gs://{bucket.name}/* || true")
-    bucket.delete()
+    blobs = bucket.list_blobs(prefix=prefix)
+    for blob in blobs:
+        if restrict_to is None or f"{bucket_name}/{blob.name}" in restrict_to:
+            blob.delete()
+    if restrict_to is None:
+        # Using API we get an exception about bucket deletion
+        shell("gsutil -m rm -r gs://{bucket.name}/* || true")
+        bucket.delete()
 
 
 def create_google_storage(bucket_name="snakemake-testing"):
@@ -126,3 +131,35 @@ def test_github_issue1396():
         )
     finally:
         cleanup_google_storage(storage_prefix, bucket_name)
+
+
+def test_github_issue1460():
+    bucket_name = "snakemake-testing-%s" % next(tempfile._get_candidate_names())
+    create_google_storage(bucket_name)
+    storage_prefix = "test_github_issue1460"
+    prefix = "%s/%s" % (bucket_name, storage_prefix)
+    workdir = dpath("test_github_issue1460")
+    try:
+        run(
+            workdir,
+            default_remote_prefix=prefix,
+            google_lifesciences=True,
+            google_lifesciences_cache=False,
+        )
+        cleanup_google_storage(
+            storage_prefix,
+            bucket_name,
+            restrict_to=[
+                f"{prefix}/test.txt",
+                f"{prefix}/blob.txt",
+                f"{prefix}/pretest.txt",
+            ],
+        )
+        run(
+            workdir,
+            default_remote_prefix=prefix,
+            google_lifesciences=True,
+            google_lifesciences_cache=False,
+        )
+    finally:
+        cleanup_google_storage(storage_prefix, bucket_name)
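For illustration, a usage sketch of the new restrict_to parameter (the bucket name below is made up): entries must be full "bucket/key" paths, since the deletion loop above compares f"{bucket_name}/{blob.name}" against the list.

    # Hypothetical bucket; delete only the two workflow outputs, leaving
    # everything else under the prefix (e.g. the source/ cache) in place:
    cleanup_google_storage(
        "test_github_issue1460",
        bucket_name="snakemake-testing-abc123",
        restrict_to=[
            "snakemake-testing-abc123/test_github_issue1460/test.txt",
            "snakemake-testing-abc123/test_github_issue1460/blob.txt",
        ],
    )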
From 5f51fda0a0b698b49b8aa21a74ff4934fd28583b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20K=C3=B6ster?=
Date: Wed, 30 Mar 2022 14:03:23 +0200
Subject: [PATCH 2/7] fix expected results

---
 tests/test_github_issue1460/expected-results/.gitkeep | 0
 tests/test_github_issue1460/expected-results/blob.txt | 1 -
 tests/test_github_issue1460/expected-results/test.txt | 1 -
 3 files changed, 2 deletions(-)
 create mode 100644 tests/test_github_issue1460/expected-results/.gitkeep
 delete mode 100644 tests/test_github_issue1460/expected-results/blob.txt
 delete mode 100644 tests/test_github_issue1460/expected-results/test.txt

diff --git a/tests/test_github_issue1460/expected-results/.gitkeep b/tests/test_github_issue1460/expected-results/.gitkeep
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/test_github_issue1460/expected-results/blob.txt b/tests/test_github_issue1460/expected-results/blob.txt
deleted file mode 100644
index 16b14f5da..000000000
--- a/tests/test_github_issue1460/expected-results/blob.txt
+++ /dev/null
@@ -1 +0,0 @@
-test file
diff --git a/tests/test_github_issue1460/expected-results/test.txt b/tests/test_github_issue1460/expected-results/test.txt
deleted file mode 100644
index 16b14f5da..000000000
--- a/tests/test_github_issue1460/expected-results/test.txt
+++ /dev/null
@@ -1 +0,0 @@
-test file

From c04fb4c1a283d741d7f1596d910cd17e163bb276 Mon Sep 17 00:00:00 2001
From: vsoch
Date: Wed, 30 Mar 2022 12:23:00 -0600
Subject: [PATCH 3/7] try force update of inventory after upload

Signed-off-by: vsoch
---
 snakemake/executors/google_lifesciences.py | 58 +++++++++++++++-------
 snakemake/io.py                            | 10 ++++
 2 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/snakemake/executors/google_lifesciences.py b/snakemake/executors/google_lifesciences.py
index 10a4b2778..9b3531785 100644
--- a/snakemake/executors/google_lifesciences.py
+++ b/snakemake/executors/google_lifesciences.py
@@ -33,7 +33,8 @@
 
 
 class GoogleLifeSciencesExecutor(ClusterExecutor):
-    """the GoogleLifeSciences executor uses Google Cloud Storage, and
+    """
+    The GoogleLifeSciences executor uses Google Cloud Storage, and
     Compute Engine paired with the Google Life Sciences API.
     https://cloud.google.com/life-sciences/docs/quickstart
     """
@@ -125,7 +126,8 @@ def get_default_resources_args(self, default_resources=None):
         )
 
     def _get_services(self):
-        """use the Google Discovery Build to generate API clients
+        """
+        Use the Google Discovery Build to generate API clients
         for Life Sciences, and use the google storage python client
         for storage.
         """
@@ -184,7 +186,8 @@ def build_request(http, *args, **kwargs):
         self._bucket_service = storage.Client()
 
     def _get_bucket(self):
-        """get a connection to the storage bucket (self.bucket) and exit
+        """
+        Get a connection to the storage bucket (self.bucket) and exit
         if the name is taken or otherwise invalid.
         Parameters
         ==========
@@ -223,7 +226,8 @@
         logger.debug("logs=%s" % self.gs_logs)
 
     def _set_location(self, location=None):
-        """The location is where the Google Life Sciences API is located.
+        """
+        The location is where the Google Life Sciences API is located.
         This can be meaningful if the requester has data residency
         requirements or multi-zone needs. To determine this value,
         we first use the locations API to determine locations available,
@@ -290,7 +294,8 @@
         )
 
     def shutdown(self):
-        """shutdown deletes build packages if the user didn't request to clean
+        """
+        Shutdown deletes build packages if the user didn't request to clean
         up the cache. At this point we've already cancelled running jobs.
         """
         from google.api_core import retry
@@ -334,7 +339,8 @@ def cancel(self):
         self.shutdown()
 
     def get_available_machine_types(self):
-        """Using the regions available at self.regions, use the GCP API
+        """
+        Using the regions available at self.regions, use the GCP API
         to retrieve a lookup dictionary of all available machine types.
         """
         # Regular expression to determine if zone in region
@@ -374,7 +380,8 @@
         return machine_types
 
     def _add_gpu(self, gpu_count):
-        """Add a number of NVIDIA gpus to the current executor. This works
+        """
+        Add a number of NVIDIA gpus to the current executor. This works
         by way of adding nvidia_gpu to the job default resources, and also
         changing the default machine type prefix to be n1, which is
         the currently only supported instance type for using GPUs for LHS.
@@ -393,7 +400,8 @@
         self._machine_type_prefix = "n1"
 
     def _set_preemptible_rules(self, preemption_default=None, preemptible_rules=None):
-        """define a lookup dictionary for preemptible instance retries, which
+        """
+        Define a lookup dictionary for preemptible instance retries, which
         is supported by the Google Life Science API. The user can set a default
         for all steps, specify per step, or define a default for all steps
         that aren't individually customized.
@@ -418,7 +426,8 @@ def _set_preemptible_rules(self, preemption_default=None, preemptible_rules=None
                 rule.restart_times = restart_times
 
     def _generate_job_resources(self, job):
-        """given a particular job, generate the resources that it needs,
+        """
+        Given a particular job, generate the resources that it needs,
         including default regions and the virtual machine configuration
         """
         # Right now, do a best effort mapping of resources to instance types
@@ -563,7 +572,8 @@
         return resources
 
     def _get_accelerator(self, gpu_count, zone, gpu_model=None):
-        """Get an appropriate accelerator for a GPU given a zone selection.
+        """
+        Get an appropriate accelerator for a GPU given a zone selection.
         Currently Google offers NVIDIA Tesla T4 (likely the best),
         NVIDIA P100, and the same T4 for a graphical workstation.
         Since this isn't a graphical workstation use case, we choose the
@@ -620,7 +630,8 @@
     def get_snakefile(self):
         return self.workflow.main_snakefile.replace(self.workdir, "").strip(os.sep)
 
     def _set_workflow_sources(self):
-        """We only add files from the working directory that are config related
+        """
+        We only add files from the working directory that are config related
         (e.g., the Snakefile or a config.yml equivalent), or checked into git.
         """
         self.workflow_sources = []
@@ -640,7 +651,8 @@
         )
 
     def _generate_build_source_package(self):
-        """in order for the instance to access the working directory in storage,
+        """
+        In order for the instance to access the working directory in storage,
         we need to upload it. This file is cleaned up at the end of the run.
         We do this, and then obtain from the instance and extract.
         """
@@ -687,7 +699,8 @@
         return hash_tar
 
     def _upload_build_source_package(self, targz):
-        """given a .tar.gz created for a workflow, upload it to source/cache
+        """
+        Given a .tar.gz created for a workflow, upload it to source/cache
        of Google storage, only if the blob doesn't already exist.
         """
         from google.api_core import retry
@@ -728,7 +741,9 @@
         return action
 
     def _generate_job_action(self, job):
-        """generate a single action to execute the job."""
+        """
+        Generate a single action to execute the job.
+        """
         exec_job = self.format_job_exec(job)
 
         # The full command to download the archive, extract, and run
@@ -764,7 +779,8 @@
         return "snakejob-%s-%s-%s" % (self.run_namespace, job.name, job.jobid)
 
     def _generate_pipeline_labels(self, job):
-        """generate basic labels to identify the job, namespace, and that
+        """
+        Generate basic labels to identify the job, namespace, and that
         snakemake is running the show!
         """
         jobname = self._get_jobname(job)
@@ -788,7 +804,8 @@
         return envvars
 
     def _generate_pipeline(self, job):
-        """based on the job details, generate a google Pipeline object
+        """
+        Based on the job details, generate a google Pipeline object
         to pass to pipelines.run. This includes actions, resources,
         environment, and timeout.
         """
@@ -864,7 +881,8 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
         )
 
     def _job_was_successful(self, status):
-        """based on a status response (a [pipeline].projects.locations.operations.get
+        """
+        Based on a status response (a [pipeline].projects.locations.operations.get
         debug print the list of events, return True if all return codes 0 and
         False otherwise (indication of failure).
         In that a nonzero exit status is found, we also debug print it for the user.
@@ -897,7 +915,8 @@
         return success
 
     def _retry_request(self, request, timeout=2, attempts=3):
-        """The Google Python API client frequently has BrokenPipe errors. This
+        """
+        The Google Python API client frequently has BrokenPipe errors. This
         function takes a request, and executes it up to number of retry,
         each time with a 2* increase in timeout.
@@ -936,7 +955,8 @@
         raise ex
 
     def _wait_for_jobs(self):
-        """wait for jobs to complete. This means requesting their status,
+        """
+        Wait for jobs to complete. This means requesting their status,
         and then marking them as finished when a "done" parameter shows up.
         Even for finished jobs, the status should still return
         """
diff --git a/snakemake/io.py b/snakemake/io.py
index 2a600c99e..08987f9a0 100755
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -269,6 +269,13 @@ def wrapper(self, *args, **kwargs):
     def inventory(self):
         async_run(self._inventory())
 
+    async def _remote_inventory(self):
+        """Force a cache update for the remote inventory, which always changes."""
+        cache = self.rule.workflow.iocache
+        if not self.is_remote or not cache.active:
+            return
+        await asyncio.gather(*[self.remote_object.inventory(cache)])
+
     async def _inventory(self):
         """Starting from the given file, try to cache as much existence and
         modification date information of this and other files as possible.
@@ -606,6 +613,9 @@ def upload_to_remote(self):
             self.remote_object.upload()
             logger.info("Finished upload.")
 
+        # Ensure we update the remote cache
+        async_run(self._remote_inventory())
+
     def prepare(self):
         path_until_wildcard = re.split(DYNAMIC_FILL, self.file)[0]
         dir = os.path.dirname(path_until_wildcard)
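The _retry_request docstring above describes the retry scheme, but the method body is not part of this diff, so here is a minimal sketch of that behavior. Which exceptions are caught and how the timeout is applied are assumptions, not taken from the patch.

    import time

    from googleapiclient.errors import HttpError

    def retry_request(request, timeout=2, attempts=3):
        # Execute a Google API request object, retrying up to `attempts`
        # times and doubling the timeout after each failure (the "2*
        # increase in timeout" the docstring mentions).
        for attempt in range(attempts):
            try:
                return request.execute()
            except (BrokenPipeError, HttpError):
                if attempt == attempts - 1:
                    raise
                time.sleep(timeout)
                timeout *= 2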
From 16668430835ff10406fa5faef38994f32c75378d Mon Sep 17 00:00:00 2001
From: vsoch
Date: Wed, 30 Mar 2022 14:11:26 -0600
Subject: [PATCH 4/7] run gls tests first

Signed-off-by: vsoch
---
 .github/workflows/main.yml | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 81d522c39..5097ec8f3 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -115,6 +115,15 @@ jobs:
         run: |
           echo CONTAINER_IMAGE=snakemake/snakemake:$GITHUB_SHA >> $GITHUB_ENV
 
+      # TODO reactivate in April (we have no free resources left this month)
+      - name: Test Google Life Sciences Executor
+        if: env.GCP_AVAILABLE
+        run: |
+          # activate conda env
+          export PATH="/usr/share/miniconda/bin:$PATH"
+          source activate snakemake
+          pytest -s -v -x tests/test_google_lifesciences.py
+
       # TODO reactivate in April (we have no free resources left this month)
       - name: Test Kubernetes execution
         if: env.GCP_AVAILABLE
@@ -139,15 +148,6 @@
           # pytest -v -x -s tests/test_tibanna.py
 
-      # TODO reactivate in April (we have no free resources left this month)
-      - name: Test Google Life Sciences Executor
-        if: env.GCP_AVAILABLE
-        run: |
-          # activate conda env
-          export PATH="/usr/share/miniconda/bin:$PATH"
-          source activate snakemake
-          pytest -s -v -x tests/test_google_lifesciences.py
-
       - name: Test GA4GH TES executor
         run: |
           # activate conda env

From 136b0e019a342043060baf9587c65927a650eec1 Mon Sep 17 00:00:00 2001
From: vsoch
Date: Fri, 1 Apr 2022 18:04:10 -0600
Subject: [PATCH 5/7] pushing test fix

The issue is that the blob object held by a GS remote object can go sort
of stale, returning False for blob.exists() when the blob clearly
exists! To fix this, we do an additional self.update_blob() and then
return the exists check again. I am not sure whether this can be made
more efficient by only checking under certain conditions, but since we
likely cannot perfectly know when the blob has gone stale, the sure way
is to always update.

Signed-off-by: vsoch
---
 snakemake/io.py                   |  1 +
 snakemake/remote/GS.py            |  6 ++++--
 tests/conftest.py                 |  4 +++-
 tests/test_google_lifesciences.py |  1 +
 tests/test_io.py                  | 13 ++++++++-----
 tests/testapi.py                  |  9 ++++++++-
 6 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/snakemake/io.py b/snakemake/io.py
index 08987f9a0..36cc05a74 100755
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -96,6 +96,7 @@ def lutime(f, times):
     def lchmod(f, mode):
         os.chmod(f, mode, follow_symlinks=False)
 
+
 else:
 
     def lchmod(f, mode):
diff --git a/snakemake/remote/GS.py b/snakemake/remote/GS.py
index ea92e7994..d17135ad8 100644
--- a/snakemake/remote/GS.py
+++ b/snakemake/remote/GS.py
@@ -206,8 +206,10 @@ def exists(self):
             return True
         elif any(self.directory_entries()):
             return True
-        else:
-            return False
+
+        # The blob object can get out of sync, one last try!
+        self.update_blob()
+        return self.blob.exists()
 
     @retry.Retry(predicate=google_cloud_retry_predicate)
     def mtime(self):
diff --git a/tests/conftest.py b/tests/conftest.py
index 2671d16ea..01f08aa9e 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -9,7 +9,9 @@
 
 skip_on_windows = pytest.mark.skipif(ON_WINDOWS, reason="Unix stuff")
 only_on_windows = pytest.mark.skipif(not ON_WINDOWS, reason="Windows stuff")
-needs_strace = pytest.mark.xfail(os.system("strace -o /dev/null true") != 0, reason="Missing strace")
+needs_strace = pytest.mark.xfail(
+    os.system("strace -o /dev/null true") != 0, reason="Missing strace"
+)
 
 
 @pytest.fixture(autouse=True)
diff --git a/tests/test_google_lifesciences.py b/tests/test_google_lifesciences.py
index 71fe2cc78..63a75a410 100644
--- a/tests/test_google_lifesciences.py
+++ b/tests/test_google_lifesciences.py
@@ -28,6 +28,7 @@ def cleanup_google_storage(prefix, bucket_name="snakemake-testing", restrict_to=
     Arguments:
       prefix (str) : the "subfolder" or prefix for some files in the buckets
       bucket_name (str) : the name of the bucket, default snakemake-testing
+      restrict_to (list) : only delete files in these paths (None deletes all)
     """
     client = storage.Client()
     bucket = client.get_bucket(bucket_name)
diff --git a/tests/test_io.py b/tests/test_io.py
index 9b6745143..e9d0c8254 100644
--- a/tests/test_io.py
+++ b/tests/test_io.py
@@ -104,8 +104,11 @@ def __repr__(self):
     ) == sorted(["a: aa + b: b", "a: aa + b: bb", "c: c", "c: cc"])
 
     # expand on pathlib.Path objects
-    assert expand(
-        PosixPath() / "{x}" / "{y}",
-        x="Hello",
-        y="world",
-    ) == ["Hello/world"]
+    assert (
+        expand(
+            PosixPath() / "{x}" / "{y}",
+            x="Hello",
+            y="world",
+        )
+        == ["Hello/world"]
+    )
diff --git a/tests/testapi.py b/tests/testapi.py
index b06a0b9d5..e3904ac78 100644
--- a/tests/testapi.py
+++ b/tests/testapi.py
@@ -72,4 +72,11 @@ def test_dicts_in_config():
             ),
             file=f,
         )
-    snakemake(path, workdir=tmpdir, config={"this_option": "does_not_break", "test": {'this_dict':'shoult_not_either'}})
+    snakemake(
+        path,
+        workdir=tmpdir,
+        config={
+            "this_option": "does_not_break",
+            "test": {"this_dict": "shoult_not_either"},
+        },
+    )
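As a standalone illustration of the GS.py change above, here is a sketch of the same pattern against the google-cloud-storage client directly (the helper name and structure are illustrative, not snakemake's actual API): if a cached blob handle says the object is absent, refresh the handle once before trusting the answer.

    from google.cloud import storage

    def blob_exists_with_refresh(bucket: storage.Bucket, key: str) -> bool:
        blob = bucket.blob(key)  # local handle; may be out of sync with the bucket
        if blob.exists():
            return True
        # One last try after refreshing the handle, mirroring update_blob():
        return bucket.get_blob(key) is not None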
From cee8b9ddba3f541ba9973dbd7f2eda9c70bc315d Mon Sep 17 00:00:00 2001
From: vsoch
Date: Fri, 1 Apr 2022 18:07:55 -0600
Subject: [PATCH 6/7] wrong version of black

Signed-off-by: vsoch
---
 snakemake/io.py      |  1 -
 tests/test_expand.py | 17 +++++++----------
 tests/test_io.py     | 13 +++++--------
 3 files changed, 12 insertions(+), 19 deletions(-)

diff --git a/snakemake/io.py b/snakemake/io.py
index 36cc05a74..08987f9a0 100755
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -96,7 +96,6 @@ def lutime(f, times):
     def lchmod(f, mode):
         os.chmod(f, mode, follow_symlinks=False)
 
-
 else:
 
     def lchmod(f, mode):
diff --git a/tests/test_expand.py b/tests/test_expand.py
index daba72ea4..e98733a69 100644
--- a/tests/test_expand.py
+++ b/tests/test_expand.py
@@ -65,13 +65,10 @@ def test_allow_missing():
         "4_{c}.b",
     ]
     # replace product
-    assert (
-        expand(
-            ["{a}_{b}_{C}.ab", "{b}_{c}.b"],
-            zip,
-            a="1 2".split(),
-            b="3 4".split(),
-            allow_missing=True,
-        )
-        == ["1_3_{C}.ab", "2_4_{C}.ab", "3_{c}.b", "4_{c}.b"]
-    )
+    assert expand(
+        ["{a}_{b}_{C}.ab", "{b}_{c}.b"],
+        zip,
+        a="1 2".split(),
+        b="3 4".split(),
+        allow_missing=True,
+    ) == ["1_3_{C}.ab", "2_4_{C}.ab", "3_{c}.b", "4_{c}.b"]
diff --git a/tests/test_io.py b/tests/test_io.py
index e9d0c8254..9b6745143 100644
--- a/tests/test_io.py
+++ b/tests/test_io.py
@@ -104,11 +104,8 @@ def __repr__(self):
     ) == sorted(["a: aa + b: b", "a: aa + b: bb", "c: c", "c: cc"])
 
     # expand on pathlib.Path objects
-    assert (
-        expand(
-            PosixPath() / "{x}" / "{y}",
-            x="Hello",
-            y="world",
-        )
-        == ["Hello/world"]
-    )
+    assert expand(
+        PosixPath() / "{x}" / "{y}",
+        x="Hello",
+        y="world",
+    ) == ["Hello/world"]

From 5ffa06197232704710c522704dd66e4ba930dd02 Mon Sep 17 00:00:00 2001
From: vsoch
Date: Fri, 1 Apr 2022 19:04:01 -0600
Subject: [PATCH 7/7] remove unused remote inventory, was just testing!

Signed-off-by: vsoch
---
 snakemake/io.py | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/snakemake/io.py b/snakemake/io.py
index 08987f9a0..2a600c99e 100755
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -269,13 +269,6 @@ def wrapper(self, *args, **kwargs):
     def inventory(self):
         async_run(self._inventory())
 
-    async def _remote_inventory(self):
-        """Force a cache update for the remote inventory, which always changes."""
-        cache = self.rule.workflow.iocache
-        if not self.is_remote or not cache.active:
-            return
-        await asyncio.gather(*[self.remote_object.inventory(cache)])
-
     async def _inventory(self):
         """Starting from the given file, try to cache as much existence and
         modification date information of this and other files as possible.
@@ -613,9 +606,6 @@ def upload_to_remote(self):
             self.remote_object.upload()
             logger.info("Finished upload.")
 
-        # Ensure we update the remote cache
-        async_run(self._remote_inventory())
-
     def prepare(self):
         path_until_wildcard = re.split(DYNAMIC_FILL, self.file)[0]
         dir = os.path.dirname(path_until_wildcard)