diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 81d522c39..5097ec8f3 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -115,6 +115,15 @@ jobs: run: | echo CONTAINER_IMAGE=snakemake/snakemake:$GITHUB_SHA >> $GITHUB_ENV + # TODO reactivate in April (we have no free resources left this month) + - name: Test Google Life Sciences Executor + if: env.GCP_AVAILABLE + run: | + # activate conda env + export PATH="/usr/share/miniconda/bin:$PATH" + source activate snakemake + pytest -s -v -x tests/test_google_lifesciences.py + # TODO reactivate in April (we have no free resources left this month) - name: Test Kubernetes execution if: env.GCP_AVAILABLE @@ -139,15 +148,6 @@ jobs: # pytest -v -x -s tests/test_tibanna.py - # TODO reactivate in April (we have no free resources left this month) - - name: Test Google Life Sciences Executor - if: env.GCP_AVAILABLE - run: | - # activate conda env - export PATH="/usr/share/miniconda/bin:$PATH" - source activate snakemake - pytest -s -v -x tests/test_google_lifesciences.py - - name: Test GA4GH TES executor run: | # activate conda env diff --git a/snakemake/executors/google_lifesciences.py b/snakemake/executors/google_lifesciences.py index 10a4b2778..9b3531785 100644 --- a/snakemake/executors/google_lifesciences.py +++ b/snakemake/executors/google_lifesciences.py @@ -33,7 +33,8 @@ class GoogleLifeSciencesExecutor(ClusterExecutor): - """the GoogleLifeSciences executor uses Google Cloud Storage, and + """ + The GoogleLifeSciences executor uses Google Cloud Storage, and Compute Engine paired with the Google Life Sciences API. https://cloud.google.com/life-sciences/docs/quickstart """ @@ -125,7 +126,8 @@ def get_default_resources_args(self, default_resources=None): ) def _get_services(self): - """use the Google Discovery Build to generate API clients + """ + Use the Google Discovery Build to generate API clients for Life Sciences, and use the google storage python client for storage. """ @@ -184,7 +186,8 @@ def build_request(http, *args, **kwargs): self._bucket_service = storage.Client() def _get_bucket(self): - """get a connection to the storage bucket (self.bucket) and exit + """ + Get a connection to the storage bucket (self.bucket) and exit if the name is taken or otherwise invalid. Parameters @@ -223,7 +226,8 @@ def _get_bucket(self): logger.debug("logs=%s" % self.gs_logs) def _set_location(self, location=None): - """The location is where the Google Life Sciences API is located. + """ + The location is where the Google Life Sciences API is located. This can be meaningful if the requester has data residency requirements or multi-zone needs. To determine this value, we first use the locations API to determine locations available, @@ -290,7 +294,8 @@ def _set_location(self, location=None): ) def shutdown(self): - """shutdown deletes build packages if the user didn't request to clean + """ + Shutdown deletes build packages if the user didn't request to clean up the cache. At this point we've already cancelled running jobs. """ from google.api_core import retry @@ -334,7 +339,8 @@ def cancel(self): self.shutdown() def get_available_machine_types(self): - """Using the regions available at self.regions, use the GCP API + """ + Using the regions available at self.regions, use the GCP API to retrieve a lookup dictionary of all available machine types. """ # Regular expression to determine if zone in region @@ -374,7 +380,8 @@ def get_available_machine_types(self): return machine_types def _add_gpu(self, gpu_count): - """Add a number of NVIDIA gpus to the current executor. This works + """ + Add a number of NVIDIA gpus to the current executor. This works by way of adding nvidia_gpu to the job default resources, and also changing the default machine type prefix to be n1, which is the currently only supported instance type for using GPUs for LHS. @@ -393,7 +400,8 @@ def _add_gpu(self, gpu_count): self._machine_type_prefix = "n1" def _set_preemptible_rules(self, preemption_default=None, preemptible_rules=None): - """define a lookup dictionary for preemptible instance retries, which + """ + Define a lookup dictionary for preemptible instance retries, which is supported by the Google Life Science API. The user can set a default for all steps, specify per step, or define a default for all steps that aren't individually customized. @@ -418,7 +426,8 @@ def _set_preemptible_rules(self, preemption_default=None, preemptible_rules=None rule.restart_times = restart_times def _generate_job_resources(self, job): - """given a particular job, generate the resources that it needs, + """ + Given a particular job, generate the resources that it needs, including default regions and the virtual machine configuration """ # Right now, do a best effort mapping of resources to instance types @@ -563,7 +572,8 @@ def _generate_job_resources(self, job): return resources def _get_accelerator(self, gpu_count, zone, gpu_model=None): - """Get an appropriate accelerator for a GPU given a zone selection. + """ + Get an appropriate accelerator for a GPU given a zone selection. Currently Google offers NVIDIA Tesla T4 (likely the best), NVIDIA P100, and the same T4 for a graphical workstation. Since this isn't a graphical workstation use case, we choose the @@ -620,7 +630,8 @@ def get_snakefile(self): return self.workflow.main_snakefile.replace(self.workdir, "").strip(os.sep) def _set_workflow_sources(self): - """We only add files from the working directory that are config related + """ + We only add files from the working directory that are config related (e.g., the Snakefile or a config.yml equivalent), or checked into git. """ self.workflow_sources = [] @@ -640,7 +651,8 @@ def _set_workflow_sources(self): ) def _generate_build_source_package(self): - """in order for the instance to access the working directory in storage, + """ + In order for the instance to access the working directory in storage, we need to upload it. This file is cleaned up at the end of the run. We do this, and then obtain from the instance and extract. """ @@ -687,7 +699,8 @@ def _generate_build_source_package(self): return hash_tar def _upload_build_source_package(self, targz): - """given a .tar.gz created for a workflow, upload it to source/cache + """ + Given a .tar.gz created for a workflow, upload it to source/cache of Google storage, only if the blob doesn't already exist. """ from google.api_core import retry @@ -728,7 +741,9 @@ def _generate_log_action(self, job): return action def _generate_job_action(self, job): - """generate a single action to execute the job.""" + """ + Generate a single action to execute the job. + """ exec_job = self.format_job_exec(job) # The full command to download the archive, extract, and run @@ -764,7 +779,8 @@ def _get_jobname(self, job): return "snakejob-%s-%s-%s" % (self.run_namespace, job.name, job.jobid) def _generate_pipeline_labels(self, job): - """generate basic labels to identify the job, namespace, and that + """ + Generate basic labels to identify the job, namespace, and that snakemake is running the show! """ jobname = self._get_jobname(job) @@ -788,7 +804,8 @@ def _generate_environment(self): return envvars def _generate_pipeline(self, job): - """based on the job details, generate a google Pipeline object + """ + Based on the job details, generate a google Pipeline object to pass to pipelines.run. This includes actions, resources, environment, and timeout. """ @@ -864,7 +881,8 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None): ) def _job_was_successful(self, status): - """based on a status response (a [pipeline].projects.locations.operations.get + """ + Based on a status response (a [pipeline].projects.locations.operations.get debug print the list of events, return True if all return codes 0 and False otherwise (indication of failure). In that a nonzero exit status is found, we also debug print it for the user. @@ -897,7 +915,8 @@ def _job_was_successful(self, status): return success def _retry_request(self, request, timeout=2, attempts=3): - """The Google Python API client frequently has BrokenPipe errors. This + """ + The Google Python API client frequently has BrokenPipe errors. This function takes a request, and executes it up to number of retry, each time with a 2* increase in timeout. @@ -936,7 +955,8 @@ def _retry_request(self, request, timeout=2, attempts=3): raise ex def _wait_for_jobs(self): - """wait for jobs to complete. This means requesting their status, + """ + Wait for jobs to complete. This means requesting their status, and then marking them as finished when a "done" parameter shows up. Even for finished jobs, the status should still return """ diff --git a/snakemake/remote/GS.py b/snakemake/remote/GS.py index ea92e7994..d17135ad8 100644 --- a/snakemake/remote/GS.py +++ b/snakemake/remote/GS.py @@ -206,8 +206,10 @@ def exists(self): return True elif any(self.directory_entries()): return True - else: - return False + + # The blob object can get out of sync, one last try! + self.update_blob() + return self.blob.exists() @retry.Retry(predicate=google_cloud_retry_predicate) def mtime(self): diff --git a/tests/conftest.py b/tests/conftest.py index 2671d16ea..01f08aa9e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,7 +9,9 @@ skip_on_windows = pytest.mark.skipif(ON_WINDOWS, reason="Unix stuff") only_on_windows = pytest.mark.skipif(not ON_WINDOWS, reason="Windows stuff") -needs_strace = pytest.mark.xfail(os.system("strace -o /dev/null true") != 0, reason="Missing strace") +needs_strace = pytest.mark.xfail( + os.system("strace -o /dev/null true") != 0, reason="Missing strace" +) @pytest.fixture(autouse=True) diff --git a/tests/test_expand.py b/tests/test_expand.py index daba72ea4..e98733a69 100644 --- a/tests/test_expand.py +++ b/tests/test_expand.py @@ -65,13 +65,10 @@ def test_allow_missing(): "4_{c}.b", ] # replace product - assert ( - expand( - ["{a}_{b}_{C}.ab", "{b}_{c}.b"], - zip, - a="1 2".split(), - b="3 4".split(), - allow_missing=True, - ) - == ["1_3_{C}.ab", "2_4_{C}.ab", "3_{c}.b", "4_{c}.b"] - ) + assert expand( + ["{a}_{b}_{C}.ab", "{b}_{c}.b"], + zip, + a="1 2".split(), + b="3 4".split(), + allow_missing=True, + ) == ["1_3_{C}.ab", "2_4_{C}.ab", "3_{c}.b", "4_{c}.b"] diff --git a/tests/test_github_issue1460/Snakefile b/tests/test_github_issue1460/Snakefile new file mode 100644 index 000000000..f0921895b --- /dev/null +++ b/tests/test_github_issue1460/Snakefile @@ -0,0 +1,29 @@ +rule all: + input: + "blob.txt", + "test.txt" + + +rule intermediate: + input: + "preblob.txt", + "pretest.txt", + output: + "blob.txt", + "test.txt", + shell: + """ + cp {input[0]} {output[0]} + cp {input[1]} {output[1]} + """ + + +rule create: + output: + "preblob.txt", + "pretest.txt" + shell: + ''' + echo "test file" > {output[0]} + echo "test file" > {output[1]} + ''' diff --git a/tests/test_github_issue1460/expected-results/.gitkeep b/tests/test_github_issue1460/expected-results/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_google_lifesciences.py b/tests/test_google_lifesciences.py index dc735f8fc..63a75a410 100644 --- a/tests/test_google_lifesciences.py +++ b/tests/test_google_lifesciences.py @@ -20,7 +20,7 @@ def has_google_credentials(): ) -def cleanup_google_storage(prefix, bucket_name="snakemake-testing"): +def cleanup_google_storage(prefix, bucket_name="snakemake-testing", restrict_to=None): """Given a storage prefix and a bucket, recursively delete files there This is intended to run after testing to ensure that the bucket is cleaned up. @@ -28,15 +28,21 @@ def cleanup_google_storage(prefix, bucket_name="snakemake-testing"): Arguments: prefix (str) : the "subfolder" or prefix for some files in the buckets bucket_name (str) : the name of the bucket, default snakemake-testing + restrict_to (list) : only delete files in these paths (None deletes all) """ client = storage.Client() bucket = client.get_bucket(bucket_name) blobs = bucket.list_blobs(prefix="source") for blob in blobs: blob.delete() - # Using API we get an exception about bucket deletion - shell("gsutil -m rm -r gs://{bucket.name}/* || true") - bucket.delete() + blobs = bucket.list_blobs(prefix=prefix) + for blob in blobs: + if restrict_to is None or f"{bucket_name}/{blob.name}" in restrict_to: + blob.delete() + if restrict_to is None: + # Using API we get an exception about bucket deletion + shell("gsutil -m rm -r gs://{bucket.name}/* || true") + bucket.delete() def create_google_storage(bucket_name="snakemake-testing"): @@ -126,3 +132,35 @@ def test_github_issue1396(): ) finally: cleanup_google_storage(storage_prefix, bucket_name) + + +def test_github_issue1460(): + bucket_name = "snakemake-testing-%s" % next(tempfile._get_candidate_names()) + create_google_storage(bucket_name) + storage_prefix = "test_github_issue1460" + prefix = "%s/%s" % (bucket_name, storage_prefix) + workdir = dpath("test_github_issue1460") + try: + run( + workdir, + default_remote_prefix=prefix, + google_lifesciences=True, + google_lifesciences_cache=False, + ) + cleanup_google_storage( + storage_prefix, + bucket_name, + restrict_to=[ + f"{prefix}/test.txt", + f"{prefix}/blob.txt", + f"{prefix}/pretest.txt", + ], + ) + run( + workdir, + default_remote_prefix=prefix, + google_lifesciences=True, + google_lifesciences_cache=False, + ) + finally: + cleanup_google_storage(storage_prefix, bucket_name) diff --git a/tests/testapi.py b/tests/testapi.py index b06a0b9d5..e3904ac78 100644 --- a/tests/testapi.py +++ b/tests/testapi.py @@ -72,4 +72,11 @@ def test_dicts_in_config(): ), file=f, ) - snakemake(path, workdir=tmpdir, config={"this_option": "does_not_break", "test": {'this_dict':'shoult_not_either'}}) + snakemake( + path, + workdir=tmpdir, + config={ + "this_option": "does_not_break", + "test": {"this_dict": "shoult_not_either"}, + }, + )