From 41a5071b750dca5d7fceec324d81d9a93c86bdb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Fri, 24 Sep 2021 14:09:32 +0200 Subject: [PATCH] fix: potential memory corruption caused by Google storage objects accessed from different threads (#1174) * fix: memory corruption caused by Google storage objects accessed from different threads * invoke error job handling * clear toerror jobs --- snakemake/scheduler.py | 94 +++++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py index b178ad76b..a3d48d9a6 100644 --- a/snakemake/scheduler.py +++ b/snakemake/scheduler.py @@ -118,6 +118,11 @@ def __init__( self.scheduler_type = scheduler_type self.scheduler_ilp_solver = scheduler_ilp_solver self._tofinish = [] + self._toerror = [] + self.handle_job_success = True + self.update_resources = True + self.print_progress = not self.quiet and not self.dryrun + self.update_dynamic = not self.dryrun self.global_resources = { name: (sys.maxsize if res is None else res) @@ -145,11 +150,7 @@ def __init__( self._job_queue = None self._last_job_selection_empty = False self._submit_callback = self._noop - self._finish_callback = partial( - self._proceed, - update_dynamic=not self.dryrun, - print_progress=not self.quiet and not self.dryrun, - ) + self._finish_callback = self._proceed self._local_executor = None if dryrun: @@ -212,13 +213,10 @@ def __init__( keepmetadata=keepmetadata, ) if workflow.immediate_submit: - self._submit_callback = partial( - self._proceed, - update_dynamic=False, - print_progress=False, - update_resources=False, - handle_job_success=False, - ) + self.update_dynamic = False + self.print_progress = False + self.update_resources = False + self.handle_job_success = False else: self._executor = DRMAAExecutor( workflow, @@ -440,6 +438,7 @@ def schedule(self): # obtain needrun and running jobs in a thread-safe way with self._lock: self._finish_jobs() + self._error_jobs() needrun = set(self.open_jobs) running = list(self.running) errors = self._errors @@ -524,10 +523,40 @@ def schedule(self): def _finish_jobs(self): # must be called from within lock - for job, update_dynamic in self._tofinish: - self.dag.finish(job, update_dynamic=update_dynamic) + for job in self._tofinish: + if self.handle_job_success: + try: + self.get_executor(job).handle_job_success(job) + except (RuleException, WorkflowError) as e: + # if an error occurs while processing job output, + # we do the same as in case of errors during execution + print_exception(e, self.workflow.linemaps) + self._handle_error(job) + return + + if self.update_resources: + # normal jobs have len=1, group jobs have len>1 + self.finished_jobs += len(job) + self.running.remove(job) + self._free_resources(job) + + if self.print_progress: + if job.is_group(): + for j in job: + logger.job_finished(jobid=j.jobid) + else: + logger.job_finished(jobid=job.jobid) + self.progress() + + self.dag.finish(job, update_dynamic=self.update_dynamic) self._tofinish.clear() + def _error_jobs(self): + # must be called from within lock + for job in self._toerror: + self._handle_error(job) + self._toerror.clear() + def run(self, jobs, executor=None): if executor is None: executor = self._executor @@ -555,42 +584,13 @@ def _free_resources(self, job): def _proceed( self, job, - update_dynamic=True, - print_progress=False, - update_resources=True, - handle_job_success=True, ): """Do stuff after job is finished.""" with self._lock: - if handle_job_success: - # by calling this behind the lock, we avoid race conditions - try: - self.get_executor(job).handle_job_success(job) - except (RuleException, WorkflowError) as e: - # if an error occurs while processing job output, - # we do the same as in case of errors during execution - print_exception(e, self.workflow.linemaps) - self._handle_error(job) - return - - self._tofinish.append((job, update_dynamic)) - - if update_resources: - # normal jobs have len=1, group jobs have len>1 - self.finished_jobs += len(job) - self.running.remove(job) - self._free_resources(job) - - if print_progress: - if job.is_group(): - for j in job: - logger.job_finished(jobid=j.jobid) - else: - logger.job_finished(jobid=job.jobid) - self.progress() + self._tofinish.append(job) if self.dryrun: - if not self.running: + if len(self.running) - len(self._tofinish) - len(self._toerror) <= 0: # During dryrun, only release when all running jobs are done. # This saves a lot of time, as self.open_jobs has to be # evaluated less frequently. @@ -601,7 +601,8 @@ def _proceed( def _error(self, job): with self._lock: - self._handle_error(job) + self._toerror.append(job) + self._open_jobs.release() def _handle_error(self, job): """Clear jobs and stop the workflow. @@ -625,7 +626,6 @@ def _handle_error(self, job): self.failed.add(job) if self.keepgoing: logger.info("Job failed, going on with independent jobs.") - self._open_jobs.release() def exit_gracefully(self, *args): with self._lock: