Skip to content

Commit

Permalink
Memoizing all job ids.
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe committed Feb 15, 2022
1 parent f26f762 commit 5d739a4
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions snakemake/executors/__init__.py
Expand Up @@ -949,6 +949,9 @@ def __init__(
self.cancelcmd = cancelcmd
self.cancelnargs = cancelnargs
self.external_jobid = dict()
# We need to collect all external ids so we can properly cancel even if
# the status update queue is running.
self.all_ext_jobids = list()

super().__init__(
workflow,
Expand Down Expand Up @@ -988,16 +991,7 @@ def _chunks(lst, n):
if self.cancelcmd: # We have --cluster-[m]cancel
# Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None)
# then pass all job ids at once
logger.info(
"%s waiting for lock"
% datetime.datetime.now().strftime("%Y%m%d%H%M.%S")
)
with self.lock:
logger.info(
"%s acquired jobs"
% datetime.datetime.now().strftime("%Y%m%d%H%M.%S")
)
jobids = [j.jobid for j in self.active_jobs]
jobids = list(self.all_ext_jobids)
logger.info(
"%s received logs: %s"
% (datetime.datetime.now().strftime("%Y%m%d%H%M.%S"), jobids)
Expand Down Expand Up @@ -1055,6 +1049,11 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
)
submit_callback(job)
with self.lock:
logger.info(
"%s appending job: %s"
% (datetime.datetime.now().strftime("%Y%m%d%H%M.%S"), ext_jobid)
)
self.all_ext_jobids.append(ext_jobid)
self.active_jobs.append(
GenericClusterJob(
job,
Expand Down Expand Up @@ -1110,6 +1109,11 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
submit_callback(job)

with self.lock:
logger.info(
"%s appending job: %s"
% (datetime.datetime.now().strftime("%Y%m%d%H%M.%S"), ext_jobid)
)
self.all_ext_jobids.append(ext_jobid)
self.active_jobs.append(
GenericClusterJob(
job,
Expand Down

0 comments on commit 5d739a4

Please sign in to comment.