
Commit

try force update of inventory after upload
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Mar 30, 2022
1 parent 6d03584 commit 6e10aa6
Showing 2 changed files with 50 additions and 19 deletions.
58 changes: 39 additions & 19 deletions snakemake/executors/google_lifesciences.py
@@ -33,7 +33,8 @@


class GoogleLifeSciencesExecutor(ClusterExecutor):
"""the GoogleLifeSciences executor uses Google Cloud Storage, and
"""
The GoogleLifeSciences executor uses Google Cloud Storage, and
Compute Engine paired with the Google Life Sciences API.
https://cloud.google.com/life-sciences/docs/quickstart
"""
@@ -125,7 +126,8 @@ def get_default_resources_args(self, default_resources=None):
)

def _get_services(self):
"""use the Google Discovery Build to generate API clients
"""
Use the Google Discovery Build to generate API clients
for Life Sciences, and use the google storage python client
for storage.
"""
@@ -184,7 +186,8 @@ def build_request(http, *args, **kwargs):
self._bucket_service = storage.Client()

def _get_bucket(self):
"""get a connection to the storage bucket (self.bucket) and exit
"""
Get a connection to the storage bucket (self.bucket) and exit
if the name is taken or otherwise invalid.
Parameters
@@ -223,7 +226,8 @@ def _get_bucket(self):
logger.debug("logs=%s" % self.gs_logs)

def _set_location(self, location=None):
"""The location is where the Google Life Sciences API is located.
"""
The location is where the Google Life Sciences API is located.
This can be meaningful if the requester has data residency
requirements or multi-zone needs. To determine this value,
we first use the locations API to determine locations available,
@@ -290,7 +294,8 @@ def _set_location(self, location=None):
)

def shutdown(self):
"""shutdown deletes build packages if the user didn't request to clean
"""
Shutdown deletes build packages if the user didn't request to clean
up the cache. At this point we've already cancelled running jobs.
"""
from google.api_core import retry
@@ -334,7 +339,8 @@ def cancel(self):
self.shutdown()

def get_available_machine_types(self):
"""Using the regions available at self.regions, use the GCP API
"""
Using the regions available at self.regions, use the GCP API
to retrieve a lookup dictionary of all available machine types.
"""
# Regular expression to determine if zone in region
@@ -374,7 +380,8 @@ def get_available_machine_types(self):
return machine_types

def _add_gpu(self, gpu_count):
"""Add a number of NVIDIA gpus to the current executor. This works
"""
Add a number of NVIDIA gpus to the current executor. This works
by way of adding nvidia_gpu to the job default resources, and also
changing the default machine type prefix to be n1, which is
the currently only supported instance type for using GPUs for LHS.
@@ -393,7 +400,8 @@ def _add_gpu(self, gpu_count):
self._machine_type_prefix = "n1"

def _set_preemptible_rules(self, preemption_default=None, preemptible_rules=None):
"""define a lookup dictionary for preemptible instance retries, which
"""
Define a lookup dictionary for preemptible instance retries, which
is supported by the Google Life Science API. The user can set a default
for all steps, specify per step, or define a default for all steps
that aren't individually customized.
@@ -418,7 +426,8 @@ def _set_preemptible_rules(self, preemption_default=None, preemptible_rules=None
rule.restart_times = restart_times

def _generate_job_resources(self, job):
"""given a particular job, generate the resources that it needs,
"""
Given a particular job, generate the resources that it needs,
including default regions and the virtual machine configuration
"""
# Right now, do a best effort mapping of resources to instance types
@@ -563,7 +572,8 @@ def _generate_job_resources(self, job):
return resources

def _get_accelerator(self, gpu_count, zone, gpu_model=None):
"""Get an appropriate accelerator for a GPU given a zone selection.
"""
Get an appropriate accelerator for a GPU given a zone selection.
Currently Google offers NVIDIA Tesla T4 (likely the best),
NVIDIA P100, and the same T4 for a graphical workstation. Since
this isn't a graphical workstation use case, we choose the
@@ -620,7 +630,8 @@ def get_snakefile(self):
return self.workflow.main_snakefile.replace(self.workdir, "").strip(os.sep)

def _set_workflow_sources(self):
"""We only add files from the working directory that are config related
"""
We only add files from the working directory that are config related
(e.g., the Snakefile or a config.yml equivalent), or checked into git.
"""
self.workflow_sources = []
@@ -640,7 +651,8 @@ def _set_workflow_sources(self):
)

def _generate_build_source_package(self):
"""in order for the instance to access the working directory in storage,
"""
In order for the instance to access the working directory in storage,
we need to upload it. This file is cleaned up at the end of the run.
We do this, and then obtain from the instance and extract.
"""
@@ -687,7 +699,8 @@ def _generate_build_source_package(self):
return hash_tar

def _upload_build_source_package(self, targz):
"""given a .tar.gz created for a workflow, upload it to source/cache
"""
Given a .tar.gz created for a workflow, upload it to source/cache
of Google storage, only if the blob doesn't already exist.
"""
from google.api_core import retry
@@ -728,7 +741,9 @@ def _generate_log_action(self, job):
return action

def _generate_job_action(self, job):
"""generate a single action to execute the job."""
"""
Generate a single action to execute the job.
"""
exec_job = self.format_job_exec(job)

# The full command to download the archive, extract, and run
@@ -764,7 +779,8 @@ def _get_jobname(self, job):
return "snakejob-%s-%s-%s" % (self.run_namespace, job.name, job.jobid)

def _generate_pipeline_labels(self, job):
"""generate basic labels to identify the job, namespace, and that
"""
Generate basic labels to identify the job, namespace, and that
snakemake is running the show!
"""
jobname = self._get_jobname(job)
@@ -788,7 +804,8 @@ def _generate_environment(self):
return envvars

def _generate_pipeline(self, job):
"""based on the job details, generate a google Pipeline object
"""
Based on the job details, generate a google Pipeline object
to pass to pipelines.run. This includes actions, resources,
environment, and timeout.
"""
@@ -864,7 +881,8 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
)

def _job_was_successful(self, status):
"""based on a status response (a [pipeline].projects.locations.operations.get
"""
Based on a status response (a [pipeline].projects.locations.operations.get
debug print the list of events, return True if all return codes 0
and False otherwise (indication of failure). In that a nonzero exit
status is found, we also debug print it for the user.
@@ -897,7 +915,8 @@ def _job_was_successful(self, status):
return success

def _retry_request(self, request, timeout=2, attempts=3):
"""The Google Python API client frequently has BrokenPipe errors. This
"""
The Google Python API client frequently has BrokenPipe errors. This
function takes a request, and executes it up to number of retry,
each time with a 2* increase in timeout.
@@ -936,7 +955,8 @@ def _retry_request(self, request, timeout=2, attempts=3):
raise ex

def _wait_for_jobs(self):
"""wait for jobs to complete. This means requesting their status,
"""
Wait for jobs to complete. This means requesting their status,
and then marking them as finished when a "done" parameter
shows up. Even for finished jobs, the status should still return
"""
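
The body of _retry_request is collapsed in this view, but its docstring describes re-issuing a Google API client request up to a fixed number of attempts, doubling the timeout after each failure. A minimal sketch of that retry pattern follows; the exception types and the sleep-and-double loop are illustrative assumptions here, not the executor's exact logic, and only the request/timeout/attempts names are taken from the signature shown above.

import time

from googleapiclient.errors import HttpError


def retry_request(request, timeout=2, attempts=3):
    # Execute a googleapiclient HttpRequest, retrying transient failures
    # (e.g. BrokenPipeError) and doubling the wait between attempts.
    while True:
        try:
            return request.execute()
        except (BrokenPipeError, HttpError) as ex:
            attempts -= 1
            if attempts <= 0:
                raise ex
            time.sleep(timeout)
            timeout *= 2

The doubling wait gives transient broken-pipe conditions time to clear before the request is finally abandoned.
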
11 changes: 11 additions & 0 deletions snakemake/io.py
@@ -269,6 +269,14 @@ def wrapper(self, *args, **kwargs):
def inventory(self):
async_run(self._inventory())

async def _remote_inventory(self):
"""Force a cache update for the remote inventory, which always changes.
"""
cache = self.rule.workflow.iocache
if not self.is_remote or not cache.active:
return
await asyncio.gather(*[self.remote_object.inventory(cache)])

async def _inventory(self):
"""Starting from the given file, try to cache as much existence and
modification date information of this and other files as possible.
@@ -606,6 +614,9 @@ def upload_to_remote(self):
self.remote_object.upload()
logger.info("Finished upload.")

# Ensure we update the remote cache
async_run(self._remote_inventory())

def prepare(self):
path_until_wildcard = re.split(DYNAMIC_FILL, self.file)[0]
dir = os.path.dirname(path_until_wildcard)
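
Taken together, the two io.py hunks are the functional core of this commit: once upload_to_remote finishes uploading, async_run(self._remote_inventory()) forces the IOCache entry for that remote object to be refreshed, so later existence and modification-time checks see the freshly uploaded blob instead of a stale cached answer. A standalone sketch of the same pattern, using hypothetical IOCache/RemoteObject stand-ins (not snakemake's classes) and asyncio.run in place of snakemake's async_run helper:

import asyncio


class IOCache:
    # Toy stand-in for snakemake's IOCache: remembers existence lookups.
    def __init__(self):
        self.active = True
        self.exists = {}


class RemoteObject:
    # Hypothetical remote object exposing the async inventory() hook used in the diff.
    def __init__(self, name):
        self.name = name

    def upload(self):
        print(f"uploading {self.name}")

    async def inventory(self, cache):
        # Record fresh existence information for this object in the cache.
        cache.exists[self.name] = True


def upload_to_remote(obj, cache):
    obj.upload()
    # Mirror the new behaviour: force the inventory update right after upload
    # so the cache cannot keep reporting the file as missing.
    if cache.active:
        asyncio.run(obj.inventory(cache))


cache = IOCache()
upload_to_remote(RemoteObject("results/plot.png"), cache)
print(cache.exists)  # {'results/plot.png': True}
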
