Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new directive "retries" for annotating the number of times a job shall be restarted after a failure #1649

Merged
merged 4 commits into from May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 26 additions & 2 deletions docs/snakefiles/rules.rst
Expand Up @@ -351,9 +351,9 @@ Resources must be ``int`` or ``str`` values. Note that you are free to choose an
Resources can also be callables that return ``int`` or ``str`` values.
The signature of the callable has to be ``callable(wildcards [, input] [, threads] [, attempt])`` (``input``, ``threads``, and ``attempt`` are optional parameters).

The parameter ``attempt`` allows us to adjust resources based on how often the job has been restarted (see :ref:`all_options`, option ``--restart-times``).
The parameter ``attempt`` allows us to adjust resources based on how often the job has been restarted (see :ref:`all_options`, option ``--retries``).
This is handy when executing a Snakemake workflow in a cluster environment, where jobs can e.g. fail because of too limited resources.
When Snakemake is executed with ``--restart-times 3``, it will try to restart a failed job 3 times before it gives up.
When Snakemake is executed with ``--retries 3``, it will try to restart a failed job 3 times before it gives up.
Thereby, the parameter ``attempt`` will contain the current attempt number (starting from ``1``).
This can be used to adjust the required memory as follows

Expand Down Expand Up @@ -1069,6 +1069,30 @@ Consider running with the ``--cleanup-shadow`` argument every now and then
to remove any remaining shadow directories from aborted jobs.
The base shadow directory can be changed with the ``--shadow-prefix`` command line argument.

.. _snakefiles-retries:

Defining retries for fallible rules
-----------------------------------

Sometimes, rules may be expected to fail occasionally.
For example, this can happen when a rule downloads some online resources.
For such cases, it is possible to defined a number of automatic retries for each job from that particular rule via the ``retries`` directive:

.. code-block:: python

rule a:
output:
"test.txt"
retries: 3
shell:
"curl https://some.unreliable.server/test.txt > {output}"

Note that it is also possible to define retries globally (via the ``--retries`` command line option, see :ref:`all_options`).
The local definition of the rule thereby overwrites the global definition.

Importantly the ``retries`` directive is meant to be used for defining platform independent behavior (like adding robustness to above download command).
For dealing with unreliable cluster or cloud systems, you should use the ``--retries`` command line option.

Flag files
----------

