diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py index 3293a9321..745f24315 100644 --- a/snakemake/executors/__init__.py +++ b/snakemake/executors/__init__.py @@ -1402,6 +1402,8 @@ def shutdown(self): def _wait_for_jobs(self): import drmaa + suspended_msg = set() + while True: with self.lock: if not self.wait: @@ -1412,9 +1414,7 @@ def _wait_for_jobs(self): for active_job in active_jobs: with self.status_rate_limiter: try: - retval = self.session.wait( - active_job.jobid, drmaa.Session.TIMEOUT_NO_WAIT - ) + retval = self.session.jobStatus(active_job.jobid) except drmaa.ExitTimeoutException as e: # job still active still_running.append(active_job) @@ -1427,20 +1427,40 @@ def _wait_for_jobs(self): os.remove(active_job.jobscript) active_job.error_callback(active_job.job) continue - # job exited - os.remove(active_job.jobscript) - if ( - not retval.wasAborted - and retval.hasExited - and retval.exitStatus == 0 - ): + if retval == drmaa.JobState.DONE: + os.remove(active_job.jobscript) active_job.callback(active_job.job) - else: + elif retval == drmaa.JobState.FAILED: + os.remove(active_job.jobscript) self.print_job_error(active_job.job) self.print_cluster_job_error( active_job, self.dag.jobid(active_job.job) ) active_job.error_callback(active_job.job) + else: + # still running + still_running.append(active_job) + + def handle_suspended(by): + if active_job.job.jobid not in suspended_msg: + logger.warning( + "Job {} (DRMAA id: {}) was suspended by {}.".format( + active_job.job.jobid, active_job.jobid, by + ) + ) + suspended_msg.add(active_job.job.jobid) + + if retval == drmaa.JobState.USER_SUSPENDED: + handle_suspended("user") + elif retval == drmaa.JobState.SYSTEM_SUSPENDED: + handle_suspended("system") + else: + try: + suspended_msg.remove(active_job.job.jobid) + except KeyError: + # there was nothing to remove + pass + with self.lock: self.active_jobs.extend(still_running) sleep()