Skip to content

Commit

Permalink
feat: support for group local jobs by enabling optional groupid consi…
Browse files Browse the repository at this point in the history
…deration in input functions (#1418)

* feat: define grouplocal jobs [WIP]

* todos and docs

* Implement grouplocal jobs via input functions

* fixes, docs, test cases

* windows handling
  • Loading branch information
johanneskoester committed Feb 23, 2022
1 parent a471adb commit 5d45493
Show file tree
Hide file tree
Showing 21 changed files with 321 additions and 68 deletions.
164 changes: 107 additions & 57 deletions docs/snakefiles/rules.rst
Expand Up @@ -1119,8 +1119,8 @@ The following shows an example job submission wrapper:
.. _snakefiles-input_functions:

Functions as Input Files
------------------------
Input functions
---------------

Instead of specifying strings or lists of strings as input files, snakemake can also make use of functions that return single **or** lists of input files:

Expand All @@ -1130,15 +1130,18 @@ Instead of specifying strings or lists of strings as input files, snakemake can
return [... a list of input files depending on given wildcards ...]
rule:
input: myfunc
output: "someoutput.{somewildcard}.txt"
shell: "..."
input:
myfunc
output:
"someoutput.{somewildcard}.txt"
shell:
"..."
The function has to accept a single argument that will be the wildcards object generated from the application of the rule to create some requested output files.
Note that you can also use `lambda expressions <https://docs.python.org/3/tutorial/controlflow.html#lambda-expressions>`_ instead of full function definitions.
By this, rules can have entirely different input files (both in form and number) depending on the inferred wildcards. E.g. you can assign input files that appear in entirely different parts of your filesystem based on some wildcard value and a dictionary that maps the wildcard value to file paths.

Note that the function will be executed when the rule is evaluated and before the workflow actually starts to execute. Further note that using a function as input overrides the default mechanism of replacing wildcards with their values inferred from the output files. You have to take care of that yourself with the given wildcards object.
In additon to a single wildcards argument, input functions can optionally take a ``groupid`` (with exactly that name) as second argument, see :ref:`snakefiles_group-local` for details.

Finally, when implementing the input function, it is best practice to make sure that it can properly handle all possible wildcard values your rule can have.
In particular, input files should not be combined with very general rules that can be applied to create almost any file: Snakemake will try to apply the rule, and will report the exceptions of your input function as errors.
Expand All @@ -1159,9 +1162,12 @@ This can be done by having them return ``dict()`` objects with the names as the
return {'foo': '{wildcards.token}.txt'.format(wildcards=wildcards)}
rule:
input: unpack(myfunc)
output: "someoutput.{token}.txt"
shell: "..."
input:
unpack(myfunc)
output:
"someoutput.{token}.txt"
shell:
"..."
Note that ``unpack()`` is only necessary for input functions returning ``dict``.
While it also works for ``list``, remember that lists (and nested lists) of strings are automatically flattened.
Expand All @@ -1184,54 +1190,10 @@ These restrictions do not apply when using ``unpack()``.
input:
*myfunc1(),
**myfunc2(),
output: "..."
shell: "..."
.. _snakefiles-version_tracking:

Version Tracking
----------------

Rules can specify a version that is tracked by Snakemake together with the output files. When the version changes snakemake informs you when using the flag ``--summary`` or ``--list-version-changes``.
The version can be specified by the version directive, which takes a string:

.. code-block:: python
rule:
input: ...
output: ...
version: "1.0"
shell: ...
The version can of course also be filled with the output of a shell command, e.g.:

.. code-block:: python
SOMECOMMAND_VERSION = subprocess.check_output("somecommand --version", shell=True)
rule:
version: SOMECOMMAND_VERSION
Alternatively, you might want to use file modification times in case of local scripts:

.. code-block:: python
SOMECOMMAND_VERSION = str(os.path.getmtime("path/to/somescript"))
rule:
version: SOMECOMMAND_VERSION
A re-run can be automated by invoking Snakemake as follows:

.. code-block:: console
$ snakemake -R `snakemake --list-version-changes`
With the availability of the ``conda`` directive (see :ref:`integrated_package_management`)
the ``version`` directive has become **obsolete** in favor of defining isolated
software environments that can be automatically deployed via the conda package
manager.

output:
"..."
shell:
"..."
.. _snakefiles-code_tracking:

Expand Down Expand Up @@ -1521,6 +1483,51 @@ This enables to almost arbitrarily partition the DAG, e.g. in order to safe netw

For execution on the cloud using Google Life Science API and preemptible instances, we expect all rules in the group to be homogenously set as preemptible instances (e.g., with command-line option ``--preemptible-rules``), such that a preemptible VM is requested for the execution of the group job.

.. _snakefiles_group-local:

Group-local jobs
~~~~~~~~~~~~~~~~

From Snakemake 7.0 on, it is further possible to ensure that jobs from a certain rule are executed separately within each :ref:`job group <job_grouping>`.
For this purpose we use :ref:`input functions <snakefiles-input_functions>`, which, in addition to the ``wildcards`` argument can expect a ``groupid`` argument.
In such a case, Snakemake passes the ID of the corresponding group job to the input function.
Consider the following example

.. code-block:: python
rule all:
input:
expand("bar{i}.txt", i=range(3))
rule grouplocal:
output:
"foo.{groupid}.txt"
group:
"foo"
shell:
"echo test > {output}"
def get_input(wildcards, groupid):
return f"foo.{groupid}.txt"
rule consumer:
input:
get_input
output:
"bar{i}.txt"
group:
"foo"
shell:
"cp {input} {output}"
Here, the value of ``groupid`` that is passed by Snakemake to the input function is a `UUID <https://en.wikipedia.org/wiki/Universally_unique_identifier>`_ that uniquely identifies the group job in which each instance of the rule ``consumer`` is contained.
In the input function ``get_input`` we use this ID to request the desired input file from the rule ``grouplocal``.
Since the value of the corresponding wildcard ``groupid`` is now always a group specific unique ID, it is ensured that the rule ``grouplocal`` will run for every group job spawned from the group ``foo`` (remember that group jobs by default only span one connected component, and that this can be configured via the command line, see :ref:`job_grouping`).
Of course, above example would also work if the groups are not specified via the rule definition but entirely via the :ref:`command line <job_grouping>`.

.. _snakefiles-piped-output:

Piped output
Expand Down Expand Up @@ -1601,6 +1608,49 @@ Consider the following example:
Snakemake will schedule the service with all consumers to the same physical node (in the future we might provide further controls and other modes of operation).
Once all consumer jobs are finished, the service job will be terminated automatically by Snakemake, and the service output will be removed.

Group-local service jobs
~~~~~~~~~~~~~~~~~~~~~~~~

Since Snakemake supports arbitrary partitioning of the DAG into so-called :ref:`job groups <job-grouping>`, one should consider what this implies for service jobs when running a workflow in a cluster of cloud context:
since each group job spans at least one connected component (see :ref:`job groups <job-grouping>` and `the Snakemake paper <https://doi.org/10.12688/f1000research.29032.2>`), this means that the service job will automatically connect all consumers into one big group.
This can be undesired, because depending on the number of consumers that group job can become too big for efficient execution on the underlying architecture.
In case of local execution, this is not a problem because here DAG partitioning has no effect.

However, to make a workflow portable across different backends, this behavior should always be considered.
In order to circumvent it, it is possible to model service jobs as group-local, i.e. ensuring that each group job gets its own instance of the service rule.
This works by combining the service job pattern from above with the :ref:`group-local pattern <snakefiles_group-local>` as follows:

.. code-block:: python
rule the_service:
output:
service("foo.{groupid}.socket")
shell:
# here we simulate some kind of server process that provides data via a socket
"ln -s /dev/random {output}; sleep 10000"
def get_socket(wildcards, groupid):
return f"foo.{groupid}.socket"
rule consumer1:
input:
get_socket
output:
"test.txt"
shell:
"head -n1 {input} > {output}"
rule consumer2:
input:
get_socket
output:
"test2.txt"
shell:
"head -n1 {input} > {output}"
.. _snakefiles-paramspace:

Parameter space exploration
Expand Down
10 changes: 10 additions & 0 deletions snakemake/__init__.py
Expand Up @@ -183,6 +183,7 @@ def snakemake(
conda_not_block_search_path_envvars=False,
scheduler_solver_path=None,
conda_base_path=None,
local_groupid="local",
):
"""Run snakemake on a given snakefile.
Expand Down Expand Up @@ -313,6 +314,7 @@ def snakemake(
conda_not_block_search_path_envvars (bool): Do not block search path envvars (R_LIBS, PYTHONPATH, ...) when using conda environments.
scheduler_solver_path (str): Path to Snakemake environment (this can be used to e.g. overwrite the search path for the ILP solver used during scheduling).
conda_base_path (str): Path to conda base environment (this can be used to overwrite the search path for conda, mamba, and activate).
local_groupid (str): Local groupid to use as a placeholder for groupid-referrring input functions of local jobs (internal use only, default: local).
log_handler (list): redirect snakemake output to this list of custom log handlers, each a function that takes a log message dictionary (see below) as its only argument (default []). The log message dictionary for the log handler has to following entries:
:level:
Expand Down Expand Up @@ -593,6 +595,7 @@ def snakemake(
conda_base_path=conda_base_path,
check_envvars=not lint, # for linting, we do not need to check whether requested envvars exist
all_temp=all_temp,
local_groupid=local_groupid,
)
success = True

Expand Down Expand Up @@ -706,6 +709,7 @@ def snakemake(
group_components=group_components,
max_inventory_wait_time=max_inventory_wait_time,
conda_not_block_search_path_envvars=conda_not_block_search_path_envvars,
local_groupid=local_groupid,
)
success = workflow.execute(
targets=targets,
Expand Down Expand Up @@ -1906,6 +1910,11 @@ def get_argument_parser(profile=None):
"used. Note that this is intended primarily for internal use and may "
"lead to unexpected results otherwise.",
)
group_behavior.add_argument(
"--local-groupid",
default="local",
help="Name for local groupid, meant for internal use only.",
)
group_behavior.add_argument(
"--max-jobs-per-second",
default=10,
Expand Down Expand Up @@ -2923,6 +2932,7 @@ def open_browser():
conda_not_block_search_path_envvars=args.conda_not_block_search_path_envvars,
scheduler_solver_path=args.scheduler_solver_path,
conda_base_path=args.conda_base_path,
local_groupid=args.local_groupid,
)

if args.runtime_profile:
Expand Down
39 changes: 37 additions & 2 deletions snakemake/dag.py
Expand Up @@ -367,6 +367,9 @@ def check_dynamic(self):
def is_edit_notebook_job(self, job):
return self.workflow.edit_notebook and job.targetfile in self.targetfiles

def get_job_group(self, job):
return self._group.get(job)

@property
def dynamic_output_jobs(self):
"""Iterate over all jobs with dynamic output files."""
Expand Down Expand Up @@ -1157,6 +1160,22 @@ def _update_group_components(self):
for j in primary:
self._group[j] = primary

for group in self._group.values():
group.finalize()

def update_incomplete_input_expand_jobs(self):
"""Update (re-evaluate) all jobs which have incomplete input file expansions.
only filled in the second pass of postprocessing.
"""
updated = False
for job in list(self.jobs):
if job.incomplete_input_expand:
newjob = job.updated()
self.replace_job(job, newjob, recursive=False)
updated = True
return updated

def update_ready(self, jobs=None):
"""Update information whether a job is ready to execute.
Expand All @@ -1178,7 +1197,6 @@ def update_ready(self, jobs=None):
self._ready_jobs.add(job)
else:
group = self._group[job]
group.finalize()
if group not in self._running:
candidate_groups.add(group)

Expand Down Expand Up @@ -1207,7 +1225,9 @@ def close_remote_objects(self):
if not self.needrun(job):
job.close_remote()

def postprocess(self, update_needrun=True):
def postprocess(
self, update_needrun=True, update_incomplete_input_expand_jobs=True
):
"""Postprocess the DAG. This has to be invoked after any change to the
DAG topology."""
self.cleanup()
Expand All @@ -1217,6 +1237,21 @@ def postprocess(self, update_needrun=True):
self.update_priority()
self.handle_pipes_and_services()
self.update_groups()

if update_incomplete_input_expand_jobs:
updated = self.update_incomplete_input_expand_jobs()
if updated:

# run a second pass, some jobs have been updated
# with potentially new input files that have depended
# on group ids.
self.postprocess(
update_needrun=True,
update_incomplete_input_expand_jobs=False,
)

return

self.update_ready()
self.close_remote_objects()
self.update_checkpoint_outputs()
Expand Down
9 changes: 9 additions & 0 deletions snakemake/executors/__init__.py
Expand Up @@ -134,6 +134,9 @@ def fmt(res):
return args
return ""

def get_local_groupid_arg(self):
return f" --local-groupid {self.workflow.local_groupid} "

def get_behavior_args(self):
if self.workflow.conda_not_block_search_path_envvars:
return " --conda-not-block-search-path-envvars "
Expand Down Expand Up @@ -448,6 +451,7 @@ def __init__(
"--latency-wait {latency_wait} ",
self.get_default_remote_provider_args(),
self.get_default_resources_args(),
self.get_local_groupid_arg(),
"{overwrite_workdir} {overwrite_config} {printshellcmds} {rules} ",
"--notemp --quiet --no-hooks --nolock --mode {} ".format(
Mode.subprocess
Expand Down Expand Up @@ -721,6 +725,7 @@ def __init__(
self.exec_job = exec_job

self.exec_job += self.get_additional_args()
self.exec_job += " {job_specific_args:u} "
if not disable_default_remote_provider_args:
self.exec_job += self.get_default_remote_provider_args()
if not disable_get_default_resources_args:
Expand Down Expand Up @@ -819,6 +824,9 @@ def format_job(self, pattern, job, **kwargs):
"--wait-for-files {wait_for_files}",
wait_for_files=[repr(f) for f in wait_for_files],
)
job_specific_args = ""
if job.is_group:
job_specific_args = f"--local-groupid {job.jobid}"

format_p = partial(
self.format_job_pattern,
Expand All @@ -827,6 +835,7 @@ def format_job(self, pattern, job, **kwargs):
latency_wait=self.latency_wait,
waitfiles_parameter=waitfiles_parameter,
scheduler_solver_path=scheduler_solver_path,
job_specific_args=job_specific_args,
**kwargs,
)
try:
Expand Down

0 comments on commit 5d45493

Please sign in to comment.