Expand Down
16 changes: 13 additions & 3 deletions snakemake/__init__.py
Expand Up @@ -1806,8 +1806,13 @@ def get_argument_parser(profile=None):
group_output.add_argument(
"--quiet",
"-q",
action="store_true",
help="Do not output any progress or rule information.",
nargs="*",
choices=["progress", "rules", "all"],
default=None,
help="Do not output certain information. "
"If used without arguments, do not output any progress or rule "
"information. Defining 'all' results in no information being "
"printed at all.",
)
group_output.add_argument(
"--print-compilation",
Expand Down Expand Up @@ -1936,6 +1941,7 @@ def get_argument_parser(profile=None):
)
group_behavior.add_argument(
"-T",
"--retries",
"--restart-times",
default=0,
type=int,
Expand Down Expand Up @@ -2423,6 +2429,10 @@ def main(argv=None):
parser = get_argument_parser()
args = parser.parse_args(argv)

if args.quiet is not None and len(args.quiet) == 0:
# default case, set quiet to progress and rule
args.quiet = ["progress", "rules"]

if args.profile:
# reparse args while inferring config file from profile
parser = get_argument_parser(args.profile)
Expand Down Expand Up @@ -2900,7 +2910,7 @@ def open_browser():
allowed_rules=args.allowed_rules,
max_jobs_per_second=args.max_jobs_per_second,
max_status_checks_per_second=args.max_status_checks_per_second,
restart_times=args.restart_times,
restart_times=args.retries,
attempt=args.attempt,
force_use_threads=args.force_use_threads,
use_conda=args.use_conda,
Expand Down
2 changes: 1 addition & 1 deletion snakemake/executors/__init__.py
Expand Up @@ -466,7 +466,7 @@ def get_envvar_declarations(self):
return ""

def get_job_args(self, job, **kwargs):
return f"{super().get_job_args(job, **kwargs)} --quiet"
return f"{super().get_job_args(job, **kwargs)} --quiet all"

def run(self, job, callback=None, submit_callback=None, error_callback=None):
super()._run(job)
Expand Down
35 changes: 28 additions & 7 deletions snakemake/logging.py
Expand Up @@ -294,7 +294,7 @@ def __init__(self):
self.printshellcmds = False
self.printreason = False
self.debug_dag = False
self.quiet = False
self.quiet = set()
self.logfile = None
self.last_msg_was_job_info = False
self.mode = Mode.default
Expand Down Expand Up @@ -419,6 +419,9 @@ def d3dag(self, **msg):
msg["level"] = "d3dag"
self.handler(msg)

def is_quiet_about(self, msg_type):
return msg_type in self.quiet or "all" in self.quiet

def text_handler(self, msg):
"""The default snakemake log handler.

Expand All @@ -427,6 +430,9 @@ def text_handler(self, msg):
Args:
msg (dict): the log message dictionary
"""
if self.is_quiet_about("all"):
# do not log anything
return

def job_info(msg):
def format_item(item, omit=None, valueformat=str):
Expand Down Expand Up @@ -476,7 +482,7 @@ def timestamp():

level = msg["level"]

if level == "job_info" and not self.quiet:
if level == "job_info" and not self.is_quiet_about("rules"):
if not self.last_msg_was_job_info:
self.logger.info("")
timestamp()
Expand All @@ -495,7 +501,7 @@ def timestamp():
self.logger.info("")

self.last_msg_was_job_info = True
elif level == "group_info" and not self.quiet:
elif level == "group_info" and not self.is_quiet_about("rules"):
timestamp()
msg = "group job {} (jobs in lexicogr. order):".format(msg["groupid"])
if not self.last_msg_was_job_info:
Expand Down Expand Up @@ -541,19 +547,19 @@ def job_error():
timestamp()
self.logger.error("Error in group job {}:".format(msg["groupid"]))
else:
if level == "info" and not self.quiet:
if level == "info":
self.logger.warning(msg["msg"])
if level == "warning":
self.logger.critical(msg["msg"])
elif level == "error":
self.logger.error(msg["msg"])
elif level == "debug":
self.logger.debug(msg["msg"])
elif level == "resources_info" and not self.quiet:
elif level == "resources_info":
self.logger.warning(msg["msg"])
elif level == "run_info":
self.logger.warning(msg["msg"])
elif level == "progress" and not self.quiet:
elif level == "progress" and not self.is_quiet_about("progress"):
done = msg["done"]
total = msg["total"]
self.logger.info(
Expand All @@ -564,7 +570,7 @@ def job_error():
elif level == "shellcmd":
if self.printshellcmds:
self.logger.warning(indent(msg["msg"]))
elif level == "job_finished" and not self.quiet:
elif level == "job_finished" and not self.is_quiet_about("progress"):
timestamp()
self.logger.info("Finished job {}.".format(msg["jobid"]))
pass
Expand Down Expand Up @@ -659,6 +665,21 @@ def setup_logger(
mode=Mode.default,
show_failed_logs=False,
):
if quiet is None:
# not quiet at all
quiet = set()
elif isinstance(quiet, bool):
if quiet:
quiet = set(["progress", "rules"])
else:
quiet = set()
elif isinstance(quiet, list):
quiet = set(quiet)
else:
raise ValueError(
"Unsupported value provided for quiet mode (either bool, None or list allowed)."
)

logger.log_handler.extend(handler)

# console output only if no custom logger was specified
Expand Down
5 changes: 5 additions & 0 deletions snakemake/parser.py
Expand Up @@ -427,6 +427,10 @@ class Threads(RuleKeywordState):
pass


class Retries(RuleKeywordState):
pass


class Shadow(RuleKeywordState):
pass

Expand Down Expand Up @@ -675,6 +679,7 @@ def args(self):
params=Params,
threads=Threads,
resources=Resources,
retries=Retries,
priority=Priority,
version=Version,
log=Log,
Expand Down
1 change: 1 addition & 0 deletions snakemake/ruleinfo.py
Expand Up @@ -26,6 +26,7 @@ def __init__(self, func=None):
self.shadow_depth = None
self.resources = None
self.priority = None
self.retries = None
self.version = None
self.log = None
self.docstring = None
Expand Down
16 changes: 15 additions & 1 deletion snakemake/workflow.py
Expand Up @@ -1480,6 +1480,14 @@ def decorate(ruleinfo):
"Priority values have to be numeric.", rule=rule
)
rule.priority = ruleinfo.priority

if ruleinfo.retries:
if not isinstance(ruleinfo.retries, int) or ruleinfo.retries < 0:
raise RuleException(
"Retries values have to be integers >= 0", rule=rule
)
rule.restart_times = ruleinfo.retries or self.restart_times

if ruleinfo.version:
rule.version = ruleinfo.version
if ruleinfo.log:
Expand Down Expand Up @@ -1585,7 +1593,6 @@ def decorate(ruleinfo):
rule.wrapper = ruleinfo.wrapper
rule.template_engine = ruleinfo.template_engine
rule.cwl = ruleinfo.cwl
rule.restart_times = self.restart_times
rule.basedir = self.current_basedir

if ruleinfo.handover:
Expand Down Expand Up @@ -1765,6 +1772,13 @@ def decorate(ruleinfo):

return decorate

def retries(self, retries):
def decorate(ruleinfo):
ruleinfo.retries = retries
return ruleinfo

return decorate

def shadow(self, shadow_depth):
def decorate(ruleinfo):
ruleinfo.shadow_depth = shadow_depth
Expand Down
11 changes: 11 additions & 0 deletions tests/test_retries/Snakefile
@@ -0,0 +1,11 @@
rule a:
output:
"test.txt"
resources:
shouldfail=lambda w, attempt: attempt < 3
retries: 3
run:
if resources.shouldfail:
raise ValueError("not enough attempts")
with open(output[0], "w") as out:
print("test", file=out)
1 change: 1 addition & 0 deletions tests/test_retries/expected-results/test.txt
@@ -0,0 +1 @@
test
4 changes: 4 additions & 0 deletions tests/tests.py
Expand Up @@ -1628,3 +1628,7 @@ def test_rule_inheritance_globals():
targets=["foo.txt"],
check_md5=False,
)


def test_retries():
run(dpath("test_retries"))