Addressing points from code review
holtgrewe committed Feb 15, 2022
1 parent 8970ce2 commit dc9a871
Showing 12 changed files with 97 additions and 65 deletions.
10 changes: 8 additions & 2 deletions docs/executing/cluster.rst
@@ -48,8 +48,7 @@ and forward it to the cluster scheduler:
In order to avoid specifying ``runtime_min`` for each rule, you can make use of the ``--default-resources`` flag, see ``snakemake --help``.
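For example, default resources can be supplied as ``key=value`` pairs on the command line; a sketch (the resource name ``runtime_min`` and its value are placeholders for whatever your rules and scheduler expect):

.. code-block:: console

    $ snakemake --cluster qsub --jobs 100 --default-resources "runtime_min=120"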

If your cluster system supports `DRMAA <https://www.drmaa.org/>`_, Snakemake can make use of that to increase the control over jobs.
E.g. jobs can be cancelled upon pressing ``Ctrl+C``, which is not possible with the generic ``--cluster`` support.
If your cluster system supports `DRMAA <https://www.drmaa.org/>`_, Snakemake can make use of that to control jobs.
With DRMAA, no ``qsub`` command needs to be provided, but system specific arguments can still be given as a string, e.g.

.. code-block:: console
@@ -61,6 +60,13 @@ Else, the arguments will be interpreted as part of the normal Snakemake argument

Adapting to a specific cluster can involve quite a lot of options. It is therefore a good idea to set up :ref:`a profile <profiles>`.

.. note::
Are you using the SLURM job scheduler?

In this case, it will be more robust to use the ``--cluster``, ``--cluster-status``, and ``--cluster-cancel`` arguments than using DRMAA.
The reason is that the slurm-drmaa package is not maintained by the SLURM vendor SchedMD and is less well supported.
Effectively, you will run into timeouts in DRMAA calls sooner.
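A minimal SLURM invocation along these lines could look as follows (a sketch; ``status.sh`` and the exact ``sbatch`` arguments are placeholders for your own setup):

.. code-block:: console

    $ snakemake --jobs 100 \
          --cluster "sbatch --cpus-per-task=1 --parsable" \
          --cluster-status ./status.sh \
          --cluster-cancel scancel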

--------------
Job Properties
--------------
9 changes: 4 additions & 5 deletions docs/tutorial/additional_features.rst
@@ -272,13 +272,12 @@ To use this script call snakemake similar to below, where ``status.py`` is the s
$ snakemake all --jobs 100 --cluster "sbatch --cpus-per-task=1 --parsable" --cluster-status ./status.py
Using --cluster-cancel and --cluster-mcancel
::::::::::::::::::::::::::::::::::::::::::::
Using --cluster-cancel
::::::::::::::::::::::

When snakemake is terminated by pressing ``Ctrl-C``, it will cancel all currently running jobs when using ``--drmaa``.
You can get the same behaviour with ``--cluster`` by adding ``--cluster-cancel`` (respectively ``--cluster-mcancel``; the two arguments are mutually exclusive).
You can pass a command that terminates single jobs by their ID to ``--cluster-cancel``, or a command that terminates multiple jobs to ``--cluster-mcancel``.
When using SLURM, for example, you would use ``--cluster-mcancel "scancel"``.
You can get the same behaviour with ``--cluster`` by adding ``--cluster-cancel`` and passing a command to use for canceling jobs by their jobid (e.g., ``scancel`` for SLURM or ``qdel`` for SGE).
Most job schedulers can be passed multiple job ids, and you can use ``--cluster-cancel-nargs`` to limit the number of arguments (the default of 1000 is reasonable for most schedulers).
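For a scheduler whose command line cannot accept that many job ids at once, the batch size can be reduced; a sketch (the value 100 is an arbitrary example):

.. code-block:: console

    $ snakemake --jobs 100 --cluster "sbatch --parsable" \
          --cluster-cancel scancel --cluster-cancel-nargs 100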

Constraining wildcards
::::::::::::::::::::::
28 changes: 12 additions & 16 deletions snakemake/__init__.py
@@ -167,7 +167,7 @@ def snakemake(
assume_shared_fs=True,
cluster_status=None,
cluster_cancel=None,
cluster_mcancel=None,
cluster_cancel_nargs=None,
export_cwl=None,
show_failed_logs=False,
keep_incomplete=False,
@@ -298,8 +298,8 @@ def snakemake(
tibanna_config (list): Additional tibanna config e.g. --tibanna-config spot_instance=true subnet=<subnet_id> security group=<security_group_id>
assume_shared_fs (bool): assume that cluster nodes share a common filesystem (default true).
cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failure" or "running" when executing with a cluster jobid as a single argument.
cluster_cancel (str): command to cancel multiple job IDs (like slurm 'scancel') (default None)
cluster_mcancel (str): command to cancel multiple job IDs (like slurm 'scancel') (default None)
cluster_cancel (str): command to cancel multiple job IDs (like SLURM 'scancel') (default None)
cluster_cancel_nargs (int): maximal number of job ids to pass to cluster_cancel (default 1000)
export_cwl (str): Compile workflow to CWL and save to given file
log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has the following entries:
keep_incomplete (bool): keep incomplete output files of failed jobs
@@ -696,7 +696,7 @@ def snakemake(
assume_shared_fs=assume_shared_fs,
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_mcancel=cluster_mcancel,
cluster_cancel_nargs=cluster_cancel_nargs,
max_jobs_per_second=max_jobs_per_second,
max_status_checks_per_second=max_status_checks_per_second,
overwrite_groups=overwrite_groups,
@@ -784,7 +784,7 @@ def snakemake(
assume_shared_fs=assume_shared_fs,
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_mcancel=cluster_mcancel,
cluster_cancel_nargs=cluster_cancel_nargs,
report=report,
report_stylesheet=report_stylesheet,
export_cwl=export_cwl,
@@ -2146,11 +2146,11 @@ def get_argument_parser(profile=None):
"The command will be passed a single argument, the job id.",
)
group_cluster.add_argument(
"--cluster-mcancel",
default=None,
help="Specify a command that can cancel multiple jobs at once. The "
"command will be passed a number of arguments, each one an id of a job "
"to terminate/cancel.",
"--cluster-cancel-nargs",
type=int,
default=1000,
help="Specify maximal number of job ids to pass to --cluster-cancel "
"command, defaults to 1000.",
)
group_cluster.add_argument(
"--drmaa-log-dir",
@@ -2424,7 +2424,7 @@ def adjust_path(f):
args.cluster_config = adjust_path(args.cluster_config)
if args.cluster_sync:
args.cluster_sync = adjust_path(args.cluster_sync)
for arg in "cluster_status", "cluster_cancel", "cluster_mcancel":
for arg in "cluster_status", "cluster_cancel":  # cluster_cancel_nargs is an int, not a path
if getattr(args, arg):
setattr(args, arg, adjust_path(getattr(args, arg)))
if args.report_stylesheet:
@@ -2558,10 +2558,6 @@ def parse_cores(cores):
if not os.path.isabs(args.drmaa_log_dir):
args.drmaa_log_dir = os.path.abspath(os.path.expanduser(args.drmaa_log_dir))

if args.cluster_cancel and args.cluster_mcancel:
print("--cancel and --mcancel are mutually exclusive!", file=sys.stderr)
sys.exit(1)

if args.runtime_profile:
import yappi

@@ -2903,7 +2899,7 @@ def open_browser():
assume_shared_fs=not args.no_shared_fs,
cluster_status=args.cluster_status,
cluster_cancel=args.cluster_cancel,
cluster_mcancel=args.cluster_mcancel,
cluster_cancel_nargs=args.cluster_cancel_nargs,
export_cwl=args.export_cwl,
show_failed_logs=args.show_failed_logs,
keep_incomplete=args.keep_incomplete,
51 changes: 28 additions & 23 deletions snakemake/executors/__init__.py
@@ -924,7 +924,7 @@ def __init__(
submitcmd="qsub",
statuscmd=None,
cancelcmd=None,
mcancelcmd=None,
cancelnargs=None,
cluster_config=None,
jobname="snakejob.{rulename}.{jobid}.sh",
printreason=False,
@@ -947,8 +947,11 @@ def __init__(

self.statuscmd = statuscmd
self.cancelcmd = cancelcmd
self.mcancelcmd = mcancelcmd
self.cancelnargs = cancelnargs
self.external_jobid = dict()
# We need to collect all external ids so we can properly cancel even if
# the status update queue is running.
self.all_ext_jobids = list()

super().__init__(
workflow,
@@ -980,36 +983,36 @@ def __init__(
)

def cancel(self):
# maximum number of jobs to cancel at once
max_mcancel = 1000

def _chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]

if self.cancelcmd or self.mcancelcmd: # We have --cluster-[m]cancel
# Enumerate job IDs and create chunks. Set size to 1 if not mcancel, else
# limit to a reasonable size (few cancel commands but not too long command
# line).
jobids = [j.jobid for j in self.active_jobs]
if self.mcancelcmd:
cmd = self.mcancelcmd
chunks = list(_chunks(jobids, max_mcancel))
else:
cmd = self.cancelcmd
chunks = list(_chunks(jobids, 1))
# Go through the chunks and cancel the jobs.
if self.cancelcmd:  # We have --cluster-cancel
# Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None),
# then pass all job ids at once.
jobids = list(self.all_ext_jobids)
chunks = list(_chunks(jobids, self.cancelnargs or len(jobids)))
# Go through the chunks and cancel the jobs, warn in case of failures.
failures = 0
for chunk in chunks:
try:
subprocess.Popen([cmd] + chunk, shell=False)
except:
# This is common - logging a warning would probably confuse the user.
pass
cancel_timeout = 2 # rather fail on timeout than miss canceling all
subprocess.check_call(
[self.cancelcmd] + chunk, shell=False, timeout=cancel_timeout
)
except subprocess.SubprocessError:
failures += 1
if failures:
logger.info(
(
"{} out of {} calls to --cluster-cancel failed. This is safe to "
"ignore in most cases."
).format(failures, len(chunks))
)
else:
logger.info(
"No --cluster-cancel/--cluster-mcancel given. "
"Will exit after finishing currently running jobs."
"No --cluster-cancel given. Will exit after finishing currently running jobs."
)
self.shutdown()

@@ -1042,6 +1045,7 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
)
submit_callback(job)
with self.lock:
self.all_ext_jobids.append(ext_jobid)
self.active_jobs.append(
GenericClusterJob(
job,
@@ -1097,6 +1101,7 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
submit_callback(job)

with self.lock:
self.all_ext_jobids.append(ext_jobid)
self.active_jobs.append(
GenericClusterJob(
job,
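The chunked cancellation performed by `cancel()` in the executor above can be sketched as a standalone function (a sketch with hypothetical helper names; the real implementation lives in `GenericClusterExecutor.cancel` and uses the executor's collected external job ids):

```python
import subprocess


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i : i + n]


def cancel_jobs(cancelcmd, jobids, nargs=1000, timeout=2):
    """Call `cancelcmd` with at most `nargs` job ids per invocation.

    Returns (failed_calls, total_calls). Failures are counted rather than
    raised, mirroring the warn-and-continue behavior of the executor.
    """
    jobids = list(jobids)
    # nargs evaluating to false (0/None) means: pass all ids in one call.
    chunked = list(chunks(jobids, nargs or len(jobids)))
    failures = 0
    for chunk in chunked:
        try:
            # Rather fail on a short timeout than hang while canceling.
            subprocess.check_call([cancelcmd] + chunk, timeout=timeout)
        except subprocess.SubprocessError:
            failures += 1
    return failures, len(chunked)
```

With `nargs=2` and five job ids this issues three cancel calls; with `nargs=None` it issues a single call carrying all ids.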
5 changes: 3 additions & 2 deletions snakemake/scheduler.py
@@ -4,6 +4,7 @@
__license__ = "MIT"

import os, signal, sys
import datetime
import threading
import operator
import time
@@ -64,7 +65,7 @@ def __init__(
cluster_config=None,
cluster_sync=None,
cluster_cancel=None,
cluster_mcancel=None,
cluster_cancel_nargs=None,
drmaa=None,
drmaa_log_dir=None,
kubernetes=None,
@@ -197,7 +198,7 @@ def __init__(
GenericClusterExecutor,
statuscmd=cluster_status,
cancelcmd=cluster_cancel,
mcancelcmd=cluster_mcancel,
cancelnargs=cluster_cancel_nargs,
max_status_checks_per_second=max_status_checks_per_second,
)

4 changes: 2 additions & 2 deletions snakemake/workflow.py
@@ -616,7 +616,7 @@ def execute(
assume_shared_fs=True,
cluster_status=None,
cluster_cancel=None,
cluster_mcancel=None,
cluster_cancel_nargs=None,
report=None,
report_stylesheet=None,
export_cwl=False,
@@ -985,7 +985,7 @@ def files(items):
cluster=cluster,
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_mcancel=cluster_mcancel,
cluster_cancel_nargs=cluster_cancel_nargs,
cluster_config=cluster_config,
cluster_sync=cluster_sync,
jobname=jobname,
5 changes: 3 additions & 2 deletions tests/common.py
@@ -6,6 +6,7 @@
import os
import signal
import sys
import shlex
import shutil
import time
from os.path import join
@@ -166,11 +167,11 @@ def run(
success = True
else:
with subprocess.Popen(
shellcmd.split(" "), cwd=path if no_tmpdir else tmpdir
shlex.split(shellcmd), cwd=path if no_tmpdir else tmpdir
) as process:
time.sleep(sigint_after)
process.send_signal(signal.SIGINT)
process.wait(5)
time.sleep(2)
success = process.returncode == 0
except subprocess.CalledProcessError as e:
success = False
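The switch from `shellcmd.split(" ")` to `shlex.split(shellcmd)` in `tests/common.py` matters whenever the command line contains quoted arguments with embedded spaces, as the `--cluster` invocations in these tests do. A quick standard-library illustration:

```python
import shlex

cmd = 'snakemake -j 10 --cluster "sbatch --cpus-per-task=1 --parsable"'

# Naive whitespace splitting breaks the quoted --cluster value into pieces,
# starting with the fragment '"sbatch'.
naive = cmd.split(" ")

# shlex honors shell quoting and keeps it as a single argv element:
# 'sbatch --cpus-per-task=1 --parsable'
correct = shlex.split(cmd)
```

Only the `shlex` form reproduces the argv that a shell would hand to `subprocess.Popen`.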
4 changes: 0 additions & 4 deletions tests/test_cluster_cancelscript/mscancel.sh

This file was deleted.

1 change: 1 addition & 0 deletions tests/test_cluster_cancelscript/sbatch
@@ -1,4 +1,5 @@
#!/bin/bash
set -x
echo --sbatch-- >> sbatch.log
echo `date` >> sbatch.log
tail -n1 $1 >> sbatch.log
21 changes: 19 additions & 2 deletions tests/test_cluster_cancelscript/scancel.sh
@@ -1,3 +1,20 @@
#!/bin/bash
echo cancel >>scancel.txt
kill $* &>/dev/null
set -x
echo `date`
echo cancel $* >>scancel.txt

list_descendants ()
{
local children=$(ps -o pid= --ppid "$1")

for pid in $children
do
list_descendants "$pid"
done

echo "$children"
}

for x in $*; do
kill $(list_descendants $x)
done
1 change: 1 addition & 0 deletions tests/test_cluster_cancelscript/status.sh
@@ -0,0 +1 @@
echo running
23 changes: 16 additions & 7 deletions tests/tests.py
@@ -122,31 +122,40 @@ def test_cluster_cancelscript():
snakefile="Snakefile.nonstandard",
shellcmd=(
"snakemake -j 10 --cluster=./sbatch --cluster-cancel=./scancel.sh "
"-s Snakefile.nonstandard"
"--cluster-status=./status.sh -s Snakefile.nonstandard"
),
shouldfail=True,
cleanup=False,
sigint_after=2,
sigint_after=4,
)
scancel_txt = open("%s/scancel.txt" % outdir).read()
assert scancel_txt == "cancel\ncancel\n"
scancel_lines = scancel_txt.splitlines()
assert len(scancel_lines) == 1
assert scancel_lines[0].startswith("cancel")
assert len(scancel_lines[0].split(" ")) == 3


@skip_on_windows
def test_cluster_mcancelscript():
def test_cluster_cancelscript_nargs1():
outdir = run(
dpath("test_cluster_cancelscript"),
snakefile="Snakefile.nonstandard",
shellcmd=(
"snakemake -j 10 --cluster=./sbatch --cluster-mcancel=./mscancel.sh "
"snakemake -j 10 --cluster=./sbatch --cluster-cancel=./scancel.sh "
"--cluster-status=./status.sh --cluster-cancel-nargs=1 "
"-s Snakefile.nonstandard"
),
shouldfail=True,
cleanup=False,
sigint_after=2,
sigint_after=4,
)
scancel_txt = open("%s/scancel.txt" % outdir).read()
assert scancel_txt == "mcancel\n"
scancel_lines = scancel_txt.splitlines()
assert len(scancel_lines) == 2
assert scancel_lines[0].startswith("cancel")
assert scancel_lines[1].startswith("cancel")
assert len(scancel_lines[0].split(" ")) == 2
assert len(scancel_lines[1].split(" ")) == 2


def test15():
