From 5b15cafdfff27a3319e837b171bfed6984a1b0ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Mon, 21 Feb 2022 10:29:08 +0100 Subject: [PATCH 1/4] feat: ability to flag output files as service, meaning that the generating job is expected to stay active until the file is not needed anymore by consuming jobs --- snakemake/dag.py | 52 ++++++++++++++++++++++-------------------- snakemake/io.py | 15 ++++++++++-- snakemake/jobs.py | 40 +++++++++++++++++++++++--------- snakemake/rules.py | 1 + snakemake/scheduler.py | 2 +- snakemake/workflow.py | 1 + 6 files changed, 72 insertions(+), 39 deletions(-) diff --git a/snakemake/dag.py b/snakemake/dag.py index 11b57e6b7..4061e066b 100755 --- a/snakemake/dag.py +++ b/snakemake/dag.py @@ -494,7 +494,7 @@ def check_and_touch_output( expanded_output, latency_wait=wait, force_stay_on_remote=force_stay_on_remote, - ignore_pipe=True, + ignore_pipe_or_service=True, ) except IOError as e: raise MissingOutputException( @@ -1215,14 +1215,14 @@ def postprocess(self, update_needrun=True): if update_needrun: self.update_needrun() self.update_priority() - self.handle_pipes() + self.handle_pipes_and_services() self.update_groups() self.update_ready() self.close_remote_objects() self.update_checkpoint_outputs() - def handle_pipes(self): - """Use pipes to determine job groups. Check if every pipe has exactly + def handle_pipes_and_services(self): + """Use pipes and services to determine job groups. 
Check if every pipe has exactly one consumer""" visited = set() @@ -1231,9 +1231,11 @@ def handle_pipes(self): if job.group is not None: candidate_groups.add(job.group) all_depending = set() - has_pipe = False + has_pipe_or_service = False for f in job.output: - if is_flagged(f, "pipe"): + is_pipe = is_flagged(f, "pipe") + is_service = is_flagged(f, "service") + if is_pipe or is_service: if job.is_run: raise WorkflowError( "Rule defines pipe output but " @@ -1244,11 +1246,11 @@ def handle_pipes(self): rule=job.rule, ) - has_pipe = True + has_pipe_or_service = True depending = [ j for j, files in self.depending[job].items() if f in files ] - if len(depending) > 1: + if is_pipe and len(depending) > 1: raise WorkflowError( "Output file {} is marked as pipe " "but more than one job depends on " @@ -1259,40 +1261,40 @@ def handle_pipes(self): ) elif len(depending) == 0: raise WorkflowError( - "Output file {} is marked as pipe " + "Output file {} is marked as pipe or service " "but it has no consumer. This is " "invalid because it can lead to " "a dead lock.".format(f), rule=job.rule, ) - depending = depending[0] - - if depending.is_run: - raise WorkflowError( - "Rule consumes pipe input but " - "uses a 'run' directive. This is " - "not possible for technical " - "reasons. Consider using 'shell' or " - "'script'.", - rule=job.rule, - ) + for dep in depending: + if dep.is_run: + raise WorkflowError( + "Rule consumes pipe or service input but " + "uses a 'run' directive. This is " + "not possible for technical " + "reasons. 
Consider using 'shell' or " + "'script'.", + rule=job.rule, + ) - all_depending.add(depending) - if depending.group is not None: - candidate_groups.add(depending.group) - if not has_pipe: + all_depending.add(dep) + if dep.group is not None: + candidate_groups.add(dep.group) + if not has_pipe_or_service: continue if len(candidate_groups) > 1: if all(isinstance(group, CandidateGroup) for group in candidate_groups): + # all candidates are newly created groups, merge them into one group = candidate_groups.pop() for g in candidate_groups: group.merge(g) else: raise WorkflowError( "An output file is marked as " - "pipe, but consuming jobs " + "pipe or service, but consuming jobs " "are part of conflicting " "groups.", rule=job.rule, diff --git a/snakemake/io.py b/snakemake/io.py index f898834e2..74c7685db 100755 --- a/snakemake/io.py +++ b/snakemake/io.py @@ -790,7 +790,7 @@ def __hash__(self): def wait_for_files( - files, latency_wait=3, force_stay_on_remote=False, ignore_pipe=False + files, latency_wait=3, force_stay_on_remote=False, ignore_pipe_or_service=False ): """Wait for given files to be present in the filesystem.""" files = list(files) @@ -807,7 +807,10 @@ def get_missing(): and (force_stay_on_remote or f.should_stay_on_remote) ) else os.path.exists(f) - if not (is_flagged(f, "pipe") and ignore_pipe) + if not ( + (is_flagged(f, "pipe") or is_flagged(f, "service")) + and ignore_pipe_or_service + ) else True ) ] @@ -1018,6 +1021,14 @@ def pipe(value): return flag(value, "pipe", not ON_WINDOWS) +def service(value): + if is_flagged(value, "protected"): + raise SyntaxError("Service outputs may not be protected.") + if is_flagged(value, "remote"): + raise SyntaxError("Service outputs may not be remote files.") + return flag(value, "service") + + def temporary(value): """An alias for temp.""" return temp(value) diff --git a/snakemake/jobs.py b/snakemake/jobs.py index 12f721d24..7093c43fc 100644 --- a/snakemake/jobs.py +++ b/snakemake/jobs.py @@ -38,9 +38,11 @@ def format_files(job, io, 
dynamicio): for f in io: if f in dynamicio: - yield "{} (dynamic)".format(f.format_dynamic()) + yield f"{f.format_dynamic()} (dynamic)" elif is_flagged(f, "pipe"): - yield "{} (pipe)".format(f) + yield f"{f} (pipe)" + elif is_flagged(f, "service"): + yield f"{f} (service)" elif is_flagged(f, "checkpoint_target"): yield TBDString() else: @@ -186,7 +188,6 @@ def __init__( self._attempt = self.dag.workflow.attempt # TODO get rid of these - self.pipe_output = set(f for f in self.output if is_flagged(f, "pipe")) self.dynamic_output, self.dynamic_input = set(), set() self.temp_output, self.protected_output = set(), set() self.touch_output = set() @@ -466,7 +467,11 @@ def is_run(self): @property def is_pipe(self): - return any([is_flagged(o, "pipe") for o in self.output]) + return any(is_flagged(o, "pipe") for o in self.output) + + @property + def is_service(self): + return any(is_flagged(o, "service") for o in self.output) @property def expanded_output(self): @@ -562,9 +567,9 @@ def output_mintime(self): def missing_output(self, requested): def handle_file(f): - # pipe output is always declared as missing + # pipe or service output is always declared as missing # (even if it might be present on disk for some reason) - if f in self.pipe_output or not f.exists: + if is_flagged(f, "pipe") or is_flagged(f, "service") or not f.exists: yield f if self.dynamic_output: @@ -1256,7 +1261,9 @@ def check_string_resource(res, value1, value2): self._resources = defaultdict(int) self._resources["_nodes"] = 1 - pipe_group = any([job.is_pipe for job in self.jobs]) + pipe_or_service_group = any( + [job.is_pipe or job.is_service for job in self.jobs] + ) # iterate over siblings that can be executed in parallel for siblings in self.toposorted: sibling_resources = defaultdict(int) @@ -1287,7 +1294,7 @@ def check_string_resource(res, value1, value2): for res, value in sibling_resources.items(): if isinstance(value, int): if res != "_nodes": - if self.dag.workflow.run_local or pipe_group: + if 
self.dag.workflow.run_local or pipe_or_service_group: # in case of local execution, this must be a # group of jobs that are connected with pipes # and have to run simultaneously @@ -1375,11 +1382,11 @@ def postprocess(self, error=False, **kwargs): if not error: for job in self.jobs: self.dag.handle_temp(job) - # remove all pipe outputs since all jobs of this group are done and the - # pipes are no longer needed + # remove all pipe and service outputs since all jobs of this group are done and the + # outputs are no longer needed for job in self.jobs: for f in job.output: - if is_flagged(f, "pipe"): + if is_flagged(f, "pipe") or is_flagged(f, "service"): f.remove() @property @@ -1510,6 +1517,7 @@ class Reason: "nooutput", "derived", "pipe", + "service", "target", "finished", ] @@ -1525,6 +1533,7 @@ def __init__(self): self.nooutput = False self.derived = True self.pipe = False + self.service = False @lazy_property def updated_input(self): @@ -1582,6 +1591,14 @@ def __str__(self): ", ".join(self.updated_input_run) ) ) + if self.pipe: + s.append( + "Output file is a pipe and has to be filled for consuming job." + ) + if self.service: + s.append( + "Job provides a service which has to be kept active until all consumers are finished." 
+ ) s = "; ".join(s) if self.finished: return "Finished (was: {s})".format(s=s) @@ -1596,5 +1613,6 @@ def __bool__(self): or self.noio or self.nooutput or self.pipe + or self.service ) return v and not self.finished diff --git a/snakemake/rules.py b/snakemake/rules.py index a125cee99..07a1dd709 100644 --- a/snakemake/rules.py +++ b/snakemake/rules.py @@ -531,6 +531,7 @@ def _set_inoutput_item(self, item, output=False, name=None): "directory", "touch", "pipe", + "service", ]: logger.warning( "The flag '{}' used in rule {} is only valid for outputs, not inputs.".format( diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py index c26f39e9a..138c78626 100644 --- a/snakemake/scheduler.py +++ b/snakemake/scheduler.py @@ -894,7 +894,7 @@ def calc_resource(self, name, value): raise WorkflowError( "Job needs {name}={res} but only {name}={gres} " "are available. This is likely because two " - "jobs are connected via a pipe and have to run " + "jobs are connected via a pipe or a service output and have to run " "simultaneously. Consider providing more " "resources (e.g. 
via --cores).".format(name=name, res=value, gres=gres) ) diff --git a/snakemake/workflow.py b/snakemake/workflow.py index 5834e458f..431c600e6 100644 --- a/snakemake/workflow.py +++ b/snakemake/workflow.py @@ -51,6 +51,7 @@ unpack, local, pipe, + service, repeat, report, multiext, From 11ae7c4637f1e42dd8da45bb72a6cd380ff5348d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Mon, 21 Feb 2022 10:53:09 +0100 Subject: [PATCH 2/4] handle service job termination --- snakemake/executors/__init__.py | 22 +++++++++++++++++----- snakemake/shell.py | 7 +++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py index 8fe0ebb96..97067cc72 100644 --- a/snakemake/executors/__init__.py +++ b/snakemake/executors/__init__.py @@ -540,16 +540,17 @@ def run_single_job(self, job): return future def run_group_job(self, job): - """Run a pipe group job. + """Run a pipe or service group job. This lets all items run simultaneously.""" - # we only have to consider pipe groups because in local running mode, + # we only have to consider pipe or service groups because in local running mode, # these are the only groups that will occur futures = [self.run_single_job(j) for j in job] + n_non_service = sum(1 for j in job if not j.is_service) while True: - k = 0 + n_finished = 0 for f in futures: if f.done(): ex = f.exception() @@ -561,8 +562,19 @@ def run_group_job(self, job): shell.kill(j.jobid) raise ex else: - k += 1 - if k == len(futures): + n_finished += 1 + if n_finished >= n_non_service: + # terminate all service jobs since all consumers are done + for j in job: + if j.is_service: + logger.info( + f"Terminating service job {j.jobid} since all consuming jobs are finished." + ) + shell.terminate(j.jobid) + logger.info( + f"Service job {j.jobid} has been successfully terminated." 
+ ) + return time.sleep(1) diff --git a/snakemake/shell.py b/snakemake/shell.py index ec0989151..d5d981216 100644 --- a/snakemake/shell.py +++ b/snakemake/shell.py @@ -114,6 +114,13 @@ def kill(cls, jobid): cls._processes[jobid].kill() del cls._processes[jobid] + @classmethod + def terminate(cls, jobid): + with cls._lock: + if jobid in cls._processes: + cls._processes[jobid].terminate() + del cls._processes[jobid] + @classmethod def cleanup(cls): with cls._lock: From a09ee5ceb483aa73a1da2dbac05f29914bade648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Tue, 22 Feb 2022 11:10:53 +0100 Subject: [PATCH 3/4] docs: added service rule/job documentation --- docs/snakefiles/rules.rst | 46 +++++++++++++++++++++++++++++++++++++++ test.socket | 1 + tests/tests.py | 5 +++++ 3 files changed, 52 insertions(+) create mode 120000 test.socket diff --git a/docs/snakefiles/rules.rst b/docs/snakefiles/rules.rst index 744aead28..8c936a745 100644 --- a/docs/snakefiles/rules.rst +++ b/docs/snakefiles/rules.rst @@ -1521,6 +1521,8 @@ This enables to almost arbitrarily partition the DAG, e.g. in order to safe netw For execution on the cloud using Google Life Science API and preemptible instances, we expect all rules in the group to be homogenously set as preemptible instances (e.g., with command-line option ``--preemptible-rules``), such that a preemptible VM is requested for the execution of the group job. +.. _snakefiles-piped-output: + Piped output ------------ @@ -1555,6 +1557,50 @@ It is possible to combine explicit group definition as above with pipe outputs. Thereby, pipe jobs can live within, or (automatically) extend existing groups. However, the two jobs connected by a pipe may not exist in conflicting groups. + +.. _snakefiles-service-rules: + +Service rules/jobs +------------------ + +From Snakemake 7.0 on, it is possible to define so-called service rules. 
+Jobs spawned from such rules provide at least one special output file that is marked as ``service``, which means that it is considered to provide a resource that shall be kept available until all consuming jobs are finished. +This can, for example, be the socket of a database, a shared memory device, a ramdisk, and so on. +It can even just be a dummy file, and access to the service might happen via a different channel (e.g. a local HTTP port). +Service jobs are expected not to exit after creating that resource, but instead wait until Snakemake terminates them (e.g. via SIGTERM on Unix-like systems). + +Consider the following example: + +.. code-block:: python + + rule the_service: + output: + service("foo.socket") + shell: + # here we simulate some kind of server process that provides data via a socket + "ln -s /dev/random {output}; sleep 10000" + + + rule consumer1: + input: + "foo.socket" + output: + "test.txt" + shell: + "head -n1 {input} > {output}" + + + rule consumer2: + input: + "foo.socket" + output: + "test2.txt" + shell: + "head -n1 {input} > {output}" + +Snakemake will schedule the service with all consumers to the same physical node (in the future we might provide further controls and other modes of operation). +Once all consumer jobs are finished, the service job will be terminated automatically by Snakemake, and the service output will be removed. + .. 
_snakefiles-paramspace: Parameter space exploration diff --git a/test.socket b/test.socket new file mode 120000 index 000000000..b9251ec6a --- /dev/null +++ b/test.socket @@ -0,0 +1 @@ +/dev/random \ No newline at end of file diff --git a/tests/tests.py b/tests/tests.py index bfa2503fa..7f39608fc 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1498,3 +1498,8 @@ def test_peppy(): def test_template_engine(): run(dpath("test_template_engine")) + + +@skip_on_windows +def test_service_jobs(): + run(dpath("test_service_jobs"), check_md5=False) From d84d52b38c96f4e05335d17abc9270d76b2a3119 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Tue, 22 Feb 2022 11:23:46 +0100 Subject: [PATCH 4/4] add missing testcase --- tests/test_service_jobs/Snakefile | 29 +++++++++++++++++++ .../expected-results/test.txt | 1 + .../expected-results/test2.txt | 1 + 3 files changed, 31 insertions(+) create mode 100644 tests/test_service_jobs/Snakefile create mode 100644 tests/test_service_jobs/expected-results/test.txt create mode 100644 tests/test_service_jobs/expected-results/test2.txt diff --git a/tests/test_service_jobs/Snakefile b/tests/test_service_jobs/Snakefile new file mode 100644 index 000000000..4b8b13fb6 --- /dev/null +++ b/tests/test_service_jobs/Snakefile @@ -0,0 +1,29 @@ +rule all: + input: + "test.txt", + "test2.txt", + + +rule a: + output: + service("foo.socket") + shell: + "ln -s /dev/random {output}; sleep 10000" + + +rule b: + input: + "foo.socket" + output: + "test.txt" + shell: + "head -n1 {input} > {output}" + + +rule c: + input: + "foo.socket" + output: + "test2.txt" + shell: + "head -n1 {input} > {output}" \ No newline at end of file diff --git a/tests/test_service_jobs/expected-results/test.txt b/tests/test_service_jobs/expected-results/test.txt new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/test_service_jobs/expected-results/test.txt @@ -0,0 +1 @@ +test diff --git 
a/tests/test_service_jobs/expected-results/test2.txt b/tests/test_service_jobs/expected-results/test2.txt new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/test_service_jobs/expected-results/test2.txt @@ -0,0 +1 @@ +test