Skip to content

Commit

Permalink
fix: improved error handling for cluster status scripts and smarter job selector choice in case of cluster submission (use greedy for single jobs). (#1142)
Browse files Browse the repository at this point in the history

* fix: improved error handling for cluster status scripts and smarter job selector choice in case of cluster submission (use greedy for single jobs).

* refactor: fmt

* temporarily deactivate greedy fallback in case of a single job (see if this is the reason for the failure in GLS).

* handle missing file in job reward.
  • Loading branch information
johanneskoester committed Aug 27, 2021
1 parent 68c13fd commit 48d2dd9
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 21 deletions.
47 changes: 34 additions & 13 deletions snakemake/executors/__init__.py
Expand Up @@ -712,7 +712,7 @@ def __init__(
self.active_jobs = list()
self.lock = threading.Lock()
self.wait = True
self.wait_thread = threading.Thread(target=self._wait_for_jobs)
self.wait_thread = threading.Thread(target=self._wait_thread)
self.wait_thread.daemon = True
self.wait_thread.start()

Expand All @@ -722,6 +722,12 @@ def __init__(
max_calls=self.max_status_checks_per_second, period=1
)

def _wait_thread(self):
try:
self._wait_for_jobs()
except Exception as e:
self.workflow.scheduler.executor_error_callback(e)

def shutdown(self):
with self.lock:
self.wait = False
Expand Down Expand Up @@ -1065,21 +1071,18 @@ def _wait_for_jobs(self):
success = "success"
failed = "failed"
running = "running"
status_cmd_kills = set()
if self.statuscmd is not None:

def job_status(job):
def job_status(job, valid_returns=["running", "success", "failed"]):
try:
# this command shall return "success", "failed" or "running"
return (
subprocess.check_output(
"{statuscmd} {jobid}".format(
jobid=job.jobid, statuscmd=self.statuscmd
),
shell=True,
)
.decode()
.split("\n")[0]
)
ret = subprocess.check_output(
"{statuscmd} {jobid}".format(
jobid=job.jobid, statuscmd=self.statuscmd
),
shell=True,
).decode()
except subprocess.CalledProcessError as e:
if e.returncode < 0:
# Ignore SIGINT and all other issues due to signals
Expand All @@ -1088,13 +1091,31 @@ def job_status(job):
# snakemake.
# Snakemake will handle the signal in
# the main process.
pass
status_cmd_kills.add(e.returncode)
if len(status_cmd_kills) > 10:
logger.info(
"Cluster status command {} was killed >10 times with signal(s) {} "
"(if this happens unexpectedly during your workflow execution, "
"have a closer look.).".format(
self.statuscmd, ",".join(status_cmd_kills)
)
)
status_cmd_kills.clear()
else:
raise WorkflowError(
"Failed to obtain job status. "
"See above for error message."
)

ret = ret.strip().split("\n")
if len(ret) != 1 or ret[0] not in valid_returns:
raise WorkflowError(
"Cluster status command {} returned {} but just a single line with one of {} is expected.".format(
self.statuscmd, "\\n".join(ret), ",".join(valid_returns)
)
)
return ret[0]

else:

def job_status(job):
Expand Down
36 changes: 31 additions & 5 deletions snakemake/scheduler.py
Expand Up @@ -140,8 +140,10 @@ def __init__(
self._lock = threading.Lock()

self._errors = False
self._executor_error = None
self._finished = False
self._job_queue = None
self._last_job_selection_empty = False
self._submit_callback = self._noop
self._finish_callback = partial(
self._proceed,
Expand Down Expand Up @@ -391,6 +393,12 @@ def __init__(
pass
self._open_jobs.release()

def executor_error_callback(self, exception):
    """Record *exception* raised in an executor thread and wake the scheduler.

    Called from an executor's background wait thread. The stored error is
    picked up by the next scheduling round, which prints it and shuts the
    workflow down.
    """
    with self._lock:
        self._executor_error = exception
        # next scheduling round to catch and raise error
        self._open_jobs.release()

@property
def stats(self):
try:
Expand Down Expand Up @@ -435,16 +443,20 @@ def schedule(self):
needrun = set(self.open_jobs)
running = list(self.running)
errors = self._errors
executor_error = self._executor_error
user_kill = self._user_kill

# handle errors
if user_kill or (not self.keepgoing and errors):
if user_kill or (not self.keepgoing and errors) or executor_error:
if user_kill == "graceful":
logger.info(
"Will exit after finishing " "currently running jobs."
)

if not running:
if executor_error:
print_exception(executor_error, self.workflow.linemaps)

if executor_error or not running:
logger.info("Shutting down, this might take some time.")
self._executor.shutdown()
if not user_kill:
Expand Down Expand Up @@ -474,7 +486,11 @@ def schedule(self):
"Ready jobs ({}):\n\t".format(len(needrun))
+ "\n\t".join(map(str, needrun))
)

if not self._last_job_selection_empty:
logger.info("Select jobs to execute...")
run = self.job_selector(needrun)
self._last_job_selection_empty = not run

logger.debug(
"Selected jobs ({}):\n\t".format(len(run))
Expand Down Expand Up @@ -624,7 +640,11 @@ def job_selector_ilp(self, jobs):
from pulp import lpSum
from stopit import ThreadingTimeout as Timeout, TimeoutException

logger.info("Select jobs to execute...")
if len(jobs) == 1:
logger.debug(
"Using greedy selector because only single job has to be scheduled."
)
return self.job_selector_greedy(jobs)

with self._lock:
if not self.resources["_cores"]:
Expand Down Expand Up @@ -895,8 +915,14 @@ def job_reward(self, job):
temp_size = 0
input_size = 0
else:
temp_size = self.dag.temp_size(job)
input_size = job.inputsize
try:
temp_size = self.dag.temp_size(job)
input_size = job.inputsize
except FileNotFoundError:
# If the file is not yet present, this shall not affect the
# job selection.
temp_size = 0
input_size = 0

# Usually, this should guide the scheduler to first schedule all jobs
# that remove the largest temp file, then the second largest and so on.
Expand Down
7 changes: 4 additions & 3 deletions snakemake/workflow.py
Expand Up @@ -217,6 +217,7 @@ def __init__(
self.check_envvars = check_envvars
self.max_threads = max_threads
self.all_temp = all_temp
self.scheduler = None

_globals = globals()
_globals["workflow"] = self
Expand Down Expand Up @@ -953,7 +954,7 @@ def files(items):
self.persistence.conda_cleanup_envs()
return True

scheduler = JobScheduler(
self.scheduler = JobScheduler(
self,
dag,
local_cores=local_cores,
Expand Down Expand Up @@ -1053,7 +1054,7 @@ def files(items):
if not dryrun and not no_hooks:
self._onstart(logger.get_logfile())

success = scheduler.schedule()
success = self.scheduler.schedule()

if not immediate_submit and not dryrun:
dag.cleanup_workdir()
Expand All @@ -1069,7 +1070,7 @@ def files(items):
logger.remove_logfile()
else:
if stats:
scheduler.stats.to_json(stats)
self.scheduler.stats.to_json(stats)
logger.logfile_hint()
if not dryrun and not no_hooks:
self._onsuccess(logger.get_logfile())
Expand Down
32 changes: 32 additions & 0 deletions tests/test_cluster_statusscript/Snakefile.nonstandard
@@ -0,0 +1,32 @@
from snakemake import shell

# Chromosomes to scatter the computation over.
chromosomes = [1,2,3,4,5]

# Require TESTVAR to be present in the environment (the test sets it).
envvars:
    "TESTVAR"



rule all:
    input: 'test.predictions', 'test.2.inter2'

# Scatter step: derive one intermediate file per chromosome from the input.
rule compute1:
    input: '{name}.in'
    output: ['{name}.%s.inter'%c for c in chromosomes]
    params: prefix="{name}"
    run:
        for out in output:
            shell('(cat {input[0]} && echo "Part {out}") > {out}')

# Per-chromosome copy step.
rule compute2:
    input: '{name}.{chromosome}.inter'
    output: '{name}.{chromosome}.inter2'
    params: test="a=b"
    threads: workflow.cores * 0.5
    shell: 'echo copy; cp {input[0]} {output[0]}'

# Gather step: concatenate all per-chromosome results into one file.
rule gather:
    input: ['{name}.%s.inter2'%c for c in chromosomes]
    output: '{name}.predictions'
    run:
        shell('cat {} > {}'.format(' '.join(input), output[0]))
2 changes: 2 additions & 0 deletions tests/test_cluster_statusscript/expected-results/test.1.inter
@@ -0,0 +1,2 @@
testz0r
Part test.1.inter
@@ -0,0 +1,2 @@
testz0r
Part test.1.inter
2 changes: 2 additions & 0 deletions tests/test_cluster_statusscript/expected-results/test.2.inter
@@ -0,0 +1,2 @@
testz0r
Part test.2.inter
@@ -0,0 +1,2 @@
testz0r
Part test.2.inter
2 changes: 2 additions & 0 deletions tests/test_cluster_statusscript/expected-results/test.3.inter
@@ -0,0 +1,2 @@
testz0r
Part test.3.inter
@@ -0,0 +1,2 @@
testz0r
Part test.3.inter
10 changes: 10 additions & 0 deletions tests/test_cluster_statusscript/expected-results/test.predictions
@@ -0,0 +1,10 @@
testz0r
Part test.1.inter
testz0r
Part test.2.inter
testz0r
Part test.3.inter
testz0r
Part test.4.inter
testz0r
Part test.5.inter
7 changes: 7 additions & 0 deletions tests/test_cluster_statusscript/qsub
@@ -0,0 +1,7 @@
#!/bin/bash
# Fake cluster submission script for the test suite: logs the submitted
# job script to qsub.log, prints a fake job id, and runs the job locally.
echo `date` >> qsub.log
tail -n1 $1 >> qsub.log
# simulate printing of job id by a random number
echo $RANDOM
cat $1 >> qsub.log
sh $1
1 change: 1 addition & 0 deletions tests/test_cluster_statusscript/status.sh
@@ -0,0 +1 @@
# Fake cluster status script: always report "success" for any job id.
echo success
1 change: 1 addition & 0 deletions tests/test_cluster_statusscript/test.in
@@ -0,0 +1 @@
testz0r
11 changes: 11 additions & 0 deletions tests/tests.py
Expand Up @@ -104,6 +104,17 @@ def test14():
run(dpath("test14"), snakefile="Snakefile.nonstandard", cluster="./qsub")


@skip_on_windows
def test_cluster_statusscript():
    """Run a workflow through the fake cluster with a custom status script."""
    # The Snakefile declares TESTVAR via `envvars`, so it must be set here.
    os.environ["TESTVAR"] = "test"
    workdir = dpath("test_cluster_statusscript")
    run(
        workdir,
        snakefile="Snakefile.nonstandard",
        cluster="./qsub",
        cluster_status="./status.sh",
    )


def test15():
run(dpath("test15"))

Expand Down

0 comments on commit 48d2dd9

Please sign in to comment.