feat: cluster sidecar (#1397)
holtgrewe committed Feb 17, 2022
1 parent 0593de1 commit b992cd1
Showing 14 changed files with 149 additions and 3 deletions.
13 changes: 13 additions & 0 deletions docs/tutorial/additional_features.rst
@@ -279,6 +279,19 @@ When snakemake is terminated by pressing ``Ctrl-C``, it will cancel all currentl
You can get the same behaviour with ``--cluster`` by adding ``--cluster-cancel`` and passing a command to use for canceling jobs by their jobid (e.g., ``scancel`` for SLURM or ``qdel`` for SGE).
Most job schedulers can be passed multiple jobids and you can use ``--cluster-cancel-nargs`` to limit the number of arguments (default is 1000 which is reasonable for most schedulers).

Using --cluster-sidecar
:::::::::::::::::::::::

In certain situations, it is necessary not to call cluster commands directly but instead to use a "sidecar" process, e.g., one that provides a REST API.
One example is SLURM, where frequent calls to ``scontrol show job JOBID`` or ``sacct -j JOBID`` put a high load on the controller.
Instead, it is better to use the ``squeue`` command with the ``-i/--iterate`` option.

When using ``--cluster``, you can use ``--cluster-sidecar`` to pass a command that starts a sidecar process.
The command should print a single line to stdout and then block and accept connections.
That line is subsequently made available to the commands given to ``--cluster``, ``--cluster-status``, and ``--cluster-cancel`` through the environment variable ``SNAKEMAKE_CLUSTER_SIDECAR_VARS``.
In the case of a REST server, you can use this line to communicate the port that the server listens on, together with any credentials.
When the Snakemake process terminates, the sidecar process is terminated as well.

Constraining wildcards
::::::::::::::::::::::

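To make the protocol in the new documentation concrete, here is a minimal sketch of a sidecar script. ``PORT``, ``TOKEN``, and ``my_rest_server`` are placeholders for illustration, not part of Snakemake:

```bash
#!/bin/bash
# Minimal sidecar sketch (hypothetical server and variable format).
PORT=8123
TOKEN="s3cr3t"

# The first stdout line becomes SNAKEMAKE_CLUSTER_SIDECAR_VARS in the
# --cluster, --cluster-status, and --cluster-cancel commands.
echo "port=${PORT} token=${TOKEN}"

# Start the actual server, e.g., one that polls `squeue -i` and serves
# job states over HTTP, then block until Snakemake terminates us.
my_rest_server --port "$PORT" --token "$TOKEN" &
pid=$!
trap 'kill -TERM "$pid" 2>/dev/null; exit 0' TERM INT
wait
```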
13 changes: 12 additions & 1 deletion snakemake/__init__.py
@@ -168,6 +168,7 @@ def snakemake(
cluster_status=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
cluster_sidecar=None,
export_cwl=None,
show_failed_logs=False,
keep_incomplete=False,
@@ -300,6 +301,7 @@ def snakemake(
cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failed" or "running" when executed with a cluster jobid as a single argument.
cluster_cancel (str): command to cancel multiple job IDs (like SLURM 'scancel') (default None)
cluster_cancel_nargs (int): maximal number of job ids to pass to cluster_cancel (default 1000)
cluster_sidecar (str): command that starts a sidecar process, see cluster documentation (default None)
export_cwl (str): Compile workflow to CWL and save to given file
log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has the following entries:
keep_incomplete (bool): keep incomplete output files of failed jobs
@@ -697,6 +699,7 @@ def snakemake(
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
cluster_sidecar=cluster_sidecar,
max_jobs_per_second=max_jobs_per_second,
max_status_checks_per_second=max_status_checks_per_second,
overwrite_groups=overwrite_groups,
@@ -785,6 +788,7 @@ def snakemake(
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
cluster_sidecar=cluster_sidecar,
report=report,
report_stylesheet=report_stylesheet,
export_cwl=export_cwl,
@@ -2152,6 +2156,12 @@ def get_argument_parser(profile=None):
help="Specify maximal number of job ids to pass to --cluster-cancel "
"command, defaults to 1000.",
)
group_cluster.add_argument(
"--cluster-sidecar",
default=None,
help="Optional command to start a sidecar process during cluster "
"execution. Only active when --cluster is given as well.",
)
group_cluster.add_argument(
"--drmaa-log-dir",
metavar="DIR",
@@ -2424,7 +2434,7 @@ def adjust_path(f):
args.cluster_config = adjust_path(args.cluster_config)
if args.cluster_sync:
args.cluster_sync = adjust_path(args.cluster_sync)
-for key in "cluster_status", "cluster_cancel":
+for key in "cluster_status", "cluster_cancel", "cluster_sidecar":
if getattr(args, key):
setattr(args, key, adjust_path(getattr(args, key)))
if args.report_stylesheet:
@@ -2900,6 +2910,7 @@ def open_browser():
cluster_status=args.cluster_status,
cluster_cancel=args.cluster_cancel,
cluster_cancel_nargs=args.cluster_cancel_nargs,
cluster_sidecar=args.cluster_sidecar,
export_cwl=args.export_cwl,
show_failed_logs=args.show_failed_logs,
keep_incomplete=args.keep_incomplete,
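A usage sketch of the new flag together with the existing cluster options; ``submit.sh``, ``status.sh``, and ``sidecar.sh`` are hypothetical user-provided scripts, not shipped with Snakemake:

```bash
# Hypothetical invocation combining --cluster-sidecar with the other
# cluster flags added in recent releases.
snakemake -j 100 \
    --cluster ./submit.sh \
    --cluster-status ./status.sh \
    --cluster-cancel scancel \
    --cluster-sidecar ./sidecar.sh
```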
65 changes: 63 additions & 2 deletions snakemake/executors/__init__.py
@@ -18,6 +18,7 @@
import subprocess
import signal
import tempfile
import threading
from functools import partial
from itertools import chain
from collections import namedtuple
@@ -925,6 +926,7 @@ def __init__(
statuscmd=None,
cancelcmd=None,
cancelnargs=None,
sidecarcmd=None,
cluster_config=None,
jobname="snakejob.{rulename}.{jobid}.sh",
printreason=False,
@@ -947,6 +949,7 @@ def __init__(

self.statuscmd = statuscmd
self.cancelcmd = cancelcmd
self.sidecarcmd = sidecarcmd
self.cancelnargs = cancelnargs
self.external_jobid = dict()
# We need to collect all external ids so we can properly cancel even if
@@ -970,6 +973,10 @@ def __init__(
keepmetadata=keepmetadata,
)

self.sidecar_vars = None
if self.sidecarcmd:
self._launch_sidecar()

if statuscmd:
self.exec_job += " && exit 0 || exit 1"
elif assume_shared_fs:
@@ -982,13 +989,53 @@ def __init__(
"specify a cluster status command."
)

def _launch_sidecar(self):
def copy_stdout(executor, process):
"""Run sidecar process and copy it's stdout to our stdout."""
while process.poll() is None and executor.wait:
buf = process.stdout.readline()
if buf:
sys.stdout.write(buf)
# one final time ...
buf = process.stdout.readline()
if buf:
sys.stdout.write(buf)

def wait(executor, process):
while executor.wait:
time.sleep(0.5)
process.terminate()
process.wait()
logger.info(
"Cluster sidecar process has terminated (retcode=%d)."
% process.returncode
)

logger.info("Launch sidecar process and read first output line.")
process = subprocess.Popen(
self.sidecarcmd, stdout=subprocess.PIPE, shell=False, encoding="utf-8"
)
self.sidecar_vars = process.stdout.readline()
while self.sidecar_vars and self.sidecar_vars[-1] in "\n\r":
self.sidecar_vars = self.sidecar_vars[:-1]
logger.info("Done reading first output line.")

thread_stdout = threading.Thread(
target=copy_stdout, name="sidecar_stdout", args=(self, process)
)
thread_stdout.start()
thread_wait = threading.Thread(
target=wait, name="sidecar_wait", args=(self, process)
)
thread_wait.start()

def cancel(self):
def _chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]

-if self.cancelcmd:  # We have --cluster-[m]cancel
+if self.cancelcmd:  # We have --cluster-cancel
# Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None)
# then pass all job ids at once
jobids = list(self.all_ext_jobids)
@@ -998,8 +1045,14 @@ def _chunks(lst, n):
for chunk in chunks:
try:
cancel_timeout = 2 # rather fail on timeout than miss canceling all
env = dict(os.environ)
if self.sidecar_vars:
env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
subprocess.check_call(
-    [self.cancelcmd] + chunk, shell=False, timeout=cancel_timeout
+    [self.cancelcmd] + chunk,
+    shell=False,
+    timeout=cancel_timeout,
+    env=env,
)
except subprocess.SubprocessError:
failures += 1
@@ -1070,12 +1123,16 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
raise WorkflowError(str(e), rule=job.rule if not job.is_group() else None)

try:
env = dict(os.environ)
if self.sidecar_vars:
env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
ext_jobid = (
subprocess.check_output(
'{submitcmd} "{jobscript}"'.format(
submitcmd=submitcmd, jobscript=jobscript
),
shell=True,
env=env,
)
.decode()
.split("\n")
@@ -1124,11 +1181,15 @@ def _wait_for_jobs(self):
def job_status(job, valid_returns=["running", "success", "failed"]):
try:
# this command shall return "success", "failed" or "running"
env = dict(os.environ)
if self.sidecar_vars:
env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
ret = subprocess.check_output(
"{statuscmd} {jobid}".format(
jobid=job.jobid, statuscmd=self.statuscmd
),
shell=True,
env=env,
).decode()
except subprocess.CalledProcessError as e:
if e.returncode < 0:
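For illustration, a sketch of a ``--cluster-status`` script that consumes the environment variable injected above. The ``port``/``token`` format is an assumption carried over from the earlier sidecar sketch, and the URL scheme is hypothetical:

```bash
#!/bin/bash
# Hypothetical status script that queries the sidecar instead of the
# scheduler. SNAKEMAKE_CLUSTER_SIDECAR_VARS holds the sidecar's first
# stdout line, assumed here to look like "port=8123 token=s3cr3t".
eval "$SNAKEMAKE_CLUSTER_SIDECAR_VARS"  # sets $port and $token
jobid=$1
# Must print exactly one of: running, success, failed.
curl -fsS -H "Authorization: Bearer $token" \
    "http://localhost:$port/status/$jobid" || echo failed
```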
2 changes: 2 additions & 0 deletions snakemake/scheduler.py
@@ -66,6 +66,7 @@ def __init__(
cluster_sync=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
cluster_sidecar=None,
drmaa=None,
drmaa_log_dir=None,
kubernetes=None,
@@ -199,6 +200,7 @@ def __init__(
statuscmd=cluster_status,
cancelcmd=cluster_cancel,
cancelnargs=cluster_cancel_nargs,
sidecarcmd=cluster_sidecar,
max_status_checks_per_second=max_status_checks_per_second,
)

2 changes: 2 additions & 0 deletions snakemake/workflow.py
@@ -617,6 +617,7 @@ def execute(
cluster_status=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
cluster_sidecar=None,
report=None,
report_stylesheet=None,
export_cwl=False,
@@ -986,6 +987,7 @@ def files(items):
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
cluster_sidecar=cluster_sidecar,
cluster_config=cluster_config,
cluster_sync=cluster_sync,
jobname=jobname,
13 changes: 13 additions & 0 deletions tests/test_cluster_sidecar/Snakefile
@@ -0,0 +1,13 @@
from snakemake import shell


rule all:
input: 'f.1', 'f.2'

rule one:
output: 'f.1'
shell: "touch {output}"

rule two:
output: 'f.2'
shell: "touch {output}"
Empty file.
Empty file.
2 changes: 2 additions & 0 deletions tests/test_cluster_sidecar/expected-results/launched.txt
@@ -0,0 +1,2 @@
SNAKEMAKE_CLUSTER_SIDECAR_VARS=FIRST_LINE
SNAKEMAKE_CLUSTER_SIDECAR_VARS=FIRST_LINE
2 changes: 2 additions & 0 deletions tests/test_cluster_sidecar/expected-results/sidecar.txt
@@ -0,0 +1,2 @@
sidecar started
sidecar stopped
11 changes: 11 additions & 0 deletions tests/test_cluster_sidecar/sbatch
@@ -0,0 +1,11 @@
#!/bin/bash
set -x
echo "SNAKEMAKE_CLUSTER_SIDECAR_VARS=$SNAKEMAKE_CLUSTER_SIDECAR_VARS" >>launched.txt
echo --sbatch-- >> sbatch.log
echo `date` >> sbatch.log
tail -n1 $1 >> sbatch.log
cat $1 >> sbatch.log
# daemonize job script
nohup sh $1 0<&- &>/dev/null &
# print PID for job number
echo $!
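(The fake ``sbatch`` above records ``SNAKEMAKE_CLUSTER_SIDECAR_VARS`` on every submission, detaches the job script with ``nohup`` and a closed stdin, and prints the PID of the detached shell so Snakemake can use it as the external job id.)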
20 changes: 20 additions & 0 deletions tests/test_cluster_sidecar/sidecar.sh
@@ -0,0 +1,20 @@
#!/bin/bash

set -ex

echo "FIRST_LINE"
echo "sidecar started" > sidecar.txt
sleep infinity &
pid=$!

catch()
{
set -x
kill -TERM $pid || true
echo "sidecar stopped" >> sidecar.txt
exit 0
}

trap catch SIGTERM SIGINT

wait
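(The ``sleep infinity`` child plus ``wait`` keeps the script blocked while still letting the ``trap`` fire promptly: ``wait`` returns as soon as SIGTERM arrives, so the handler can log the shutdown to ``sidecar.txt`` before exiting.)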
1 change: 1 addition & 0 deletions tests/test_cluster_sidecar/test.in
@@ -0,0 +1 @@
testz0r
8 changes: 8 additions & 0 deletions tests/tests.py
@@ -135,6 +135,14 @@ def test_cluster_cancelscript():
assert len(scancel_lines[0].split(" ")) == 3


@skip_on_windows
def test_cluster_sidecar():
run(
dpath("test_cluster_sidecar"),
shellcmd=("snakemake -j 10 --cluster=./sbatch --cluster-sidecar=./sidecar.sh"),
)
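
To run just this test locally, something like the following should work, assuming the repository's usual pytest setup:

```bash
python -m pytest tests/tests.py -k test_cluster_sidecar
```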


@skip_on_windows
def test_cluster_cancelscript_nargs1():
outdir = run(
