feat: Adding --cluster-cancel and --cluster-cancel-nargs #1395

Merged
4 commits merged on Feb 17, 2022
10 changes: 8 additions & 2 deletions docs/executing/cluster.rst
@@ -48,8 +48,7 @@ and forward it to the cluster scheduler:

In order to avoid specifying ``runtime_min`` for each rule, you can make use of the ``--default-resources`` flag; see ``snakemake --help``.
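
For example, a default runtime can be set once on the command line (an illustrative sketch; ``runtime_min`` is simply the resource name used above, and the value and ``qsub`` call are placeholders):

.. code-block:: console

    $ snakemake --cluster qsub --jobs 32 --default-resources "runtime_min=120"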

If your cluster system supports `DRMAA <https://www.drmaa.org/>`_, Snakemake can make use of that to increase the control over jobs.
E.g. jobs can be cancelled upon pressing ``Ctrl+C``, which is not possible with the generic ``--cluster`` support.
If your cluster system supports `DRMAA <https://www.drmaa.org/>`_, Snakemake can make use of that to control jobs.
With DRMAA, no ``qsub`` command needs to be provided, but system-specific arguments can still be given as a string, e.g.

.. code-block:: console
@@ -61,6 +60,13 @@ Else, the arguments will be interpreted as part of the normal Snakemake argument

Adapting to a specific cluster can involve quite a lot of options. It is therefore a good idea to set up a :ref:`profile <profiles>`.

.. note::
Are you using the SLURM job scheduler?

In this case, it is more robust to use the ``--cluster``, ``--cluster-status``, and ``--cluster-cancel`` arguments than to use DRMAA.
The reason is that the slurm-drmaa package is not maintained by SchedMD, the SLURM vendor, and is therefore less well supported.
In practice, you will run into timeouts in DRMAA calls sooner or later.
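
For SLURM, such a DRMAA-free setup might look like the following (an illustrative sketch; ``status.py`` stands for a site-specific status script like the one shown in the tutorial):

.. code-block:: console

    $ snakemake --jobs 100 --cluster "sbatch --parsable" --cluster-status ./status.py --cluster-cancel scancel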

--------------
Job Properties
--------------
7 changes: 7 additions & 0 deletions docs/tutorial/additional_features.rst
@@ -272,6 +272,13 @@ To use this script call snakemake similar to below, where ``status.py`` is the s
$ snakemake all --jobs 100 --cluster "sbatch --cpus-per-task=1 --parsable" --cluster-status ./status.py


Using --cluster-cancel
::::::::::::::::::::::

When Snakemake is terminated by pressing ``Ctrl-C``, it will cancel all currently running jobs when using ``--drmaa``.
You can get the same behaviour with ``--cluster`` by adding ``--cluster-cancel`` and passing a command to use for canceling jobs by their jobid (e.g., ``scancel`` for SLURM or ``qdel`` for SGE).
Most job schedulers accept multiple jobids, and you can use ``--cluster-cancel-nargs`` to limit the number of arguments per call (the default of 1000 is reasonable for most schedulers).
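
For example, extending the command from above (an illustrative invocation; ``scancel`` and the chunk size of 50 are placeholders to adapt to your scheduler):

.. code-block:: console

    $ snakemake all --jobs 100 --cluster "sbatch --cpus-per-task=1 --parsable" \
          --cluster-status ./status.py --cluster-cancel scancel --cluster-cancel-nargs 50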

Constraining wildcards
::::::::::::::::::::::

28 changes: 26 additions & 2 deletions snakemake/__init__.py
@@ -166,6 +166,8 @@ def snakemake(
tibanna_config=False,
assume_shared_fs=True,
cluster_status=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
export_cwl=None,
show_failed_logs=False,
keep_incomplete=False,
@@ -296,6 +298,8 @@ def snakemake(
tibanna_config (list): Additional tibanna config e.g. --tibanna-config spot_instance=true subnet=<subnet_id> security group=<security_group_id>
assume_shared_fs (bool): assume that cluster nodes share a common filesystem (default true).
cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failure" or "running" when executing with a cluster jobid as a single argument.
cluster_cancel (str): command to cancel multiple job IDs (like SLURM 'scancel') (default None)
cluster_cancel_nargs (int): maximal number of job ids to pass to cluster_cancel (default 1000)
export_cwl (str): Compile workflow to CWL and save to given file
log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has to following entries:
keep_incomplete (bool): keep incomplete output files of failed jobs
@@ -691,6 +695,8 @@ def snakemake(
tibanna_config=tibanna_config,
assume_shared_fs=assume_shared_fs,
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
max_jobs_per_second=max_jobs_per_second,
max_status_checks_per_second=max_status_checks_per_second,
overwrite_groups=overwrite_groups,
@@ -777,6 +783,8 @@ def snakemake(
conda_create_envs_only=conda_create_envs_only,
assume_shared_fs=assume_shared_fs,
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
report=report,
report_stylesheet=report_stylesheet,
export_cwl=export_cwl,
@@ -2131,6 +2139,19 @@ def get_argument_parser(profile=None):
"'success' if the job was successful, 'failed' if the job failed and "
"'running' if the job still runs.",
)
group_cluster.add_argument(
"--cluster-cancel",
default=None,
help="Specify a command that can be used to cancel currently running jobs. "
"The command will be passed the ids of the jobs to cancel "
"(see also --cluster-cancel-nargs).",
)
group_cluster.add_argument(
"--cluster-cancel-nargs",
type=int,
default=1000,
help="Specify the maximal number of job ids to pass to the --cluster-cancel "
"command (defaults to 1000).",
)
group_cluster.add_argument(
"--drmaa-log-dir",
metavar="DIR",
@@ -2403,8 +2424,9 @@ def adjust_path(f):
args.cluster_config = adjust_path(args.cluster_config)
if args.cluster_sync:
args.cluster_sync = adjust_path(args.cluster_sync)
if args.cluster_status:
args.cluster_status = adjust_path(args.cluster_status)
for key in "cluster_status", "cluster_cancel":
if getattr(args, key):
setattr(args, key, adjust_path(getattr(args, key)))
if args.report_stylesheet:
args.report_stylesheet = adjust_path(args.report_stylesheet)

@@ -2876,6 +2898,8 @@ def open_browser():
default_remote_prefix=args.default_remote_prefix,
assume_shared_fs=not args.no_shared_fs,
cluster_status=args.cluster_status,
cluster_cancel=args.cluster_cancel,
cluster_cancel_nargs=args.cluster_cancel_nargs,
export_cwl=args.export_cwl,
show_failed_logs=args.show_failed_logs,
keep_incomplete=args.keep_incomplete,
45 changes: 42 additions & 3 deletions snakemake/executors/__init__.py
@@ -108,7 +108,7 @@ def get_set_resources_args(self):
"{}:{}={}".format(rule, name, value)
for rule, res in self.workflow.overwrite_resources.items()
for name, value in res.items()
),
)
)
return ""

@@ -923,6 +923,8 @@ def __init__(
cores,
submitcmd="qsub",
statuscmd=None,
cancelcmd=None,
cancelnargs=None,
cluster_config=None,
jobname="snakejob.{rulename}.{jobid}.sh",
printreason=False,
@@ -944,7 +946,12 @@ def __init__(
)

self.statuscmd = statuscmd
self.cancelcmd = cancelcmd
self.cancelnargs = cancelnargs
self.external_jobid = dict()
# We need to collect all external ids so we can properly cancel even if
# the status update queue is running.
self.all_ext_jobids = list()

super().__init__(
workflow,
@@ -976,8 +983,38 @@ def __init__(
)

def cancel(self):
logger.info("Will exit after finishing currently running jobs.")
self.shutdown()
def _chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]

if self.cancelcmd:  # We have --cluster-cancel
# Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None)
# then pass all job ids at once
jobids = list(self.all_ext_jobids)
chunks = list(_chunks(jobids, self.cancelnargs or len(jobids)))
# Go through the chunks and cancel the jobs, warn in case of failures.
failures = 0
for chunk in chunks:
try:
cancel_timeout = 2 # rather fail on timeout than miss canceling all
subprocess.check_call(
[self.cancelcmd] + chunk, shell=False, timeout=cancel_timeout
)
except subprocess.SubprocessError:
failures += 1
if failures:
logger.info(
(
"{} out of {} calls to --cluster-cancel failed. This is safe to "
"ignore in most cases."
).format(failures, len(chunks))
)
else:
logger.info(
"No --cluster-cancel given. Will exit after finishing currently running jobs."
)
self.shutdown()

def register_job(self, job):
# Do not register job here.
@@ -1008,6 +1045,7 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
)
submit_callback(job)
with self.lock:
self.all_ext_jobids.append(ext_jobid)
self.active_jobs.append(
GenericClusterJob(
job,
@@ -1063,6 +1101,7 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
submit_callback(job)

with self.lock:
self.all_ext_jobids.append(ext_jobid)
self.active_jobs.append(
GenericClusterJob(
job,
17 changes: 8 additions & 9 deletions snakemake/scheduler.py
@@ -4,6 +4,7 @@
__license__ = "MIT"

import os, signal, sys
import datetime
import threading
import operator
import time
@@ -63,6 +64,8 @@ def __init__(
cluster_status=None,
cluster_config=None,
cluster_sync=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
drmaa=None,
drmaa_log_dir=None,
kubernetes=None,
@@ -194,6 +197,8 @@ def __init__(
constructor = partial(
GenericClusterExecutor,
statuscmd=cluster_status,
cancelcmd=cluster_cancel,
cancelnargs=cluster_cancel_nargs,
max_status_checks_per_second=max_status_checks_per_second,
)

@@ -449,7 +454,7 @@ def schedule(self):
if user_kill or (not self.keepgoing and errors) or executor_error:
if user_kill == "graceful":
logger.info(
"Will exit after finishing " "currently running jobs."
"Will exit after finishing currently running jobs (scheduler)."
)

if executor_error:
@@ -581,10 +586,7 @@ def _free_resources(self, job):
value = self.calc_resource(name, value)
self.resources[name] += value

def _proceed(
self,
job,
):
def _proceed(self, job):
"""Do stuff after job is finished."""
with self._lock:
self._tofinish.append(job)
@@ -653,10 +655,7 @@ def job_selector_ilp(self, jobs):
# assert self.resources["_cores"] > 0
scheduled_jobs = {
job: pulp.LpVariable(
"job_{}".format(idx),
lowBound=0,
upBound=1,
cat=pulp.LpInteger,
"job_{}".format(idx), lowBound=0, upBound=1, cat=pulp.LpInteger
)
for idx, job in enumerate(jobs)
}
4 changes: 4 additions & 0 deletions snakemake/workflow.py
@@ -615,6 +615,8 @@ def execute(
conda_create_envs_only=False,
assume_shared_fs=True,
cluster_status=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
report=None,
report_stylesheet=None,
export_cwl=False,
@@ -982,6 +984,8 @@ def files(items):
touch=touch,
cluster=cluster,
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
cluster_config=cluster_config,
cluster_sync=cluster_sync,
jobname=jobname,
24 changes: 18 additions & 6 deletions tests/common.py
@@ -4,8 +4,11 @@
__license__ = "MIT"

import os
import signal
import sys
import shlex
import shutil
import time
from os.path import join
import tempfile
import hashlib
@@ -98,6 +101,7 @@ def run(
targets=None,
container_image=os.environ.get("CONTAINER_IMAGE", "snakemake/snakemake:latest"),
shellcmd=None,
sigint_after=None,
**params,
):
"""
@@ -156,16 +160,24 @@
raise ValueError("shellcmd does not start with snakemake")
shellcmd = "{} -m {}".format(sys.executable, shellcmd)
try:
subprocess.check_output(
shellcmd,
cwd=path if no_tmpdir else tmpdir,
shell=True,
)
success = True
if sigint_after is None:
subprocess.check_output(
shellcmd, cwd=path if no_tmpdir else tmpdir, shell=True
)
success = True
else:
with subprocess.Popen(
shlex.split(shellcmd), cwd=path if no_tmpdir else tmpdir
) as process:
time.sleep(sigint_after)
process.send_signal(signal.SIGINT)
time.sleep(2)
process.wait()  # populate returncode; it stays None until wait()/poll() is called
success = process.returncode == 0
except subprocess.CalledProcessError as e:
success = False
print(e.stderr, file=sys.stderr)
else:
assert sigint_after is None, "Cannot send SIGINT when calling directly"
success = snakemake(
snakefile=original_snakefile if no_tmpdir else snakefile,
cores=cores,
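
The new ``sigint_after`` parameter above is intended for tests that interrupt a running Snakemake and then check that ``--cluster-cancel`` was invoked. A test using it could look roughly like this (a hypothetical sketch; ``dpath`` and the exact command line are assumptions, not part of this diff):

def test_cluster_cancelscript():
    # Launch Snakemake with the fake sbatch/scancel scripts added below and
    # send SIGINT after 4 seconds; a real test would then assert on
    # scancel.txt, which scancel.sh appends to.
    run(
        dpath("test_cluster_cancelscript"),
        shellcmd=(
            "snakemake -j 2 --cluster=./sbatch --cluster-status=./status.sh "
            "--cluster-cancel=./scancel.sh -s Snakefile.nonstandard"
        ),
        sigint_after=4,
    )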
13 changes: 13 additions & 0 deletions tests/test_cluster_cancelscript/Snakefile.nonstandard
@@ -0,0 +1,13 @@
from snakemake import shell


rule all:
input: 'f.1', 'f.2'

rule one:
output: 'f.1'
shell: "sleep 120s; touch {output}"

rule two:
output: 'f.2'
shell: "sleep 120s; touch {output}"
@@ -0,0 +1,2 @@
cancel
cancel
10 changes: 10 additions & 0 deletions tests/test_cluster_cancelscript/sbatch
@@ -0,0 +1,10 @@
#!/bin/bash
set -x
echo --sbatch-- >> sbatch.log
echo `date` >> sbatch.log
tail -n1 $1 >> sbatch.log
cat $1 >> sbatch.log
# daemonize job script
nohup sh $1 0<&- &>/dev/null &
# print PID for job number
echo $!
20 changes: 20 additions & 0 deletions tests/test_cluster_cancelscript/scancel.sh
@@ -0,0 +1,20 @@
#!/bin/bash
set -x
echo `date`
echo cancel $* >>scancel.txt

list_descendants ()
{
local children=$(ps -o pid= --ppid "$1")

for pid in $children
do
list_descendants "$pid"
done

echo "$children"
}

for x in $*; do
kill $(list_descendants $x)
done
1 change: 1 addition & 0 deletions tests/test_cluster_cancelscript/status.sh
@@ -0,0 +1 @@
echo running
1 change: 1 addition & 0 deletions tests/test_cluster_cancelscript/test.in
@@ -0,0 +1 @@
testz0r