Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: new directive "retries" for annotating the number of times a jo…
…b shall be restarted after a failure (#1649)

* feat: new directive "retries" for annotating the number of times a job shall be restarted after a failure.

* add --retries alias for --restart-times argument

* add testcase

* docs
  • Loading branch information
johanneskoester committed May 11, 2022
1 parent 170c1d9 commit c8d81d0
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 14 deletions.
28 changes: 26 additions & 2 deletions docs/snakefiles/rules.rst
Expand Up @@ -351,9 +351,9 @@ Resources must be ``int`` or ``str`` values. Note that you are free to choose an
Resources can also be callables that return ``int`` or ``str`` values.
The signature of the callable has to be ``callable(wildcards [, input] [, threads] [, attempt])`` (``input``, ``threads``, and ``attempt`` are optional parameters).

The parameter ``attempt`` allows us to adjust resources based on how often the job has been restarted (see :ref:`all_options`, option ``--restart-times``).
The parameter ``attempt`` allows us to adjust resources based on how often the job has been restarted (see :ref:`all_options`, option ``--retries``).
This is handy when executing a Snakemake workflow in a cluster environment, where jobs can e.g. fail because of too limited resources.
When Snakemake is executed with ``--restart-times 3``, it will try to restart a failed job 3 times before it gives up.
When Snakemake is executed with ``--retries 3``, it will try to restart a failed job 3 times before it gives up.
Thereby, the parameter ``attempt`` will contain the current attempt number (starting from ``1``).
This can be used to adjust the required memory as follows

Expand Down Expand Up @@ -1069,6 +1069,30 @@ Consider running with the ``--cleanup-shadow`` argument every now and then
to remove any remaining shadow directories from aborted jobs.
The base shadow directory can be changed with the ``--shadow-prefix`` command line argument.

.. _snakefiles-retries:

Defining retries for fallible rules
-----------------------------------

Sometimes, rules may be expected to fail occasionally.
For example, this can happen when a rule downloads some online resources.
For such cases, it is possible to defined a number of automatic retries for each job from that particular rule via the ``retries`` directive:

.. code-block:: python
rule a:
output:
"test.txt"
retries: 3
shell:
"curl https://some.unreliable.server/test.txt > {output}"
Note that it is also possible to define retries globally (via the ``--retries`` command line option, see :ref:`all_options`).
The local definition of the rule thereby overwrites the global definition.

Importantly the ``retries`` directive is meant to be used for defining platform independent behavior (like adding robustness to above download command).
For dealing with unreliable cluster or cloud systems, you should use the ``--retries`` command line option.

Flag files
----------

Expand Down
16 changes: 13 additions & 3 deletions snakemake/__init__.py
Expand Up @@ -1806,8 +1806,13 @@ def get_argument_parser(profile=None):
group_output.add_argument(
"--quiet",
"-q",
action="store_true",
help="Do not output any progress or rule information.",
nargs="*",
choices=["progress", "rules", "all"],
default=None,
help="Do not output certain information. "
"If used without arguments, do not output any progress or rule "
"information. Defining 'all' results in no information being "
"printed at all.",
)
group_output.add_argument(
"--print-compilation",
Expand Down Expand Up @@ -1936,6 +1941,7 @@ def get_argument_parser(profile=None):
)
group_behavior.add_argument(
"-T",
"--retries",
"--restart-times",
default=0,
type=int,
Expand Down Expand Up @@ -2423,6 +2429,10 @@ def main(argv=None):
parser = get_argument_parser()
args = parser.parse_args(argv)

if args.quiet is not None and len(args.quiet) == 0:
# default case, set quiet to progress and rule
args.quiet = ["progress", "rules"]

if args.profile:
# reparse args while inferring config file from profile
parser = get_argument_parser(args.profile)
Expand Down Expand Up @@ -2900,7 +2910,7 @@ def open_browser():
allowed_rules=args.allowed_rules,
max_jobs_per_second=args.max_jobs_per_second,
max_status_checks_per_second=args.max_status_checks_per_second,
restart_times=args.restart_times,
restart_times=args.retries,
attempt=args.attempt,
force_use_threads=args.force_use_threads,
use_conda=args.use_conda,
Expand Down
2 changes: 1 addition & 1 deletion snakemake/executors/__init__.py
Expand Up @@ -466,7 +466,7 @@ def get_envvar_declarations(self):
return ""

def get_job_args(self, job, **kwargs):
return f"{super().get_job_args(job, **kwargs)} --quiet"
return f"{super().get_job_args(job, **kwargs)} --quiet all"

def run(self, job, callback=None, submit_callback=None, error_callback=None):
super()._run(job)
Expand Down
35 changes: 28 additions & 7 deletions snakemake/logging.py
Expand Up @@ -294,7 +294,7 @@ def __init__(self):
self.printshellcmds = False
self.printreason = False
self.debug_dag = False
self.quiet = False
self.quiet = set()
self.logfile = None
self.last_msg_was_job_info = False
self.mode = Mode.default
Expand Down Expand Up @@ -419,6 +419,9 @@ def d3dag(self, **msg):
msg["level"] = "d3dag"
self.handler(msg)

def is_quiet_about(self, msg_type):
return msg_type in self.quiet or "all" in self.quiet

def text_handler(self, msg):
"""The default snakemake log handler.
Expand All @@ -427,6 +430,9 @@ def text_handler(self, msg):
Args:
msg (dict): the log message dictionary
"""
if self.is_quiet_about("all"):
# do not log anything
return

def job_info(msg):
def format_item(item, omit=None, valueformat=str):
Expand Down Expand Up @@ -476,7 +482,7 @@ def timestamp():

level = msg["level"]

if level == "job_info" and not self.quiet:
if level == "job_info" and not self.is_quiet_about("rules"):
if not self.last_msg_was_job_info:
self.logger.info("")
timestamp()
Expand All @@ -495,7 +501,7 @@ def timestamp():
self.logger.info("")

self.last_msg_was_job_info = True
elif level == "group_info" and not self.quiet:
elif level == "group_info" and not self.is_quiet_about("rules"):
timestamp()
msg = "group job {} (jobs in lexicogr. order):".format(msg["groupid"])
if not self.last_msg_was_job_info:
Expand Down Expand Up @@ -541,19 +547,19 @@ def job_error():
timestamp()
self.logger.error("Error in group job {}:".format(msg["groupid"]))
else:
if level == "info" and not self.quiet:
if level == "info":
self.logger.warning(msg["msg"])
if level == "warning":
self.logger.critical(msg["msg"])
elif level == "error":
self.logger.error(msg["msg"])
elif level == "debug":
self.logger.debug(msg["msg"])
elif level == "resources_info" and not self.quiet:
elif level == "resources_info":
self.logger.warning(msg["msg"])
elif level == "run_info":
self.logger.warning(msg["msg"])
elif level == "progress" and not self.quiet:
elif level == "progress" and not self.is_quiet_about("progress"):
done = msg["done"]
total = msg["total"]
self.logger.info(
Expand All @@ -564,7 +570,7 @@ def job_error():
elif level == "shellcmd":
if self.printshellcmds:
self.logger.warning(indent(msg["msg"]))
elif level == "job_finished" and not self.quiet:
elif level == "job_finished" and not self.is_quiet_about("progress"):
timestamp()
self.logger.info("Finished job {}.".format(msg["jobid"]))
pass
Expand Down Expand Up @@ -659,6 +665,21 @@ def setup_logger(
mode=Mode.default,
show_failed_logs=False,
):
if quiet is None:
# not quiet at all
quiet = set()
elif isinstance(quiet, bool):
if quiet:
quiet = set(["progress", "rules"])
else:
quiet = set()
elif isinstance(quiet, list):
quiet = set(quiet)
else:
raise ValueError(
"Unsupported value provided for quiet mode (either bool, None or list allowed)."
)

logger.log_handler.extend(handler)

# console output only if no custom logger was specified
Expand Down
5 changes: 5 additions & 0 deletions snakemake/parser.py
Expand Up @@ -427,6 +427,10 @@ class Threads(RuleKeywordState):
pass


class Retries(RuleKeywordState):
pass


class Shadow(RuleKeywordState):
pass

Expand Down Expand Up @@ -675,6 +679,7 @@ def args(self):
params=Params,
threads=Threads,
resources=Resources,
retries=Retries,
priority=Priority,
version=Version,
log=Log,
Expand Down
1 change: 1 addition & 0 deletions snakemake/ruleinfo.py
Expand Up @@ -26,6 +26,7 @@ def __init__(self, func=None):
self.shadow_depth = None
self.resources = None
self.priority = None
self.retries = None
self.version = None
self.log = None
self.docstring = None
Expand Down
16 changes: 15 additions & 1 deletion snakemake/workflow.py
Expand Up @@ -1480,6 +1480,14 @@ def decorate(ruleinfo):
"Priority values have to be numeric.", rule=rule
)
rule.priority = ruleinfo.priority

if ruleinfo.retries:
if not isinstance(ruleinfo.retries, int) or ruleinfo.retries < 0:
raise RuleException(
"Retries values have to be integers >= 0", rule=rule
)
rule.restart_times = ruleinfo.retries or self.restart_times

if ruleinfo.version:
rule.version = ruleinfo.version
if ruleinfo.log:
Expand Down Expand Up @@ -1585,7 +1593,6 @@ def decorate(ruleinfo):
rule.wrapper = ruleinfo.wrapper
rule.template_engine = ruleinfo.template_engine
rule.cwl = ruleinfo.cwl
rule.restart_times = self.restart_times
rule.basedir = self.current_basedir

if ruleinfo.handover:
Expand Down Expand Up @@ -1765,6 +1772,13 @@ def decorate(ruleinfo):

return decorate

def retries(self, retries):
def decorate(ruleinfo):
ruleinfo.retries = retries
return ruleinfo

return decorate

def shadow(self, shadow_depth):
def decorate(ruleinfo):
ruleinfo.shadow_depth = shadow_depth
Expand Down
11 changes: 11 additions & 0 deletions tests/test_retries/Snakefile
@@ -0,0 +1,11 @@
rule a:
output:
"test.txt"
resources:
shouldfail=lambda w, attempt: attempt < 3
retries: 3
run:
if resources.shouldfail:
raise ValueError("not enough attempts")
with open(output[0], "w") as out:
print("test", file=out)
1 change: 1 addition & 0 deletions tests/test_retries/expected-results/test.txt
@@ -0,0 +1 @@
test
4 changes: 4 additions & 0 deletions tests/tests.py
Expand Up @@ -1628,3 +1628,7 @@ def test_rule_inheritance_globals():
targets=["foo.txt"],
check_md5=False,
)


def test_retries():
run(dpath("test_retries"))

0 comments on commit c8d81d0

Please sign in to comment.