From 48d2dd99a745fd54b74b1435cbb7e41e0ee1b4ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20K=C3=B6ster?=
Date: Fri, 27 Aug 2021 13:58:57 +0200
Subject: [PATCH] fix: improved error handling for cluster status scripts and
 smarter job selector choice in case of cluster submission (use greedy for
 single jobs). (#1142)

* fix: improved error handling for cluster status scripts and smarter job
  selector choice in case of cluster submission (use greedy for single jobs).

* refactor: fmt

* temporarily deactivate greedy fallback in case of a single job (see if this
  is the reason for the failure in GLS).

* handle missing file in job reward.
---
 snakemake/executors/__init__.py           | 47 ++++++++++++++-----
 snakemake/scheduler.py                    | 36 ++++++++++++--
 snakemake/workflow.py                     |  7 +--
 .../Snakefile.nonstandard                 | 32 +++++++++++++
 .../expected-results/test.1.inter         |  2 +
 .../expected-results/test.1.inter2        |  2 +
 .../expected-results/test.2.inter         |  2 +
 .../expected-results/test.2.inter2        |  2 +
 .../expected-results/test.3.inter         |  2 +
 .../expected-results/test.3.inter2        |  2 +
 .../expected-results/test.predictions     | 10 ++++
 tests/test_cluster_statusscript/qsub      |  7 +++
 tests/test_cluster_statusscript/status.sh |  1 +
 tests/test_cluster_statusscript/test.in   |  1 +
 tests/tests.py                            | 11 +++++
 15 files changed, 143 insertions(+), 21 deletions(-)
 create mode 100644 tests/test_cluster_statusscript/Snakefile.nonstandard
 create mode 100644 tests/test_cluster_statusscript/expected-results/test.1.inter
 create mode 100644 tests/test_cluster_statusscript/expected-results/test.1.inter2
 create mode 100644 tests/test_cluster_statusscript/expected-results/test.2.inter
 create mode 100644 tests/test_cluster_statusscript/expected-results/test.2.inter2
 create mode 100644 tests/test_cluster_statusscript/expected-results/test.3.inter
 create mode 100644 tests/test_cluster_statusscript/expected-results/test.3.inter2
 create mode 100644 tests/test_cluster_statusscript/expected-results/test.predictions
 create mode 100755 tests/test_cluster_statusscript/qsub
 create mode 100755 tests/test_cluster_statusscript/status.sh
 create mode 100644 tests/test_cluster_statusscript/test.in

diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py
index 5c422ca5f..3293a9321 100644
--- a/snakemake/executors/__init__.py
+++ b/snakemake/executors/__init__.py
@@ -712,7 +712,7 @@ def __init__(
         self.active_jobs = list()
         self.lock = threading.Lock()
         self.wait = True
-        self.wait_thread = threading.Thread(target=self._wait_for_jobs)
+        self.wait_thread = threading.Thread(target=self._wait_thread)
         self.wait_thread.daemon = True
         self.wait_thread.start()
 
@@ -722,6 +722,12 @@ def __init__(
             max_calls=self.max_status_checks_per_second, period=1
         )
 
+    def _wait_thread(self):
+        try:
+            self._wait_for_jobs()
+        except Exception as e:
+            self.workflow.scheduler.executor_error_callback(e)
+
     def shutdown(self):
         with self.lock:
             self.wait = False
@@ -1065,21 +1071,18 @@ def _wait_for_jobs(self):
         success = "success"
         failed = "failed"
         running = "running"
+        status_cmd_kills = set()
         if self.statuscmd is not None:
 
-            def job_status(job):
+            def job_status(job, valid_returns=["running", "success", "failed"]):
                 try:
                     # this command shall return "success", "failed" or "running"
-                    return (
-                        subprocess.check_output(
-                            "{statuscmd} {jobid}".format(
-                                jobid=job.jobid, statuscmd=self.statuscmd
-                            ),
-                            shell=True,
-                        )
-                        .decode()
-                        .split("\n")[0]
-                    )
+                    ret = subprocess.check_output(
+                        "{statuscmd} {jobid}".format(
+                            jobid=job.jobid, statuscmd=self.statuscmd
+                        ),
+                        shell=True,
+                    ).decode()
                 except subprocess.CalledProcessError as e:
                     if e.returncode < 0:
                         # Ignore SIGINT and all other issues due to signals
@@ -1088,13 +1091,31 @@ def job_status(job):
                         # snakemake.
                         # Snakemake will handle the signal in
                         # the main process.
-                        pass
+                        status_cmd_kills.add(e.returncode)
+                        if len(status_cmd_kills) > 10:
+                            logger.info(
+                                "Cluster status command {} was killed >10 times with signal(s) {} "
+                                "(if this happens unexpectedly during your workflow execution, "
+                                "have a closer look).".format(
+                                    self.statuscmd, ",".join(str(s) for s in status_cmd_kills)
+                                )
+                            )
+                            status_cmd_kills.clear()
                     else:
                         raise WorkflowError(
                             "Failed to obtain job status. "
                             "See above for error message."
                         )
 
+                ret = ret.strip().split("\n")
+                if len(ret) != 1 or ret[0] not in valid_returns:
+                    raise WorkflowError(
+                        "Cluster status command {} returned {} but a single line with one of {} is expected.".format(
+                            self.statuscmd, "\\n".join(ret), ",".join(valid_returns)
+                        )
+                    )
+                return ret[0]
+
         else:
 
             def job_status(job):
diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py
index c4bb6b0e4..b178ad76b 100644
--- a/snakemake/scheduler.py
+++ b/snakemake/scheduler.py
@@ -140,8 +140,10 @@ def __init__(
         self._lock = threading.Lock()
         self._errors = False
+        self._executor_error = None
         self._finished = False
         self._job_queue = None
+        self._last_job_selection_empty = False
         self._submit_callback = self._noop
         self._finish_callback = partial(
             self._proceed,
@@ -391,6 +393,12 @@ def __init__(
             pass
         self._open_jobs.release()
 
+    def executor_error_callback(self, exception):
+        with self._lock:
+            self._executor_error = exception
+            # next scheduling round to catch and raise error
+            self._open_jobs.release()
+
     @property
     def stats(self):
         try:
@@ -435,16 +443,20 @@ def schedule(self):
                     needrun = set(self.open_jobs)
                     running = list(self.running)
                     errors = self._errors
+                    executor_error = self._executor_error
                     user_kill = self._user_kill
 
                 # handle errors
-                if user_kill or (not self.keepgoing and errors):
+                if user_kill or (not self.keepgoing and errors) or executor_error:
                     if user_kill == "graceful":
                         logger.info(
                             "Will exit after finishing " "currently running jobs."
                         )
 
-                    if not running:
+                    if executor_error:
+                        print_exception(executor_error, self.workflow.linemaps)
+
+                    if executor_error or not running:
                         logger.info("Shutting down, this might take some time.")
                         self._executor.shutdown()
                         if not user_kill:
@@ -474,7 +486,11 @@ def schedule(self):
                     "Ready jobs ({}):\n\t".format(len(needrun))
                     + "\n\t".join(map(str, needrun))
                 )
+
+                if not self._last_job_selection_empty:
+                    logger.info("Select jobs to execute...")
                 run = self.job_selector(needrun)
+                self._last_job_selection_empty = not run
                 logger.debug(
                     "Selected jobs ({}):\n\t".format(len(run))
@@ -624,7 +640,11 @@ def job_selector_ilp(self, jobs):
         from pulp import lpSum
         from stopit import ThreadingTimeout as Timeout, TimeoutException
 
-        logger.info("Select jobs to execute...")
+        if len(jobs) == 1:
+            logger.debug(
+                "Using greedy selector because only a single job has to be scheduled."
+            )
+            return self.job_selector_greedy(jobs)
 
         with self._lock:
             if not self.resources["_cores"]:
@@ -895,8 +915,14 @@ def job_reward(self, job):
             temp_size = 0
             input_size = 0
         else:
-            temp_size = self.dag.temp_size(job)
-            input_size = job.inputsize
+            try:
+                temp_size = self.dag.temp_size(job)
+                input_size = job.inputsize
+            except FileNotFoundError:
+                # If the file is not yet present, this shall not affect the
+                # job selection.
+                temp_size = 0
+                input_size = 0
 
         # Usually, this should guide the scheduler to first schedule all jobs
         # that remove the largest temp file, then the second largest and so on.
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index eef12b430..26f08aff6 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -217,6 +217,7 @@ def __init__(
         self.check_envvars = check_envvars
         self.max_threads = max_threads
         self.all_temp = all_temp
+        self.scheduler = None
 
         _globals = globals()
         _globals["workflow"] = self
@@ -953,7 +954,7 @@ def files(items):
             self.persistence.conda_cleanup_envs()
             return True
 
-        scheduler = JobScheduler(
+        self.scheduler = JobScheduler(
            self,
            dag,
            local_cores=local_cores,
@@ -1053,7 +1054,7 @@ def files(items):
         if not dryrun and not no_hooks:
             self._onstart(logger.get_logfile())
 
-        success = scheduler.schedule()
+        success = self.scheduler.schedule()
 
         if not immediate_submit and not dryrun:
             dag.cleanup_workdir()
@@ -1069,7 +1070,7 @@ def files(items):
                 logger.remove_logfile()
         else:
             if stats:
-                scheduler.stats.to_json(stats)
+                self.scheduler.stats.to_json(stats)
             logger.logfile_hint()
             if not dryrun and not no_hooks:
                 self._onsuccess(logger.get_logfile())
diff --git a/tests/test_cluster_statusscript/Snakefile.nonstandard b/tests/test_cluster_statusscript/Snakefile.nonstandard
new file mode 100644
index 000000000..02a08c26f
--- /dev/null
+++ b/tests/test_cluster_statusscript/Snakefile.nonstandard
@@ -0,0 +1,32 @@
+from snakemake import shell
+
+chromosomes = [1,2,3,4,5]
+
+envvars:
+    "TESTVAR"
+
+
+
+rule all:
+    input: 'test.predictions', 'test.2.inter2'
+
+rule compute1:
+    input: '{name}.in'
+    output: ['{name}.%s.inter'%c for c in chromosomes]
+    params: prefix="{name}"
+    run:
+        for out in output:
+            shell('(cat {input[0]} && echo "Part {out}") > {out}')
+
+rule compute2:
+    input: '{name}.{chromosome}.inter'
+    output: '{name}.{chromosome}.inter2'
+    params: test="a=b"
+    threads: workflow.cores * 0.5
+    shell: 'echo copy; cp {input[0]} {output[0]}'
+
+rule gather:
+    input: ['{name}.%s.inter2'%c for c in chromosomes]
+    output: '{name}.predictions'
+    run:
+        shell('cat {} > {}'.format(' '.join(input), output[0]))
diff --git a/tests/test_cluster_statusscript/expected-results/test.1.inter b/tests/test_cluster_statusscript/expected-results/test.1.inter
new file mode 100644
index 000000000..5cc1d91f6
--- /dev/null
+++ b/tests/test_cluster_statusscript/expected-results/test.1.inter
@@ -0,0 +1,2 @@
+testz0r
+Part test.1.inter
diff --git a/tests/test_cluster_statusscript/expected-results/test.1.inter2 b/tests/test_cluster_statusscript/expected-results/test.1.inter2
new file mode 100644
index 000000000..5cc1d91f6
--- /dev/null
+++ b/tests/test_cluster_statusscript/expected-results/test.1.inter2
@@ -0,0 +1,2 @@
+testz0r
+Part test.1.inter
diff --git a/tests/test_cluster_statusscript/expected-results/test.2.inter b/tests/test_cluster_statusscript/expected-results/test.2.inter
new file mode 100644
index 000000000..8b02f7f0b
--- /dev/null
+++ b/tests/test_cluster_statusscript/expected-results/test.2.inter
@@ -0,0 +1,2 @@
+testz0r
+Part test.2.inter
diff --git a/tests/test_cluster_statusscript/expected-results/test.2.inter2 b/tests/test_cluster_statusscript/expected-results/test.2.inter2
new file mode 100644
index 000000000..8b02f7f0b
--- /dev/null
+++ b/tests/test_cluster_statusscript/expected-results/test.2.inter2
@@ -0,0 +1,2 @@
+testz0r
+Part test.2.inter
diff --git a/tests/test_cluster_statusscript/expected-results/test.3.inter b/tests/test_cluster_statusscript/expected-results/test.3.inter
new file mode 100644
index 000000000..5144542ec
--- /dev/null
+++ b/tests/test_cluster_statusscript/expected-results/test.3.inter
@@ -0,0 +1,2 @@
+testz0r
+Part test.3.inter
diff --git a/tests/test_cluster_statusscript/expected-results/test.3.inter2 b/tests/test_cluster_statusscript/expected-results/test.3.inter2
new file mode 100644
index 000000000..5144542ec
--- /dev/null
+++ b/tests/test_cluster_statusscript/expected-results/test.3.inter2
@@ -0,0 +1,2 @@
+testz0r
+Part test.3.inter
diff --git a/tests/test_cluster_statusscript/expected-results/test.predictions b/tests/test_cluster_statusscript/expected-results/test.predictions
new file mode 100644
index 000000000..7d97db630
--- /dev/null
+++ b/tests/test_cluster_statusscript/expected-results/test.predictions
@@ -0,0 +1,10 @@
+testz0r
+Part test.1.inter
+testz0r
+Part test.2.inter
+testz0r
+Part test.3.inter
+testz0r
+Part test.4.inter
+testz0r
+Part test.5.inter
diff --git a/tests/test_cluster_statusscript/qsub b/tests/test_cluster_statusscript/qsub
new file mode 100755
index 000000000..0bc8aabba
--- /dev/null
+++ b/tests/test_cluster_statusscript/qsub
@@ -0,0 +1,7 @@
+#!/bin/bash
+echo `date` >> qsub.log
+tail -n1 $1 >> qsub.log
+# simulate printing of job id by a random number
+echo $RANDOM
+cat $1 >> qsub.log
+sh $1
diff --git a/tests/test_cluster_statusscript/status.sh b/tests/test_cluster_statusscript/status.sh
new file mode 100755
index 000000000..e9d4078f6
--- /dev/null
+++ b/tests/test_cluster_statusscript/status.sh
@@ -0,0 +1 @@
+echo success
diff --git a/tests/test_cluster_statusscript/test.in b/tests/test_cluster_statusscript/test.in
new file mode 100644
index 000000000..ce667834a
--- /dev/null
+++ b/tests/test_cluster_statusscript/test.in
@@ -0,0 +1 @@
+testz0r
diff --git a/tests/tests.py b/tests/tests.py
index 1400f4d4a..763a5f77b 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -104,6 +104,17 @@ def test14():
     run(dpath("test14"), snakefile="Snakefile.nonstandard", cluster="./qsub")
 
 
+@skip_on_windows
+def test_cluster_statusscript():
+    os.environ["TESTVAR"] = "test"
+    run(
+        dpath("test_cluster_statusscript"),
+        snakefile="Snakefile.nonstandard",
+        cluster="./qsub",
+        cluster_status="./status.sh",
+    )
+
+
 def test15():
     run(dpath("test15"))
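
Note on the status script contract enforced above: Snakemake runs the command given
via --cluster-status as "{statuscmd} {jobid}" and, with this patch, requires its
output to be exactly one line containing one of "running", "success", or "failed";
anything else now raises a WorkflowError instead of being silently misread. The
sketch below is a hypothetical Python status script that satisfies this contract.
It is not part of the patch, and it assumes an SGE-like scheduler whose
"qstat -j <jobid>" exits non-zero once a job has left the queue; the probing logic
would need to be adapted to the cluster at hand.

#!/usr/bin/env python3
# Hypothetical --cluster-status script (illustration only, not part of this patch).
# Assumption: an SGE-like "qstat -j <jobid>" exits with a non-zero status once the
# job is no longer queued or running.
import subprocess
import sys

jobid = sys.argv[1]  # Snakemake appends the external job id as the only argument

try:
    # While the scheduler still knows the job, report it as running.
    subprocess.check_output(["qstat", "-j", jobid], stderr=subprocess.DEVNULL)
    print("running")
except subprocess.CalledProcessError:
    # The job has left the queue; a real script would inspect the accounting
    # records (e.g. qacct) to distinguish "success" from "failed".
    print("success")

The new test_cluster_statusscript test drives this code path end to end: jobs are
"submitted" through the fake ./qsub wrapper and polled with the minimal ./status.sh
above, which always reports success on a single line.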