fix: improved error handling for cluster status scripts and smarter job selector choice in case of cluster submission (use greedy for single jobs). #1142

Merged: 6 commits, Aug 27, 2021
47 changes: 34 additions & 13 deletions snakemake/executors/__init__.py
@@ -712,7 +712,7 @@ def __init__(
        self.active_jobs = list()
        self.lock = threading.Lock()
        self.wait = True
-        self.wait_thread = threading.Thread(target=self._wait_for_jobs)
+        self.wait_thread = threading.Thread(target=self._wait_thread)
        self.wait_thread.daemon = True
        self.wait_thread.start()

@@ -722,6 +722,12 @@ def __init__(
            max_calls=self.max_status_checks_per_second, period=1
        )

    def _wait_thread(self):
        try:
            self._wait_for_jobs()
        except Exception as e:
            self.workflow.scheduler.executor_error_callback(e)

    def shutdown(self):
        with self.lock:
            self.wait = False
@@ -1065,21 +1071,18 @@ def _wait_for_jobs(self):
        success = "success"
        failed = "failed"
        running = "running"
        status_cmd_kills = set()
        if self.statuscmd is not None:

-            def job_status(job):
+            def job_status(job, valid_returns=["running", "success", "failed"]):
                try:
                    # this command shall return "success", "failed" or "running"
-                    return (
-                        subprocess.check_output(
-                            "{statuscmd} {jobid}".format(
-                                jobid=job.jobid, statuscmd=self.statuscmd
-                            ),
-                            shell=True,
-                        )
-                        .decode()
-                        .split("\n")[0]
-                    )
+                    ret = subprocess.check_output(
+                        "{statuscmd} {jobid}".format(
+                            jobid=job.jobid, statuscmd=self.statuscmd
+                        ),
+                        shell=True,
+                    ).decode()
                except subprocess.CalledProcessError as e:
                    if e.returncode < 0:
                        # Ignore SIGINT and all other issues due to signals
@@ -1088,13 +1091,31 @@ def job_status(job):
                        # snakemake.
                        # Snakemake will handle the signal in
                        # the main process.
-                        pass
+                        status_cmd_kills.add(e.returncode)
+                        if len(status_cmd_kills) > 10:
+                            logger.info(
+                                "Cluster status command {} was killed >10 times with signal(s) {} "
+                                "(if this happens unexpectedly during your workflow execution, "
+                                "have a closer look).".format(
+                                    self.statuscmd,
+                                    ",".join(str(s) for s in status_cmd_kills),
+                                )
+                            )
+                            status_cmd_kills.clear()
+                        # Treat a signal-interrupted status check as still
+                        # running; falling through would leave ret unbound.
+                        return running
                    else:
                        raise WorkflowError(
                            "Failed to obtain job status. "
                            "See above for error message."
                        )

                ret = ret.strip().split("\n")
                if len(ret) != 1 or ret[0] not in valid_returns:
                    raise WorkflowError(
                        "Cluster status command {} returned {}, but a single "
                        "line with one of {} is expected.".format(
                            self.statuscmd, "\\n".join(ret), ",".join(valid_returns)
                        )
                    )
                return ret[0]

        else:

            def job_status(job):
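Taken together, these changes tighten the contract for `--cluster-status` scripts: the command must print exactly one line containing `running`, `success`, or `failed`, and anything else now raises a `WorkflowError` instead of silently confusing the wait loop, while signal-induced kills of the status command remain tolerated. A minimal standalone sketch of that validation logic (the `check_status` helper and the plain `RuntimeError` standing in for `WorkflowError` are illustrative, not snakemake API):

```python
import subprocess

VALID_RETURNS = ("running", "success", "failed")


def check_status(statuscmd, jobid):
    """Run a cluster status command and validate its single-line output."""
    try:
        out = subprocess.check_output(
            "{} {}".format(statuscmd, jobid), shell=True
        ).decode()
    except subprocess.CalledProcessError as e:
        if e.returncode < 0:
            # Killed by a signal: tolerate it and treat the job as running.
            return "running"
        raise RuntimeError("Failed to obtain job status.")
    lines = out.strip().split("\n")
    if len(lines) != 1 or lines[0] not in VALID_RETURNS:
        raise RuntimeError(
            "Status command returned {!r}, but a single line with one of "
            "{} is expected.".format(out, ", ".join(VALID_RETURNS))
        )
    return lines[0]
```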
36 changes: 31 additions & 5 deletions snakemake/scheduler.py
@@ -140,8 +140,10 @@ def __init__(
        self._lock = threading.Lock()

        self._errors = False
        self._executor_error = None
        self._finished = False
        self._job_queue = None
        self._last_job_selection_empty = False
        self._submit_callback = self._noop
        self._finish_callback = partial(
            self._proceed,
@@ -391,6 +393,12 @@ def __init__(
            pass
        self._open_jobs.release()

    def executor_error_callback(self, exception):
        with self._lock:
            self._executor_error = exception
            # Wake the scheduler so that the next scheduling round catches
            # and raises the error.
            self._open_jobs.release()
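
This callback is the receiving end of the `_wait_thread` wrapper added in the executor above. The pattern, a daemon thread parking its exception in a shared slot and waking the main loop, which then re-raises, in a self-contained sketch with illustrative names:

```python
import threading


class Loop:
    """Toy version of the scheduler/executor error handoff."""

    def __init__(self):
        self._lock = threading.Lock()
        self._error = None
        self._wake = threading.Semaphore(0)

    def error_callback(self, exc):
        with self._lock:
            self._error = exc
            self._wake.release()  # wake the main loop

    def _worker(self):
        try:
            raise RuntimeError("status check failed")  # simulated failure
        except Exception as e:
            self.error_callback(e)  # report instead of dying silently

    def run(self):
        threading.Thread(target=self._worker, daemon=True).start()
        self._wake.acquire()  # blocks until a job finishes or an error arrives
        with self._lock:
            if self._error is not None:
                raise self._error


Loop().run()  # raises RuntimeError("status check failed") in the main thread
```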

    @property
    def stats(self):
        try:
@@ -435,16 +443,20 @@ def schedule(self):
                    needrun = set(self.open_jobs)
                    running = list(self.running)
                    errors = self._errors
                    executor_error = self._executor_error
                    user_kill = self._user_kill

                # handle errors
                if user_kill or (not self.keepgoing and errors) or executor_error:
                    if user_kill == "graceful":
                        logger.info(
                            "Will exit after finishing currently running jobs."
                        )

-                    if not running:
+                    if executor_error:
+                        print_exception(executor_error, self.workflow.linemaps)
+
+                    if executor_error or not running:
                        logger.info("Shutting down, this might take some time.")
                        self._executor.shutdown()
                        if not user_kill:
@@ -474,7 +486,11 @@ def schedule(self):
"Ready jobs ({}):\n\t".format(len(needrun))
+ "\n\t".join(map(str, needrun))
)

if not self._last_job_selection_empty:
logger.info("Select jobs to execute...")
run = self.job_selector(needrun)
self._last_job_selection_empty = not run

                logger.debug(
                    "Selected jobs ({}):\n\t".format(len(run))
@@ -624,7 +640,11 @@ def job_selector_ilp(self, jobs):
        from pulp import lpSum
        from stopit import ThreadingTimeout as Timeout, TimeoutException

logger.info("Select jobs to execute...")
if len(jobs) == 1:
logger.debug(
"Using greedy selector because only single job has to be scheduled."
)
return self.job_selector_greedy(jobs)

        with self._lock:
            if not self.resources["_cores"]:
@@ -895,8 +915,14 @@ def job_reward(self, job):
            temp_size = 0
            input_size = 0
        else:
-            temp_size = self.dag.temp_size(job)
-            input_size = job.inputsize
+            try:
+                temp_size = self.dag.temp_size(job)
+                input_size = job.inputsize
+            except FileNotFoundError:
+                # If the file is not yet present, this shall not affect the
+                # job selection.
+                temp_size = 0
+                input_size = 0

        # Usually, this should guide the scheduler to first schedule all jobs
        # that remove the largest temp file, then the second largest and so on.
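The single-job fast path above skips building an ILP model when there is nothing to optimize, which matters for cluster runs where jobs often become ready one at a time. For intuition, a toy greedy selector over a single core budget (the `Job` type and `reward` field are illustrative, not snakemake's actual model):

```python
from dataclasses import dataclass


@dataclass
class Job:
    name: str
    threads: int
    reward: float


def greedy_select(jobs, free_cores):
    """Pick jobs in descending reward order until the core budget is spent."""
    selected = []
    for job in sorted(jobs, key=lambda j: j.reward, reverse=True):
        if job.threads <= free_cores:
            selected.append(job)
            free_cores -= job.threads
    return selected


# With a single ready job, greedy trivially returns it; no solver needed.
print(greedy_select([Job("align", 4, 1.0)], free_cores=8))
```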
7 changes: 4 additions & 3 deletions snakemake/workflow.py
@@ -217,6 +217,7 @@ def __init__(
        self.check_envvars = check_envvars
        self.max_threads = max_threads
        self.all_temp = all_temp
        self.scheduler = None

        _globals = globals()
        _globals["workflow"] = self
@@ -953,7 +954,7 @@ def files(items):
            self.persistence.conda_cleanup_envs()
            return True

-        scheduler = JobScheduler(
+        self.scheduler = JobScheduler(
            self,
            dag,
            local_cores=local_cores,
@@ -1053,7 +1054,7 @@ def files(items):
        if not dryrun and not no_hooks:
            self._onstart(logger.get_logfile())

-        success = scheduler.schedule()
+        success = self.scheduler.schedule()

        if not immediate_submit and not dryrun:
            dag.cleanup_workdir()
@@ -1069,7 +1070,7 @@ def files(items):
            logger.remove_logfile()
        else:
            if stats:
-                scheduler.stats.to_json(stats)
+                self.scheduler.stats.to_json(stats)
            logger.logfile_hint()
        if not dryrun and not no_hooks:
            self._onsuccess(logger.get_logfile())
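Promoting the scheduler from a local variable to `self.scheduler` is what makes the executor's error callback reachable in the first place, since the executor only holds a reference to the workflow. A stripped-down sketch of the wiring (class bodies reduced to the relevant attributes):

```python
class JobScheduler:
    def __init__(self, workflow):
        self.workflow = workflow

    def executor_error_callback(self, exc):
        print("executor reported:", exc)


class Workflow:
    def __init__(self):
        self.scheduler = None  # populated in execute()

    def execute(self):
        # Previously "scheduler = JobScheduler(self)", i.e. invisible to
        # anything holding only the workflow reference.
        self.scheduler = JobScheduler(self)


wf = Workflow()
wf.execute()
# An executor's _wait_thread can now do:
wf.scheduler.executor_error_callback(RuntimeError("cluster status failed"))
```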
32 changes: 32 additions & 0 deletions tests/test_cluster_statusscript/Snakefile.nonstandard
@@ -0,0 +1,32 @@
from snakemake import shell

chromosomes = [1, 2, 3, 4, 5]

envvars:
    "TESTVAR"


rule all:
    input: 'test.predictions', 'test.2.inter2'

rule compute1:
    input: '{name}.in'
    output: ['{name}.%s.inter' % c for c in chromosomes]
    params: prefix="{name}"
    run:
        for out in output:
            shell('(cat {input[0]} && echo "Part {out}") > {out}')

rule compute2:
    input: '{name}.{chromosome}.inter'
    output: '{name}.{chromosome}.inter2'
    params: test="a=b"
    threads: workflow.cores * 0.5
    shell: 'echo copy; cp {input[0]} {output[0]}'

rule gather:
    input: ['{name}.%s.inter2' % c for c in chromosomes]
    output: '{name}.predictions'
    run:
        shell('cat {} > {}'.format(' '.join(input), output[0]))
@@ -0,0 +1,2 @@
testz0r
Part test.1.inter
@@ -0,0 +1,2 @@
testz0r
Part test.1.inter
@@ -0,0 +1,2 @@
testz0r
Part test.2.inter
@@ -0,0 +1,2 @@
testz0r
Part test.2.inter
@@ -0,0 +1,2 @@
testz0r
Part test.3.inter
@@ -0,0 +1,2 @@
testz0r
Part test.3.inter
@@ -0,0 +1,10 @@
testz0r
Part test.1.inter
testz0r
Part test.2.inter
testz0r
Part test.3.inter
testz0r
Part test.4.inter
testz0r
Part test.5.inter
7 changes: 7 additions & 0 deletions tests/test_cluster_statusscript/qsub
@@ -0,0 +1,7 @@
#!/bin/bash
echo `date` >> qsub.log
tail -n1 $1 >> qsub.log
# simulate the cluster printing a job id by echoing a random number
echo $RANDOM
cat $1 >> qsub.log
sh $1
1 change: 1 addition & 0 deletions tests/test_cluster_statusscript/status.sh
@@ -0,0 +1 @@
echo success
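
The test's `status.sh` simply reports every job as successful. A real status script follows the same contract: it receives the job id as its first argument and prints exactly one of `running`, `success`, or `failed`. A hypothetical SLURM variant using `sacct` might look like this (the state mapping is illustrative; adapt it to your site):

```python
#!/usr/bin/env python3
import subprocess
import sys

jobid = sys.argv[1]

# First field of the job's accounting state, e.g. "COMPLETED" or "RUNNING"
# ("" if sacct printed nothing).
out = subprocess.run(
    ["sacct", "-j", jobid, "--format=State", "--noheader", "--parsable2"],
    capture_output=True,
    text=True,
).stdout.strip()
state = out.split("\n")[0].split(" ")[0]

if state == "COMPLETED":
    print("success")
elif state in ("FAILED", "CANCELLED", "TIMEOUT", "OUT_OF_MEMORY", "NODE_FAIL"):
    print("failed")
else:
    print("running")
```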
1 change: 1 addition & 0 deletions tests/test_cluster_statusscript/test.in
@@ -0,0 +1 @@
testz0r
11 changes: 11 additions & 0 deletions tests/tests.py
@@ -104,6 +104,17 @@ def test14():
run(dpath("test14"), snakefile="Snakefile.nonstandard", cluster="./qsub")


@skip_on_windows
def test_cluster_statusscript():
os.environ["TESTVAR"] = "test"
run(
dpath("test_cluster_statusscript"),
snakefile="Snakefile.nonstandard",
cluster="./qsub",
cluster_status="./status.sh",
)


def test15():
run(dpath("test15"))
