diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py
index a3d48d9a6..b178ad76b 100644
--- a/snakemake/scheduler.py
+++ b/snakemake/scheduler.py
@@ -118,11 +118,6 @@ def __init__(
         self.scheduler_type = scheduler_type
         self.scheduler_ilp_solver = scheduler_ilp_solver
         self._tofinish = []
-        self._toerror = []
-        self.handle_job_success = True
-        self.update_resources = True
-        self.print_progress = not self.quiet and not self.dryrun
-        self.update_dynamic = not self.dryrun

         self.global_resources = {
             name: (sys.maxsize if res is None else res)
@@ -150,7 +145,11 @@ def __init__(
         self._job_queue = None
         self._last_job_selection_empty = False
         self._submit_callback = self._noop
-        self._finish_callback = self._proceed
+        self._finish_callback = partial(
+            self._proceed,
+            update_dynamic=not self.dryrun,
+            print_progress=not self.quiet and not self.dryrun,
+        )

         self._local_executor = None
         if dryrun:
@@ -213,10 +212,13 @@ def __init__(
                     keepmetadata=keepmetadata,
                 )
                 if workflow.immediate_submit:
-                    self.update_dynamic = False
-                    self.print_progress = False
-                    self.update_resources = False
-                    self.handle_job_success = False
+                    self._submit_callback = partial(
+                        self._proceed,
+                        update_dynamic=False,
+                        print_progress=False,
+                        update_resources=False,
+                        handle_job_success=False,
+                    )
             else:
                 self._executor = DRMAAExecutor(
                     workflow,
@@ -438,7 +440,6 @@ def schedule(self):
                 # obtain needrun and running jobs in a thread-safe way
                 with self._lock:
                     self._finish_jobs()
-                    self._error_jobs()
                     needrun = set(self.open_jobs)
                     running = list(self.running)
                     errors = self._errors
@@ -523,40 +524,10 @@ def schedule(self):

     def _finish_jobs(self):
         # must be called from within lock
-        for job in self._tofinish:
-            if self.handle_job_success:
-                try:
-                    self.get_executor(job).handle_job_success(job)
-                except (RuleException, WorkflowError) as e:
-                    # if an error occurs while processing job output,
-                    # we do the same as in case of errors during execution
-                    print_exception(e, self.workflow.linemaps)
-                    self._handle_error(job)
-                    return
-
-            if self.update_resources:
-                # normal jobs have len=1, group jobs have len>1
-                self.finished_jobs += len(job)
-                self.running.remove(job)
-                self._free_resources(job)
-
-            if self.print_progress:
-                if job.is_group():
-                    for j in job:
-                        logger.job_finished(jobid=j.jobid)
-                else:
-                    logger.job_finished(jobid=job.jobid)
-                self.progress()
-
-            self.dag.finish(job, update_dynamic=self.update_dynamic)
+        for job, update_dynamic in self._tofinish:
+            self.dag.finish(job, update_dynamic=update_dynamic)
         self._tofinish.clear()

-    def _error_jobs(self):
-        # must be called from within lock
-        for job in self._toerror:
-            self._handle_error(job)
-        self._toerror.clear()
-
     def run(self, jobs, executor=None):
         if executor is None:
             executor = self._executor
@@ -584,13 +555,42 @@ def _free_resources(self, job):
     def _proceed(
         self,
         job,
+        update_dynamic=True,
+        print_progress=False,
+        update_resources=True,
+        handle_job_success=True,
     ):
         """Do stuff after job is finished."""
         with self._lock:
-            self._tofinish.append(job)
+            if handle_job_success:
+                # by calling this behind the lock, we avoid race conditions
+                try:
+                    self.get_executor(job).handle_job_success(job)
+                except (RuleException, WorkflowError) as e:
+                    # if an error occurs while processing job output,
+                    # we do the same as in case of errors during execution
+                    print_exception(e, self.workflow.linemaps)
+                    self._handle_error(job)
+                    return
+
+            self._tofinish.append((job, update_dynamic))
+
+            if update_resources:
+                # normal jobs have len=1, group jobs have len>1
+                self.finished_jobs += len(job)
+                self.running.remove(job)
+                self._free_resources(job)
+
+            if print_progress:
+                if job.is_group():
+                    for j in job:
+                        logger.job_finished(jobid=j.jobid)
+                else:
+                    logger.job_finished(jobid=job.jobid)
+                self.progress()

             if self.dryrun:
-                if len(self.running) - len(self._tofinish) - len(self._toerror) <= 0:
+                if not self.running:
                     # During dryrun, only release when all running jobs are done.
                     # This saves a lot of time, as self.open_jobs has to be
                     # evaluated less frequently.
@@ -601,8 +601,7 @@ def _proceed(

     def _error(self, job):
         with self._lock:
-            self._toerror.append(job)
-            self._open_jobs.release()
+            self._handle_error(job)

     def _handle_error(self, job):
         """Clear jobs and stop the workflow.
@@ -626,6 +625,7 @@ def _handle_error(self, job):
         self.failed.add(job)
         if self.keepgoing:
             logger.info("Job failed, going on with independent jobs.")
+        self._open_jobs.release()

     def exit_gracefully(self, *args):
         with self._lock:
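The core idea of the change is that `_proceed` no longer reads mutable scheduler-wide flags; instead, each callback is pre-configured once with `functools.partial`, so its behavior is fixed at construction time and all post-processing happens under `self._lock`. Below is a minimal standalone sketch of that pattern, assuming simplified constructor arguments; the class and job names are illustrative, not Snakemake's actual API:

```python
from functools import partial


class SchedulerSketch:
    """Sketch: pre-bind callback behavior with functools.partial
    instead of mutating shared instance flags."""

    def __init__(self, dryrun=False, quiet=False, immediate_submit=False):
        # Each callback carries its own configuration, so concurrent
        # callers cannot observe half-updated shared state.
        self._finish_callback = partial(
            self._proceed,
            update_dynamic=not dryrun,
            print_progress=not quiet and not dryrun,
        )
        if immediate_submit:
            # In immediate-submit mode the same routine runs on submission,
            # with success handling and resource accounting disabled.
            self._submit_callback = partial(
                self._proceed,
                update_dynamic=False,
                print_progress=False,
                update_resources=False,
                handle_job_success=False,
            )

    def _proceed(
        self,
        job,
        update_dynamic=True,
        print_progress=False,
        update_resources=True,
        handle_job_success=True,
    ):
        # Behavior is driven entirely by the keyword arguments that
        # were bound into the callback, not by instance attributes.
        print(job, update_dynamic, print_progress,
              update_resources, handle_job_success)


SchedulerSketch(dryrun=True)._finish_callback("job-1")
SchedulerSketch(immediate_submit=True)._submit_callback("job-2")
```

Binding the flags into the callbacks is also what lets the diff drop the deferred `_toerror` queue: success and error handling now run synchronously inside `_proceed` under the lock, with `_handle_error` itself releasing `self._open_jobs`.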