diff --git a/docs/tutorial/additional_features.rst b/docs/tutorial/additional_features.rst index fe255428f..68adeb64d 100644 --- a/docs/tutorial/additional_features.rst +++ b/docs/tutorial/additional_features.rst @@ -279,6 +279,19 @@ When snakemake is terminated by pressing ``Ctrl-C``, it will cancel all currentl You can get the same behaviour with ``--cluster`` by adding ``--cluster-cancel`` and passing a command to use for canceling jobs by their jobid (e.g., ``scancel`` for SLURM or ``qdel`` for SGE). Most job schedulers can be passed multiple jobids and you can use ``--cluster-cancel-nargs`` to limit the number of arguments (default is 1000 which is reasonable for most schedulers). +Using --cluster-sidecar +::::::::::::::::::::::: + +In certain situations, it is necessary to not perform calls to cluster commands directly and instead have a "sidecar" process, e.g., providing a REST API. +One example is when using SLURM where regular calls to ``scontrol show job JOBID`` or ``sacct -j JOBID`` puts a high load on the controller. +Rather, it is better to use the ``squeue`` command with the ``-i/--iterate`` option. + +When using ``--cluster``, you can use ``--cluster-sidecar`` to pass in a command that starts a sidecar server. +The command should print one line to stdout and then block and accept connections. +The line will subsequently be available in the calls to ``--cluster``, ``--cluster-status``, and ``--cluster-cancel`` in the environment variable ``SNAKEMAKE_CLUSTER_SIDECAR_VARS``. +In the case of a REST server, you can use this to return the port that the server is listening on and credentials. +When the Snakemake process terminates, the sidecar process will be terminated as well. + Constraining wildcards :::::::::::::::::::::: diff --git a/snakemake/__init__.py b/snakemake/__init__.py index f6e8b0b0c..2c90a0e38 100644 --- a/snakemake/__init__.py +++ b/snakemake/__init__.py @@ -168,6 +168,7 @@ def snakemake( cluster_status=None, cluster_cancel=None, cluster_cancel_nargs=None, + cluster_sidecar=None, export_cwl=None, show_failed_logs=False, keep_incomplete=False, @@ -300,6 +301,7 @@ def snakemake( cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failure" or "running" when executing with a cluster jobid as a single argument. cluster_cancel (str): command to cancel multiple job IDs (like SLURM 'scancel') (default None) cluster_cancel_nargs (int): maximal number of job ids to pass to cluster_cancel (default 1000) + cluster_sidecar (str): command that starts a sidecar process, see cluster documentation (default None) export_cwl (str): Compile workflow to CWL and save to given file log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has to following entries: keep_incomplete (bool): keep incomplete output files of failed jobs @@ -697,6 +699,7 @@ def snakemake( cluster_status=cluster_status, cluster_cancel=cluster_cancel, cluster_cancel_nargs=cluster_cancel_nargs, + cluster_sidecar=cluster_sidecar, max_jobs_per_second=max_jobs_per_second, max_status_checks_per_second=max_status_checks_per_second, overwrite_groups=overwrite_groups, @@ -785,6 +788,7 @@ def snakemake( cluster_status=cluster_status, cluster_cancel=cluster_cancel, cluster_cancel_nargs=cluster_cancel_nargs, + cluster_sidecar=cluster_sidecar, report=report, report_stylesheet=report_stylesheet, export_cwl=export_cwl, @@ -2152,6 +2156,12 @@ def get_argument_parser(profile=None): help="Specify maximal number of job ids to pass to --cluster-cancel " "command, defaults to 1000.", ) + group_cluster.add_argument( + "--cluster-sidecar", + default=None, + help="Optional command to start a sidecar process during cluster " + "execution. Only active when --cluster is given as well.", + ) group_cluster.add_argument( "--drmaa-log-dir", metavar="DIR", @@ -2424,7 +2434,7 @@ def adjust_path(f): args.cluster_config = adjust_path(args.cluster_config) if args.cluster_sync: args.cluster_sync = adjust_path(args.cluster_sync) - for key in "cluster_status", "cluster_cancel": + for key in "cluster_status", "cluster_cancel", "cluster_sidecar": if getattr(args, key): setattr(args, key, adjust_path(getattr(args, key))) if args.report_stylesheet: @@ -2900,6 +2910,7 @@ def open_browser(): cluster_status=args.cluster_status, cluster_cancel=args.cluster_cancel, cluster_cancel_nargs=args.cluster_cancel_nargs, + cluster_sidecar=args.cluster_sidecar, export_cwl=args.export_cwl, show_failed_logs=args.show_failed_logs, keep_incomplete=args.keep_incomplete, diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py index 546b27dee..06f2394f1 100644 --- a/snakemake/executors/__init__.py +++ b/snakemake/executors/__init__.py @@ -18,6 +18,7 @@ import subprocess import signal import tempfile +import threading from functools import partial from itertools import chain from collections import namedtuple @@ -925,6 +926,7 @@ def __init__( statuscmd=None, cancelcmd=None, cancelnargs=None, + sidecarcmd=None, cluster_config=None, jobname="snakejob.{rulename}.{jobid}.sh", printreason=False, @@ -947,6 +949,7 @@ def __init__( self.statuscmd = statuscmd self.cancelcmd = cancelcmd + self.sidecarcmd = sidecarcmd self.cancelnargs = cancelnargs self.external_jobid = dict() # We need to collect all external ids so we can properly cancel even if @@ -970,6 +973,10 @@ def __init__( keepmetadata=keepmetadata, ) + self.sidecar_vars = None + if self.sidecarcmd: + self._launch_sidecar() + if statuscmd: self.exec_job += " && exit 0 || exit 1" elif assume_shared_fs: @@ -982,13 +989,53 @@ def __init__( "specify a cluster status command." ) + def _launch_sidecar(self): + def copy_stdout(executor, process): + """Run sidecar process and copy it's stdout to our stdout.""" + while process.poll() is None and executor.wait: + buf = process.stdout.readline() + if buf: + self.stdout.write(buf) + # one final time ... + buf = process.stdout.readline() + if buf: + self.stdout.write(buf) + + def wait(executor, process): + while executor.wait: + time.sleep(0.5) + process.terminate() + process.wait() + logger.info( + "Cluster sidecar process has terminated (retcode=%d)." + % process.returncode + ) + + logger.info("Launch sidecar process and read first output line.") + process = subprocess.Popen( + self.sidecarcmd, stdout=subprocess.PIPE, shell=False, encoding="utf-8" + ) + self.sidecar_vars = process.stdout.readline() + while self.sidecar_vars and self.sidecar_vars[-1] in "\n\r": + self.sidecar_vars = self.sidecar_vars[:-1] + logger.info("Done reading first output line.") + + thread_stdout = threading.Thread( + target=copy_stdout, name="sidecar_stdout", args=(self, process) + ) + thread_stdout.start() + thread_wait = threading.Thread( + target=wait, name="sidecar_stdout", args=(self, process) + ) + thread_wait.start() + def cancel(self): def _chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i : i + n] - if self.cancelcmd: # We have --cluster-[m]cancel + if self.cancelcmd: # We have --cluster-cancel # Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None) # then pass all job ids at once jobids = list(self.all_ext_jobids) @@ -998,8 +1045,14 @@ def _chunks(lst, n): for chunk in chunks: try: cancel_timeout = 2 # rather fail on timeout than miss canceling all + env = dict(os.environ) + if self.sidecar_vars: + env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars subprocess.check_call( - [self.cancelcmd] + chunk, shell=False, timeout=cancel_timeout + [self.cancelcmd] + chunk, + shell=False, + timeout=cancel_timeout, + env=env, ) except subprocess.SubprocessError: failures += 1 @@ -1070,12 +1123,16 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None): raise WorkflowError(str(e), rule=job.rule if not job.is_group() else None) try: + env = dict(os.environ) + if self.sidecar_vars: + env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars ext_jobid = ( subprocess.check_output( '{submitcmd} "{jobscript}"'.format( submitcmd=submitcmd, jobscript=jobscript ), shell=True, + env=env, ) .decode() .split("\n") @@ -1124,11 +1181,15 @@ def _wait_for_jobs(self): def job_status(job, valid_returns=["running", "success", "failed"]): try: # this command shall return "success", "failed" or "running" + env = dict(os.environ) + if self.sidecar_vars: + env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars ret = subprocess.check_output( "{statuscmd} {jobid}".format( jobid=job.jobid, statuscmd=self.statuscmd ), shell=True, + env=env, ).decode() except subprocess.CalledProcessError as e: if e.returncode < 0: diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py index 53e8a1b3b..c42fb8fa7 100644 --- a/snakemake/scheduler.py +++ b/snakemake/scheduler.py @@ -66,6 +66,7 @@ def __init__( cluster_sync=None, cluster_cancel=None, cluster_cancel_nargs=None, + cluster_sidecar=None, drmaa=None, drmaa_log_dir=None, kubernetes=None, @@ -199,6 +200,7 @@ def __init__( statuscmd=cluster_status, cancelcmd=cluster_cancel, cancelnargs=cluster_cancel_nargs, + sidecarcmd=cluster_sidecar, max_status_checks_per_second=max_status_checks_per_second, ) diff --git a/snakemake/workflow.py b/snakemake/workflow.py index ffbbcf7f1..fa7a645aa 100644 --- a/snakemake/workflow.py +++ b/snakemake/workflow.py @@ -617,6 +617,7 @@ def execute( cluster_status=None, cluster_cancel=None, cluster_cancel_nargs=None, + cluster_sidecar=None, report=None, report_stylesheet=None, export_cwl=False, @@ -986,6 +987,7 @@ def files(items): cluster_status=cluster_status, cluster_cancel=cluster_cancel, cluster_cancel_nargs=cluster_cancel_nargs, + cluster_sidecar=cluster_sidecar, cluster_config=cluster_config, cluster_sync=cluster_sync, jobname=jobname, diff --git a/tests/test_cluster_sidecar/Snakefile b/tests/test_cluster_sidecar/Snakefile new file mode 100644 index 000000000..fd46f32ee --- /dev/null +++ b/tests/test_cluster_sidecar/Snakefile @@ -0,0 +1,13 @@ +from snakemake import shell + + +rule all: + input: 'f.1', 'f.2' + +rule one: + output: 'f.1' + shell: "touch {output}" + +rule two: + output: 'f.2' + shell: "touch {output}" diff --git a/tests/test_cluster_sidecar/expected-results/f.1 b/tests/test_cluster_sidecar/expected-results/f.1 new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_cluster_sidecar/expected-results/f.2 b/tests/test_cluster_sidecar/expected-results/f.2 new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_cluster_sidecar/expected-results/launched.txt b/tests/test_cluster_sidecar/expected-results/launched.txt new file mode 100644 index 000000000..3381db118 --- /dev/null +++ b/tests/test_cluster_sidecar/expected-results/launched.txt @@ -0,0 +1,2 @@ +SNAKEMAKE_CLUSTER_SIDECAR_VARS=FIRST_LINE +SNAKEMAKE_CLUSTER_SIDECAR_VARS=FIRST_LINE diff --git a/tests/test_cluster_sidecar/expected-results/sidecar.txt b/tests/test_cluster_sidecar/expected-results/sidecar.txt new file mode 100644 index 000000000..bfeb0fe5e --- /dev/null +++ b/tests/test_cluster_sidecar/expected-results/sidecar.txt @@ -0,0 +1,2 @@ +sidecar started +sidecar stopped diff --git a/tests/test_cluster_sidecar/sbatch b/tests/test_cluster_sidecar/sbatch new file mode 100755 index 000000000..7995c09c4 --- /dev/null +++ b/tests/test_cluster_sidecar/sbatch @@ -0,0 +1,11 @@ +#!/bin/bash +set -x +echo "SNAKEMAKE_CLUSTER_SIDECAR_VARS=$SNAKEMAKE_CLUSTER_SIDECAR_VARS" >>launched.txt +echo --sbatch-- >> sbatch.log +echo `date` >> sbatch.log +tail -n1 $1 >> sbatch.log +cat $1 >> sbatch.log +# daemonize job script +nohup sh $1 0<&- &>/dev/null & +# print PID for job number +echo $! diff --git a/tests/test_cluster_sidecar/sidecar.sh b/tests/test_cluster_sidecar/sidecar.sh new file mode 100755 index 000000000..7849e66b2 --- /dev/null +++ b/tests/test_cluster_sidecar/sidecar.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +set -ex + +echo "FIRST_LINE" +echo "sidecar started" > sidecar.txt +sleep infinity & +pid=$! + +catch() +{ + set -x + kill -TERM $pid || true + echo "sidecar stopped" >> sidecar.txt + exit 0 +} + +trap catch SIGTERM SIGINT + +wait diff --git a/tests/test_cluster_sidecar/test.in b/tests/test_cluster_sidecar/test.in new file mode 100644 index 000000000..ce667834a --- /dev/null +++ b/tests/test_cluster_sidecar/test.in @@ -0,0 +1 @@ +testz0r diff --git a/tests/tests.py b/tests/tests.py index 9923b83e8..aae2fa80a 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -135,6 +135,14 @@ def test_cluster_cancelscript(): assert len(scancel_lines[0].split(" ")) == 3 +@skip_on_windows +def test_cluster_sidecar(): + run( + dpath("test_cluster_sidecar"), + shellcmd=("snakemake -j 10 --cluster=./sbatch --cluster-sidecar=./sidecar.sh"), + ) + + @skip_on_windows def test_cluster_cancelscript_nargs1(): outdir = run(