diff --git a/docs/snakefiles/rules.rst b/docs/snakefiles/rules.rst
index 744aead28..8c936a745 100644
--- a/docs/snakefiles/rules.rst
+++ b/docs/snakefiles/rules.rst
@@ -1521,6 +1521,8 @@ This enables to almost arbitrarily partition the DAG, e.g. in order to safe netw
 For execution on the cloud using Google Life Science API and preemptible instances, we expect all rules in the group to be homogenously set as preemptible instances (e.g., with command-line option ``--preemptible-rules``), such that a preemptible VM is requested for the execution of the group job.
 
+.. _snakefiles-piped-output:
+
 Piped output
 ------------
 
@@ -1555,6 +1557,50 @@ It is possible to combine explicit group definition as above with pipe outputs.
 Thereby, pipe jobs can live within, or (automatically) extend existing groups.
 However, the two jobs connected by a pipe may not exist in conflicting groups.
 
+
+.. _snakefiles-service-rules:
+
+Service rules/jobs
+------------------
+
+From Snakemake 7.0 on, it is possible to define so-called service rules.
+Jobs spawned from such rules provide at least one special output file that is marked as ``service``, meaning that it provides a resource that shall be kept available until all consuming jobs are finished.
+This can, for example, be the socket of a database, a shared memory device, a ramdisk, and so on.
+It can even just be a dummy file, with access to the service happening via a different channel (e.g. a local HTTP port).
+Service jobs are expected not to exit after creating that resource, but instead to wait until Snakemake terminates them (e.g. via SIGTERM on Unix-like systems).
+
+Consider the following example:
+
+.. code-block:: python
+
+    rule the_service:
+        output:
+            service("foo.socket")
+        shell:
+            # here we simulate some kind of server process that provides data via a socket
+            "ln -s /dev/random {output}; sleep 10000"
+
+
+    rule consumer1:
+        input:
+            "foo.socket"
+        output:
+            "test.txt"
+        shell:
+            "head -n1 {input} > {output}"
+
+
+    rule consumer2:
+        input:
+            "foo.socket"
+        output:
+            "test2.txt"
+        shell:
+            "head -n1 {input} > {output}"
+
+Snakemake will schedule the service job and all of its consumers to the same physical node (in the future, we might provide further controls and other modes of operation).
+Once all consumer jobs are finished, the service job will be terminated automatically by Snakemake, and the service output will be removed.
+
 .. _snakefiles-paramspace:
 
 Parameter space exploration
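The docs above note that Snakemake stops a service job by sending SIGTERM once all consumers are done, so a service rule can install its own cleanup handler. The following variant is an illustrative sketch only (the rule name is hypothetical, not part of this PR); it assumes a POSIX shell, where ``trap`` catches the TERM signal and ``wait`` returns as soon as the signal arrives:

.. code-block:: python

    rule the_service_graceful:
        output:
            service("foo.socket")
        shell:
            # clean up when Snakemake sends SIGTERM; running sleep in the
            # background and waiting on it lets the trap fire immediately
            "trap 'rm -f {output}; exit 0' TERM; "
            "ln -s /dev/random {output}; "
            "sleep 10000 & wait"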
diff --git a/snakemake/dag.py b/snakemake/dag.py
index eecdd9023..48b1d6437 100755
--- a/snakemake/dag.py
+++ b/snakemake/dag.py
@@ -494,7 +494,7 @@ def check_and_touch_output(
                 expanded_output,
                 latency_wait=wait,
                 force_stay_on_remote=force_stay_on_remote,
-                ignore_pipe=True,
+                ignore_pipe_or_service=True,
             )
         except IOError as e:
             raise MissingOutputException(
@@ -1215,14 +1215,14 @@ def postprocess(self, update_needrun=True):
         if update_needrun:
             self.update_needrun()
         self.update_priority()
-        self.handle_pipes()
+        self.handle_pipes_and_services()
         self.update_groups()
         self.update_ready()
         self.close_remote_objects()
         self.update_checkpoint_outputs()
 
-    def handle_pipes(self):
-        """Use pipes to determine job groups. Check if every pipe has exactly
+    def handle_pipes_and_services(self):
+        """Use pipes and services to determine job groups. Check if every pipe has exactly
         one consumer"""
 
         visited = set()
         for job in self.needrun_jobs:
             candidate_groups = set()
             if job.group is not None:
                 candidate_groups.add(job.group)
             all_depending = set()
-            has_pipe = False
+            has_pipe_or_service = False
             for f in job.output:
-                if is_flagged(f, "pipe"):
+                is_pipe = is_flagged(f, "pipe")
+                is_service = is_flagged(f, "service")
+                if is_pipe or is_service:
                     if job.is_run:
                         raise WorkflowError(
                             "Rule defines pipe output but "
                             "uses a 'run' directive. This is "
                             "not possible for technical "
                             "reasons. Consider using 'shell' or "
                             "'script'.",
                             rule=job.rule,
                         )
 
-                    has_pipe = True
+                    has_pipe_or_service = True
                     depending = [
                         j for j, files in self.depending[job].items() if f in files
                     ]
-                    if len(depending) > 1:
+                    if is_pipe and len(depending) > 1:
                         raise WorkflowError(
                             "Output file {} is marked as pipe "
                             "but more than one job depends on "
                             "it. This is not possible, because "
                             "a pipe can only be consumed by a "
                             "single job".format(f),
                             rule=job.rule,
                         )
                     elif len(depending) == 0:
                         raise WorkflowError(
-                            "Output file {} is marked as pipe "
+                            "Output file {} is marked as pipe or service "
                             "but it has no consumer. This is "
                             "invalid because it can lead to "
                             "a dead lock.".format(f),
                             rule=job.rule,
                         )
 
-                    depending = depending[0]
-
-                    if depending.is_run:
-                        raise WorkflowError(
-                            "Rule consumes pipe input but "
-                            "uses a 'run' directive. This is "
-                            "not possible for technical "
-                            "reasons. Consider using 'shell' or "
-                            "'script'.",
-                            rule=job.rule,
-                        )
+                    for dep in depending:
+                        if dep.is_run:
+                            raise WorkflowError(
+                                "Rule consumes pipe or service input but "
+                                "uses a 'run' directive. This is "
+                                "not possible for technical "
+                                "reasons. Consider using 'shell' or "
+                                "'script'.",
+                                rule=job.rule,
+                            )
 
-                    all_depending.add(depending)
-                    if depending.group is not None:
-                        candidate_groups.add(depending.group)
-            if not has_pipe:
+                        all_depending.add(dep)
+                        if dep.group is not None:
+                            candidate_groups.add(dep.group)
+            if not has_pipe_or_service:
                 continue
 
             if len(candidate_groups) > 1:
                 if all(isinstance(group, CandidateGroup) for group in candidate_groups):
+                    # all candidates are newly created groups, merge them into one
                     group = candidate_groups.pop()
                     for g in candidate_groups:
                         group.merge(g)
                 else:
                     raise WorkflowError(
                         "An output file is marked as "
-                        "pipe, but consuming jobs "
+                        "pipe or service, but consuming jobs "
                         "are part of conflicting "
                         "groups.",
                         rule=job.rule,
                     )
diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py
index 8fe0ebb96..97067cc72 100644
--- a/snakemake/executors/__init__.py
+++ b/snakemake/executors/__init__.py
@@ -540,16 +540,17 @@ def run_single_job(self, job):
         return future
 
     def run_group_job(self, job):
-        """Run a pipe group job.
+        """Run a pipe or service group job.
 
         This lets all items run simultaneously."""
-        # we only have to consider pipe groups because in local running mode,
+        # we only have to consider pipe or service groups because in local running mode,
         # these are the only groups that will occur
 
         futures = [self.run_single_job(j) for j in job]
+        n_non_service = sum(1 for j in job if not j.is_service)
 
         while True:
-            k = 0
+            n_finished = 0
             for f in futures:
                 if f.done():
                     ex = f.exception()
@@ -561,8 +562,19 @@ def run_group_job(self, job):
                             shell.kill(j.jobid)
                         raise ex
                     else:
-                        k += 1
-            if k == len(futures):
+                        n_finished += 1
+            if n_finished >= n_non_service:
+                # terminate all service jobs since all consumers are done
+                for j in job:
+                    if j.is_service:
+                        logger.info(
+                            f"Terminating service job {j.jobid} since all consuming jobs are finished."
+                        )
+                        shell.terminate(j.jobid)
+                        logger.info(
+                            f"Service job {j.jobid} has been successfully terminated."
+                        )
                 return
             time.sleep(1)
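The loop above now waits until the number of finished futures reaches the number of non-service jobs and only then tears down the remaining service processes, preferring SIGTERM (``shell.terminate``) over SIGKILL (``shell.kill``). Stripped of Snakemake specifics, the same shutdown order can be sketched with plain ``subprocess`` (the variable names here are illustrative, not PR code):

.. code-block:: python

    import subprocess

    # stand-ins for a service job and a consumer job
    service = subprocess.Popen(["sleep", "10000"])
    consumer = subprocess.Popen(["sleep", "2"])

    consumer.wait()          # all consumers are finished
    service.terminate()      # SIGTERM: gives the service a chance to clean up
    try:
        service.wait(timeout=10)
    except subprocess.TimeoutExpired:
        service.kill()       # SIGKILL as a last resort, like shell.kill on errors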
diff --git a/snakemake/io.py b/snakemake/io.py
index 63e7250c1..03c3ae6d1 100755
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -790,7 +790,7 @@ def __hash__(self):
 
 def wait_for_files(
-    files, latency_wait=3, force_stay_on_remote=False, ignore_pipe=False
+    files, latency_wait=3, force_stay_on_remote=False, ignore_pipe_or_service=False
 ):
     """Wait for given files to be present in the filesystem."""
     files = list(files)
@@ -807,7 +807,10 @@ def get_missing():
                 and (force_stay_on_remote or f.should_stay_on_remote)
             )
             else os.path.exists(f)
-            if not (is_flagged(f, "pipe") and ignore_pipe)
+            if not (
+                (is_flagged(f, "pipe") or is_flagged(f, "service"))
+                and ignore_pipe_or_service
+            )
             else True
         )
     ]
@@ -1018,6 +1021,14 @@ def pipe(value):
     return flag(value, "pipe", not ON_WINDOWS)
 
 
+def service(value):
+    if is_flagged(value, "protected"):
+        raise SyntaxError("Service outputs may not be protected.")
+    if is_flagged(value, "remote"):
+        raise SyntaxError("Service outputs may not be remote files.")
+    return flag(value, "service")
+
+
 def temporary(value):
     """An alias for temp."""
     return temp(value)
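``service()`` reuses the flagging mechanism that ``pipe()`` and friends are built on: the path is wrapped so that ``is_flagged(f, "service")`` can be queried later in ``dag.py`` and ``jobs.py``. Conceptually, and much simplified compared to Snakemake's actual ``AnnotatedString`` implementation, the mechanism works like this:

.. code-block:: python

    class AnnotatedString(str):
        """A string carrying a dict of flags, e.g. {"service": True}."""

        def __new__(cls, value):
            obj = super().__new__(cls, value)
            obj.flags = {}
            return obj


    def flag(value, flag_type, flag_value=True):
        if not isinstance(value, AnnotatedString):
            value = AnnotatedString(value)
        value.flags[flag_type] = flag_value
        return value


    def is_flagged(value, flag_type):
        return isinstance(value, AnnotatedString) and bool(value.flags.get(flag_type))


    f = flag("foo.socket", "service")
    assert is_flagged(f, "service") and not is_flagged(f, "pipe")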
diff --git a/snakemake/jobs.py b/snakemake/jobs.py
index 12f721d24..7093c43fc 100644
--- a/snakemake/jobs.py
+++ b/snakemake/jobs.py
@@ -38,9 +38,11 @@ def format_files(job, io, dynamicio):
     for f in io:
         if f in dynamicio:
-            yield "{} (dynamic)".format(f.format_dynamic())
+            yield f"{f.format_dynamic()} (dynamic)"
         elif is_flagged(f, "pipe"):
-            yield "{} (pipe)".format(f)
+            yield f"{f} (pipe)"
+        elif is_flagged(f, "service"):
+            yield f"{f} (service)"
         elif is_flagged(f, "checkpoint_target"):
             yield TBDString()
         else:
@@ -186,7 +188,6 @@ def __init__(
         self._attempt = self.dag.workflow.attempt
 
         # TODO get rid of these
-        self.pipe_output = set(f for f in self.output if is_flagged(f, "pipe"))
         self.dynamic_output, self.dynamic_input = set(), set()
         self.temp_output, self.protected_output = set(), set()
         self.touch_output = set()
@@ -466,7 +467,11 @@ def is_run(self):
 
     @property
     def is_pipe(self):
-        return any([is_flagged(o, "pipe") for o in self.output])
+        return any(is_flagged(o, "pipe") for o in self.output)
+
+    @property
+    def is_service(self):
+        return any(is_flagged(o, "service") for o in self.output)
 
     @property
     def expanded_output(self):
@@ -562,9 +567,9 @@ def output_mintime(self):
 
     def missing_output(self, requested):
         def handle_file(f):
-            # pipe output is always declared as missing
+            # pipe or service output is always declared as missing
             # (even if it might be present on disk for some reason)
-            if f in self.pipe_output or not f.exists:
+            if is_flagged(f, "pipe") or is_flagged(f, "service") or not f.exists:
                 yield f
 
         if self.dynamic_output:
@@ -1256,7 +1261,9 @@ def check_string_resource(res, value1, value2):
 
         self._resources = defaultdict(int)
         self._resources["_nodes"] = 1
-        pipe_group = any([job.is_pipe for job in self.jobs])
+        pipe_or_service_group = any(
+            [job.is_pipe or job.is_service for job in self.jobs]
+        )
         # iterate over siblings that can be executed in parallel
         for siblings in self.toposorted:
             sibling_resources = defaultdict(int)
@@ -1287,7 +1294,7 @@ def check_string_resource(res, value1, value2):
             for res, value in sibling_resources.items():
                 if isinstance(value, int):
                     if res != "_nodes":
-                        if self.dag.workflow.run_local or pipe_group:
+                        if self.dag.workflow.run_local or pipe_or_service_group:
                             # in case of local execution, this must be a
                             # group of jobs that are connected with pipes
                             # and have to run simultaneously
@@ -1375,11 +1382,11 @@ def postprocess(self, error=False, **kwargs):
         if not error:
             for job in self.jobs:
                 self.dag.handle_temp(job)
-            # remove all pipe outputs since all jobs of this group are done and the
-            # pipes are no longer needed
+            # remove all pipe and service outputs since all jobs of this group are done and the
+            # outputs are no longer needed
             for job in self.jobs:
                 for f in job.output:
-                    if is_flagged(f, "pipe"):
+                    if is_flagged(f, "pipe") or is_flagged(f, "service"):
                         f.remove()
 
     @property
@@ -1510,6 +1517,7 @@ class Reason:
         "nooutput",
         "derived",
         "pipe",
+        "service",
         "target",
         "finished",
     ]
@@ -1525,6 +1533,7 @@ def __init__(self):
         self.nooutput = False
         self.derived = True
         self.pipe = False
+        self.service = False
 
     @lazy_property
     def updated_input(self):
@@ -1582,6 +1591,14 @@ def __str__(self):
                     ", ".join(self.updated_input_run)
                 )
             )
+        if self.pipe:
+            s.append(
+                "Output file is a pipe and has to be filled for a consuming job."
+            )
+        if self.service:
+            s.append(
+                "Job provides a service which has to be kept active until all consumers are finished."
+            )
         s = "; ".join(s)
         if self.finished:
             return "Finished (was: {s})".format(s=s)
@@ -1596,5 +1613,6 @@ def __bool__(self):
             or self.noio
             or self.nooutput
             or self.pipe
+            or self.service
         )
         return v and not self.finished
diff --git a/snakemake/rules.py b/snakemake/rules.py
index 119b5b954..d976ba8d6 100644
--- a/snakemake/rules.py
+++ b/snakemake/rules.py
@@ -531,6 +531,7 @@ def _set_inoutput_item(self, item, output=False, name=None):
                         "directory",
                         "touch",
                         "pipe",
+                        "service",
                     ]:
                         logger.warning(
                             "The flag '{}' used in rule {} is only valid for outputs, not inputs.".format(
diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py
index 1b3ac8669..64c4d6868 100644
--- a/snakemake/scheduler.py
+++ b/snakemake/scheduler.py
@@ -911,7 +911,7 @@ def calc_resource(self, name, value):
             raise WorkflowError(
                 "Job needs {name}={res} but only {name}={gres} "
                 "are available. This is likely because two "
-                "jobs are connected via a pipe and have to run "
+                "jobs are connected via a pipe or a service output and have to run "
                 "simultaneously. Consider providing more "
                 "resources (e.g. via --cores).".format(name=name, res=value, gres=gres)
             )
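The ``calc_resource`` error above is exactly what a pipe or service group can trigger: all jobs in such a group run at the same time, so integer resources like cores add up instead of being bounded by the largest toposort level. A simplified model of the arithmetic in ``GroupJob.resources`` (illustrative only; the real method also handles string resources and ``_nodes``):

.. code-block:: python

    from collections import defaultdict


    def group_resources(levels, simultaneous):
        """levels: toposorted lists of per-job resource dicts."""
        total = defaultdict(int)
        for level in levels:
            level_sum = defaultdict(int)
            for job_res in level:
                for name, value in job_res.items():
                    level_sum[name] += value
            for name, value in level_sum.items():
                if simultaneous:
                    # pipe/service groups (and local runs) execute everything
                    # at once, so resource needs accumulate across levels
                    total[name] += value
                else:
                    total[name] = max(total[name], value)
        return dict(total)


    # a service plus two consumers, one core each: 3 cores needed at once
    assert group_resources(
        [[{"_cores": 1}], [{"_cores": 1}, {"_cores": 1}]], simultaneous=True
    ) == {"_cores": 3}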
diff --git a/snakemake/shell.py b/snakemake/shell.py
index ec0989151..d5d981216 100644
--- a/snakemake/shell.py
+++ b/snakemake/shell.py
@@ -114,6 +114,13 @@ def kill(cls, jobid):
                 cls._processes[jobid].kill()
                 del cls._processes[jobid]
 
+    @classmethod
+    def terminate(cls, jobid):
+        with cls._lock:
+            if jobid in cls._processes:
+                cls._processes[jobid].terminate()
+                del cls._processes[jobid]
+
     @classmethod
     def cleanup(cls):
         with cls._lock:
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index 5834e458f..431c600e6 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -51,6 +51,7 @@
     unpack,
     local,
     pipe,
+    service,
     repeat,
     report,
     multiext,
diff --git a/test.socket b/test.socket
new file mode 120000
index 000000000..b9251ec6a
--- /dev/null
+++ b/test.socket
@@ -0,0 +1 @@
+/dev/random
\ No newline at end of file
diff --git a/tests/test_service_jobs/Snakefile b/tests/test_service_jobs/Snakefile
new file mode 100644
index 000000000..4b8b13fb6
--- /dev/null
+++ b/tests/test_service_jobs/Snakefile
@@ -0,0 +1,29 @@
+rule all:
+    input:
+        "test.txt",
+        "test2.txt",
+
+
+rule a:
+    output:
+        service("foo.socket")
+    shell:
+        "ln -s /dev/random {output}; sleep 10000"
+
+
+rule b:
+    input:
+        "foo.socket"
+    output:
+        "test.txt"
+    shell:
+        "head -n1 {input} > {output}"
+
+
+rule c:
+    input:
+        "foo.socket"
+    output:
+        "test2.txt"
+    shell:
+        "head -n1 {input} > {output}"
\ No newline at end of file
diff --git a/tests/test_service_jobs/expected-results/test.txt b/tests/test_service_jobs/expected-results/test.txt
new file mode 100644
index 000000000..9daeafb98
--- /dev/null
+++ b/tests/test_service_jobs/expected-results/test.txt
@@ -0,0 +1 @@
+test
diff --git a/tests/test_service_jobs/expected-results/test2.txt b/tests/test_service_jobs/expected-results/test2.txt
new file mode 100644
index 000000000..9daeafb98
--- /dev/null
+++ b/tests/test_service_jobs/expected-results/test2.txt
@@ -0,0 +1 @@
+test
diff --git a/tests/tests.py b/tests/tests.py
index 547889599..8bf56b87a 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -1503,3 +1503,8 @@ def test_peppy():
 
 def test_template_engine():
     run(dpath("test_template_engine"))
+
+
+@skip_on_windows
+def test_service_jobs():
+    run(dpath("test_service_jobs"), check_md5=False)
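The new test passes ``check_md5=False`` because the consumers read from ``/dev/random`` through the symlinked socket, so the produced files never match the committed expected results byte for byte. To try the workflow directly, at least three cores are needed in this example, since the service and both consumers form one group and run at the same time. A hypothetical invocation via the Python API (the ``snakemake()`` entry point and its ``cores`` argument exist in Snakemake 6/7; the path and working directory handling are assumptions):

.. code-block:: python

    from snakemake import snakemake

    # 3 cores let the service job and both consumers execute simultaneously
    assert snakemake("tests/test_service_jobs/Snakefile", cores=3)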