feat: Group resources (#1218)
* Add .python-version to .gitignore

.python-version is used by pyenv to manage local python versions

* Track what specific pipe group each job belongs to

* Adds a pipe_group property to jobs that tracks which pipe group the job belongs to, if any: None if the job is not involved with pipes; otherwise, it equals the CandidateGroup generated when grouping the pipe.

* If the job was not part of a user-defined group, the ordinary group property will still be set to CandidateGroup, as before.

* Toposort puts pipe jobs at the same level

* Jobs connected by a pipe now do not depend on each other, but inherit each other's dependencies. This properly ensures toposort() puts them at the same level.

* Ensure item is correct type for CandidateGroup eq

* __eq__ in CandidateGroup failed when comparing with an object of a different type. This is corrected by inserting a type check.

* Group resource constraint calculations

Computes group-resource requirements on the cluster by taking
user-provided constraints into account. Jobs that could be run in
parallel will now be run in series if not enough resources (e.g. cores or
memory) are available. Runtime is now treated as a special resource and
will be accurately calculated for parallel jobs via max() or for series
jobs via sum().

Logic is implemented in a series of private methods in GroupJob

Pipe groups are treated as a single unit when calculating resources. Runtime is
summed over pipe groups instead of "maxed". Cores are merged via max()
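
As an illustrative sketch (not the actual GroupJob implementation), the
layered merge described above could look like this:

    def merge_group_resources(layers, additive=("runtime",)):
        """Merge per-job resource requests for a group job.

        Jobs within a layer run in parallel: most resources are summed,
        while additive resources such as runtime take the max.
        Layers run in series: most resources take the max across layers,
        while additive resources are summed.
        """
        total = {}
        for layer in layers:
            merged = {}
            for job in layer:
                for name, value in job.items():
                    if name in additive:
                        merged[name] = max(merged.get(name, 0), value)
                    else:
                        merged[name] = merged.get(name, 0) + value
            for name, value in merged.items():
                if name in additive:
                    total[name] = total.get(name, 0) + value
                else:
                    total[name] = max(total.get(name, 0), value)
        return total

For example, a layer of two parallel jobs at 1000 MB / 30 min followed by a
layer with one job at 2000 MB / 60 min would yield a request of 2000 MB and
90 min.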

* Fix sorting for pipe job scheduling.

The toposorting within GroupJob was updated to put all pipe jobs
at the same toposort level so that accurate resource requirements
could be calculated. This broke the scheduler, however, which
relied on pipe jobs within the same group being in a hierarchical
order. This adds an additional toposort to GroupJob.__iter__
specifically for pipe jobs.

This also refactors the toposorting code to a less complex form

* Stop constraining resources for submitted jobs

Add a new property "local_resources" to Job and GroupJob and change all references to
job.resources in the scheduler to job.local_resources. local_resources
is normally identical to resources, unless job.is_local == False (i.e.
the job is submitted to a cluster), in which case local_resources will
only include "_cores" and "_nodes".

GroupJob.resources now outputs a Resources object instead of a dict.
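
A minimal sketch of the idea (hypothetical; the real implementation works on
Resources objects rather than plain dicts):

    @property
    def local_resources(self):
        # Jobs submitted to a cluster are only constrained locally by
        # core/node counts; all other resources are handled by the cluster.
        if self.is_local:
            return self.resources
        return {
            name: value
            for name, value in self.resources.items()
            if name in ("_cores", "_nodes")
        }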

* Add resource constraints to cluster job submission

Include the --resources flag on the snakemake call made in the child
job.

Does not send any runtime information, as this falsely constrains jobs
within the cluster.

Cores is still set to "all" for regular cluster submission, leaving the
cluster to manage how many cores the job is entitled to

* Add workaround for #1550 in tests

The new auto `--default-resources` was causing test failures, so we
manually set the default mem_mb to 0.

Ran formatting with black

* Implement resource scopes for remote jobs

Resources can now be set as local or global, either using the Snakefile
directive "resource_scope" or, with higher priority, the CLI argument
"--set-resource-scopes". The scheduler shall only take local resources
into account for a local job, i.e. a job not submitted to a cluster. Global
resources shall always be taken into account. Group jobs that would
exceed global resource allocations shall not be scheduled until other
jobs have finished.

Note that the global/local distinction only has effect in remote jobs,
i.e. cluster, compute nodes, etc. In local executions, all jobs are
local.

* Update documentation for resource scopes

Add resource scope info and clean up/organize resource section

* Unify toposort functions

Toposort was separately implemented both in dag and in jobs. This moves
the jobs version into dag. Pipe group jobs are immediately sorted in
dependency order when toposort is calculated, instead of each time
__iter__() is called on a group job

* Ensure resources passed to subcmds are digits

Previously, the check was merely whether the resource was an int, but some
int subclasses are not represented as digits; bool, in particular, is
represented by the strings True and False. This is corrected by checking
whether the stringified int looks like an int (as opposed to `True` or
`False`).
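
A sketch of the kind of check described (the helper name is illustrative):

    def is_plain_integer(value):
        # isinstance(value, int) alone is not enough: bool is an int
        # subclass, but str(True) == "True" is not a digit string.
        # Negative values would also be rejected by this check.
        return isinstance(value, int) and str(value).isdigit()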

* Remove special merging rules for pipejobs

Also refactors _simple_merge into a more generic format

* Refactor group job calculation into new class

Moves all relevant code into the GroupResources class in the resources
module.

Remove the hardcoding of "runtime" from the logic and isolate it into
two variables: additive_resources, which determines which resources are
summed across layers, and sortby, which determines sorting priority when
layering.

GroupResources is changed to a UserDict containing a class method
'basic_layered'. This opens the possibility for new group resource
merging methods to be added later.

* Refactor resource scopes into self-contained class

Moves the logic for resource scope management into a single class

Adds an 'excluded' scope for resources that should not be sent to child snakemake
processes. This undocumented feature will currently be used to prevent
runtime from being sent to cluster jobs

* Pass on resource limits to run-directive jobs

* Make too-much-group-resource error more generic

Remove hardcoded mention of runtime, instead referring to the
additive_resources

Move some of the error logic into its own function

* Fix group_job_pipe test failures on low core envs

Some environments (like gh actions) don't have enough cores to run the
subprocess in test_group_job_resources_with_pipe. We fix this by
patching RealExecutor.get_job_args, which instructs the executor how
many cores to request

* Refactor basic_layered and add docstring

Common operations have been refactored into separate methods to simplify
the logic.

* Clarify the snakemake version in which the scope feature started

* Add link to scopes explanation in cli help

* Test appropriate inclusion of resources in submission

Ensure that excluded resources like runtime are not included in the job
submit script
pvandyken committed Jul 27, 2022
1 parent 225e68c commit a8014d0
Showing 25 changed files with 1,360 additions and 266 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -11,6 +11,7 @@ dist/
.snakemake*
.venv
.venv/*
.python-version


.idea
18 changes: 12 additions & 6 deletions docs/executing/grouping.rst
@@ -11,7 +11,12 @@ When executing locally, group definitions are ignored.
Groups can be defined along with the workflow definition via the ``group`` keyword, see :ref:`snakefiles-grouping`.
This way, queueing and execution time can be saved, in particular by attaching short-running downstream jobs to long running upstream jobs.

However, often the benefit of grouping will be heavily dependent on the specifics of the underlying computing platform.
Snakemake will request resources for groups by summing across jobs that can be run in parallel, and taking the max of jobs run in series.
The only exception is ``runtime``, where the max will be taken over parallel jobs, and the sum over series.
If resource constraints are provided (via ``--resources`` or ``--cores``), parallel job layers that exceed the constraints will be stacked in series.
For example, if 6 instances of ``somerule`` are being run, each instance requires ``1000MB`` of memory and ``30 min`` runtime, and only ``3000MB`` are available, Snakemake will request ``3000MB`` and ``60 min`` runtime, enough to run 3 instances of ``somerule``, then another 3 instances of ``somerule`` in series.
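For illustration, such a ``somerule`` might be declared as follows (inputs, outputs, and the shell command are hypothetical, and the six instances are assumed to be grouped, e.g. via ``--groups`` and ``--group-components``):

.. code-block:: python

    rule somerule:
        input:
            "data/{sample}.txt"
        output:
            "results/{sample}.txt"
        resources:
            mem_mb=1000,
            runtime=30
        shell:
            "process {input} > {output}"

Submitting this workflow with ``--resources mem_mb=3000`` then yields the ``3000MB`` / ``60 min`` group request described above.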

Often, the ideal group will be dependent on the specifics of the underlying computing platform.
Hence, it is possible to assign groups via the command line.
For example, with

@@ -20,15 +25,16 @@ For example, with
snakemake --groups somerule=group0 someotherrule=group0
we assign the two rules ``somerule`` and ``someotherrule`` to the same group ``group0``.
by default, groups do not span disconnected parts of the DAG.
Here, this means that by default only jobs of ``somerule`` and ``someotherrule`` end in the same group that are directly connected.
It is however possible to configure the number of connected DAG components that are spanned by a group via the flag ``--group-components``.
This way, it is e.g. possible to define batches of jobs of the same kind that shall be executed within one group, e.g.

By default, groups do not span disconnected parts of the DAG.
This means that, for example, jobs of ``somerule`` and ``someotherrule`` only end in the same group if they are directly connected.
It is, however, possible to configure the number of connected DAG components that are spanned by a group via the flag ``--group-components``.
This makes it possible to define batches of jobs of the same kind that shall be executed within one group. For instance:


.. code-block:: bash

    snakemake --groups somerule=group0 --group-components group0=5

means that given that there exist ``n`` jobs spawned from rule ``somerule``, Snakemake will create ``n / 5`` groups which each execute 5 jobs of ``somerule`` together.
means that given ``n`` jobs spawned from rule ``somerule``, Snakemake will create ``n / 5`` groups which each execute 5 jobs of ``somerule`` together.
For example, with 10 jobs from ``somerule`` you would end up with 2 groups of 5 jobs that are submitted as one piece each.
75 changes: 63 additions & 12 deletions docs/snakefiles/rules.rst
@@ -340,16 +340,14 @@ If limits for the resources are given via the command line, e.g.
the scheduler will ensure that the given resources are not exceeded by running jobs.
Resources are always meant to be specified as total per job, not by thread (i.e. above ``mem_mb=100`` in rule ``a`` means that any job from rule ``a`` will require ``100`` megabytes of memory in total, and not per thread).

In general, resources are just names to the Snakemake scheduler, i.e., Snakemake does not check whether a job exceeds a certain resource.
However, resources are used to determine which jobs can be executed at a time while not exceeding the given limits at the command line.
If no limits are given, the resources are ignored in local execution.
In cluster or cloud execution, resources are always passed to the backend, even if ``--resources`` is not specified.
In general, resources are just names to the Snakemake scheduler, i.e., Snakemake does not check on the resource consumption of jobs in real time.
Instead, resources are used to determine which jobs can be executed at the same time without exceeding the limits specified at the command line.
Apart from making Snakemake aware of hybrid-computing architectures (e.g. with a limited number of additional devices like GPUs) this allows us to control scheduling in various ways, e.g. to limit IO-heavy jobs by assigning an artificial IO-resource to them and limiting it via the ``--resources`` flag.
Resources must be ``int`` or ``str`` values. Note that you are free to choose any names for the given resources.
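For instance, an artificial IO resource might be declared like this (the rule, file names, and shell command are purely illustrative):

.. code-block:: python

    rule heavy_io:
        input:
            "in/{sample}.dat"
        output:
            "out/{sample}.dat"
        resources:
            io=1
        shell:
            "cp {input} {output}"

Running with ``--resources io=4`` would then allow at most four such jobs to run concurrently.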

If no limits are given, the resources are ignored in local execution.

Resources can also be callables that return ``int`` or ``str`` values.
The signature of the callable has to be ``callable(wildcards [, input] [, threads] [, attempt])`` (``input``, ``threads``, and ``attempt`` are optional parameters).
Resources can have any arbitrary name, and must be assigned ``int`` or ``str`` values.
They can also be callables that return ``int`` or ``str`` values.
The signature of the callable must be ``callable(wildcards [, input] [, threads] [, attempt])`` (``input``, ``threads``, and ``attempt`` are optional parameters).

The parameter ``attempt`` allows us to adjust resources based on how often the job has been restarted (see :ref:`all_options`, option ``--retries``).
This is handy when executing a Snakemake workflow in a cluster environment, where jobs can e.g. fail because of too limited resources.
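For instance, a resource callable that makes use of ``attempt`` might look like this (the helper name, file names, and numbers are illustrative only):

.. code-block:: python

    def get_mem_mb(wildcards, attempt):
        # double the memory request on every retry
        return 1000 * attempt

    rule a:
        output:
            "out/{sample}.txt"
        resources:
            mem_mb=get_mem_mb
        shell:
            "produce {wildcards.sample} > {output}"
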
@@ -397,11 +395,21 @@ Both threads and resources can be overwritten upon invocation via `--set-threads
Standard Resources
~~~~~~~~~~~~~~~~~~

There are three **standard resources**, for total memory, disk usage and the temporary directory of a job: ``mem_mb`` and ``disk_mb`` and ``tmpdir``.
The ``tmpdir`` resource automatically leads to setting the TMPDIR variable for shell commands, scripts, wrappers and notebooks.
When defining memory constraints, it is advised to use ``mem_mb``, because some execution modes make direct use of this information (e.g., when using :ref:`Kubernetes <kubernetes>`).
There are four **standard resources**, for total memory, disk usage, runtime, and the temporary directory of a job: ``mem_mb``, ``disk_mb``, ``runtime``, and ``tmpdir``.
All of these resources have specific meanings understood by Snakemake and each is treated in its own particular way:

* The ``tmpdir`` resource automatically leads to setting the TMPDIR variable for shell commands, scripts, wrappers and notebooks.

* The ``runtime`` resource indicates how much time a job needs to run, and has a special meaning for cluster and cloud compute jobs.
  See :ref:`the section below <resources-remote-execution>` for more information.

* ``disk_mb`` and ``mem_mb`` are both locally scoped by default, a fact important for cluster and cloud execution.
  :ref:`See below <resources-remote-execution>` for more info.
  ``mem_mb`` also has special meaning for some execution modes (e.g., when using :ref:`Kubernetes <kubernetes>`).

Since it would be cumbersome to define such standard resources for every rule, you can set default values at
Because of these special meanings, the above names should always be used instead of possible synonyms (e.g. ``tmp``, ``mem``, ``time``, ``temp``, etc.).

Since it could be cumbersome to define these standard resources for every rule, you can set default values at
the terminal or in a :ref:`profile <profiles>`.
This works via the command line flag ``--default-resources``, see ``snakemake --help`` for more information.
If those resource definitions are mandatory for a certain execution mode, Snakemake will fail with a hint if they are missing.
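For example, fixed defaults could be supplied on the command line (the values shown are arbitrary):

.. code-block:: bash

    snakemake --cores 4 --default-resources mem_mb=2000 disk_mb=4000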
@@ -410,6 +418,48 @@ If ``--default-resources`` are not specified, Snakemake uses ``'mem_mb=max(2*inp
``'disk_mb=max(2*input.size_mb, 1000)'``, and ``'tmpdir=system_tmpdir'``.
The latter points to whatever is the default of the operating system or specified by any of the environment variables ``$TMPDIR``, ``$TEMP``, or ``$TMP`` as outlined `here <https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir>`_.

.. _resources-remote-execution:

Resources and Remote Execution
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

New to Snakemake 7.10

In cluster or cloud execution, resources may represent either a global constraint across all submissions (e.g. number of API calls per second), or a constraint local to each specific job submission (e.g. the amount of memory available on a node).
Snakemake distinguishes between these two types of constraints using **resource scopes**.
By default, ``mem_mb``, ``disk_mb``, and ``threads`` are all considered ``"local"`` resources, meaning specific to individual submissions.
So if a constraint of 16G of memory is given to snakemake (e.g. ``snakemake --resources mem_mb=16000``), each group job will be allowed 16G of memory.
All other resources are considered ``"global"``, meaning they are tracked across all jobs across all submissions.
For example, if ``api_calls`` were limited to 5 and each scheduled job used 1 API call, only 5 jobs would be scheduled at a time, even if more jobs were ready for submission.
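As a hypothetical invocation, such a global limit could be combined with cluster submission like this (assuming the relevant rules each declare ``api_calls=1`` under ``resources``):

.. code-block:: bash

    snakemake --cluster sbatch --jobs 100 --resources api_calls=5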

These resource scopes may be modified both in the Snakefile and via the CLI parameter ``--set-resource-scopes``.
The CLI parameter takes priority.
Modification in the Snakefile uses the following syntax:

.. code-block:: python

    resource_scopes:
        gpus="local",
        foo="local",
        disk_mb="global"

Here, we set both ``gpus`` and ``foo`` as local resources, and we changed ``disk_mb`` from its default to be a ``global`` resource.
These options could be overridden at the command line using:

.. code-block:: bash

    snakemake --set-resource-scopes gpus=global disk_mb=local

Resources and Group Jobs
~~~~~~~~~~~~~~~~~~~~~~~~

When submitting :ref:`group jobs <job_grouping>` to the cluster, Snakemake calculates how many resources to request by first determining which component jobs can be run in parallel, and which must be run in series.
For most resources, such as ``mem_mb`` or ``threads``, a sum will be taken across each parallel layer.
The layer requiring the most of a given resource (i.e. via ``max()``) will determine the final amount requested.
The only exception is ``runtime``: ``max()`` is used within each layer, and the resulting times are then summed across all layers.
If resource constraints are provided (via ``--resources`` or ``--cores``), Snakemake will prevent group jobs from requesting more than the constraint.
Jobs that could otherwise be run in parallel will instead be run in series to avoid violating the resource constraints.



Preemptible Jobs
~~~~~~~~~~~~~~~~
@@ -1662,6 +1712,7 @@ It is possible to combine explicit group definition as above with pipe outputs.
Thereby, pipe jobs can live within, or (automatically) extend existing groups.
However, the two jobs connected by a pipe may not exist in conflicting groups.

As with other groups, Snakemake will automatically calculate the required resources for the group job (see :ref:`resources <snakefiles-resources>`).

.. _snakefiles-service-rules:

72 changes: 62 additions & 10 deletions snakemake/__init__.py
@@ -21,13 +21,16 @@

from snakemake.workflow import Workflow
from snakemake.dag import Batch
from snakemake.exceptions import print_exception, WorkflowError
from snakemake.exceptions import ResourceScopesException, print_exception, WorkflowError
from snakemake.logging import setup_logger, logger, SlackLogger, WMSLogger
from snakemake.io import load_configfile, wait_for_files
from snakemake.shell import shell
from snakemake.utils import update_config, available_cpu_count
from snakemake.utils import (
    update_config,
    available_cpu_count,
)
from snakemake.common import Mode, __version__, MIN_PY_VERSION, get_appdirs
from snakemake.resources import parse_resources, DefaultResources
from snakemake.resources import ResourceScopes, parse_resources, DefaultResources


SNAKEFILE_CHOICES = [
@@ -58,6 +61,7 @@ def snakemake(
resources=dict(),
overwrite_threads=None,
overwrite_scatter=None,
overwrite_resource_scopes=None,
default_resources=None,
overwrite_resources=None,
config=dict(),
@@ -567,6 +571,7 @@ def snakemake(
overwrite_scatter=overwrite_scatter,
overwrite_groups=overwrite_groups,
overwrite_resources=overwrite_resources,
overwrite_resource_scopes=overwrite_resource_scopes,
group_components=group_components,
config_args=config_args,
debug=debug,
@@ -638,6 +643,7 @@ def snakemake(
overwrite_threads=overwrite_threads,
overwrite_scatter=overwrite_scatter,
overwrite_resources=overwrite_resources,
overwrite_resource_scopes=overwrite_resource_scopes,
default_resources=default_resources,
dryrun=dryrun,
touch=touch,
@@ -872,6 +878,26 @@ def parse_set_scatter(args):
)


def parse_set_resource_scope(args):
    err_msg = (
        "Invalid resource scopes: entries must be defined as RESOURCE=SCOPE pairs, "
        "where SCOPE is either 'local', 'global', or 'excluded'"
    )
    if args.set_resource_scopes is not None:
        try:
            return ResourceScopes(
                parse_key_value_arg(entry, errmsg=err_msg)
                for entry in args.set_resource_scopes
            )
        except ResourceScopesException as err:
            invalid_resources = ", ".join(
                f"'{res}={scope}'" for res, scope in err.invalid_resources.items()
            )
            raise ValueError(f"{err.msg} (got {invalid_resources})")

    return ResourceScopes()


def parse_set_ints(arg, errmsg):
    assignments = dict()
    if arg is not None:
@@ -1121,8 +1147,11 @@ def get_argument_parser(profile=None):
"Use at most N CPU cores/jobs in parallel. "
"If N is omitted or 'all', the limit is set to the number of "
"available CPU cores. "
"In case of cluster/cloud execution, this argument sets the number of "
"total cores used over all jobs (made available to rules via workflow.cores)."
"In case of cluster/cloud execution, this argument sets the maximum number "
"of cores requested from the cluster or cloud scheduler. (See "
"https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#"
"resources-remote-execution for more info)"
"This number is available to rules via workflow.cores."
),
)
group_exec.add_argument(
@@ -1156,11 +1185,15 @@
metavar="NAME=INT",
help=(
"Define additional resources that shall constrain the scheduling "
"analogously to threads (see above). A resource is defined as "
"analogously to --cores (see above). A resource is defined as "
"a name and an integer value. E.g. --resources mem_mb=1000. Rules can "
"use resources by defining the resource keyword, e.g. "
"resources: mem_mb=600. If now two rules require 600 of the resource "
"'mem_mb' they won't be run in parallel by the scheduler."
"'mem_mb' they won't be run in parallel by the scheduler. In "
"cluster/cloud mode, this argument will also constrain the amount of "
"resources requested from the server. (See "
"https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#"
"resources-remote-execution for more info)"
),
)
group_exec.add_argument(
@@ -1175,9 +1208,11 @@
group_exec.add_argument(
"--max-threads",
type=int,
help="Define a global maximum number of threads for any job. This can be helpful in a cluster/cloud setting, "
"when you want to restrict the maximum number of requested threads without modifying the workflow definition "
"or overwriting them invidiually with --set-threads.",
help="Define a global maximum number of threads available to any rule. Rules "
"requesting more threads (via the threads keyword) will have their values "
"reduced to the maximum. This can be useful when you want to restrict the "
"maximum number of threads without modifying the workflow definition or "
"overwriting rules individually with --set-threads.",
)
group_exec.add_argument(
"--set-resources",
@@ -1197,6 +1232,21 @@
"workflow parallelization. Thereby, SCATTERITEMS has to be a positive integer, and NAME has to be "
"the name of the scattergather process defined via a scattergather directive in the workflow.",
)
    group_exec.add_argument(
        "--set-resource-scopes",
        metavar="RESOURCE=[global|local]",
        nargs="+",
        help="Overwrite resource scopes. A scope determines how a constraint is "
        "reckoned in cluster execution. With RESOURCE=local, a constraint applied to "
        "RESOURCE using --resources will be considered the limit for each group "
        "submission. With RESOURCE=global, the constraint will apply across all groups "
        "cumulatively. By default, only `mem_mb` and `disk_mb` are considered local, "
        "all other resources are global. This may be modified in the snakefile using "
        "the `resource_scopes:` directive. Note that number of threads, specified via "
        "--cores, is always considered local. (See "
        "https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#"
        "resources-remote-execution for more info)",
    )
group_exec.add_argument(
"--default-resources",
"--default-res",
@@ -2500,6 +2550,7 @@ def adjust_path(f):
batch = parse_batch(args)
overwrite_threads = parse_set_threads(args)
overwrite_resources = parse_set_resources(args)
overwrite_resource_scopes = parse_set_resource_scope(args)

overwrite_scatter = parse_set_scatter(args)

@@ -2828,6 +2879,7 @@ def open_browser():
overwrite_scatter=overwrite_scatter,
default_resources=default_resources,
overwrite_resources=overwrite_resources,
overwrite_resource_scopes=overwrite_resource_scopes,
config=config,
configfiles=args.configfile,
config_args=args.config,
