Skip to content

Commit

Permalink
feat: automatically rerun jobs if parameters, code, input file set, o…
Browse files Browse the repository at this point in the history
…r software stack changed (thanks to @cclienty and @timtroendle). This also increases performance of DAG building by handling job "needrun" updates level wise, while avoiding to perform a full check for those jobs that are already downstream of a job that has been determined to require a rerun. (#1663)

* feat: automatically rerun jobs if parameters, code, input file set, or software stack changed

* fix script modification date check

* add command line parameters for setting rerun triggers

* fix

* fix msg

* always print reason

* help message

* remove check for non mtime changes since that is now handled by --rerun-triggers

* speedup of needrun determination

* remove traces of old modification time only rerun triggering

* docs
  • Loading branch information
johanneskoester committed May 24, 2022
1 parent 28a4795 commit 4c11893
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 48 deletions.
20 changes: 7 additions & 13 deletions docs/project_info/faq.rst
Expand Up @@ -473,17 +473,6 @@ Per default, Snakemake will lock a working directory by output and input files.
With the command line option ``--nolock``, you can disable this mechanism on your own risk. With ``--unlock``, you can be remove a stale lock. Stale locks can appear if your machine is powered off with a running Snakemake instance.


Snakemake does not trigger re-runs if I add additional input files. What can I do?
----------------------------------------------------------------------------------

Snakemake has a kind of "lazy" policy about added input files if their modification date is older than that of the output files. One reason is that information cannot be inferred just from the input and output files. You need additional information about the last run to be stored. Since behaviour would be inconsistent between cases where that information is available and where it is not, this functionality has been encoded as an extra switch. To trigger updates for jobs with changed input files, you can use the command line argument ``--list-input-changes`` in the following way:

.. code-block:: console
$ snakemake -n -R `snakemake --list-input-changes`
Here, ``snakemake --list-input-changes`` returns the list of output files with changed input files, which is fed into ``-R`` to trigger a re-run.


How do I trigger re-runs for rules with updated code or parameters?
-------------------------------------------------------------------
Expand Down Expand Up @@ -541,7 +530,11 @@ If you are just interested in the final summary, you can use the ``--quiet`` fla
Git is messing up the modification times of my input files, what can I do?
--------------------------------------------------------------------------

When you checkout a git repository, the modification times of updated files are set to the time of the checkout. If you rely on these files as input **and** output files in your workflow, this can cause trouble. For example, Snakemake could think that a certain (git-tracked) output has to be re-executed, just because its input has been checked out a bit later. In such cases, it is advisable to set the file modification dates to the last commit date after an update has been pulled. One solution is to add the following lines to your ``.bashrc`` (or similar):
When you checkout a git repository, the modification times of updated files are set to the time of the checkout.
If you rely on these files as input **and** output files in your workflow, this can cause trouble.
For example, Snakemake could think that a certain (git-tracked) output has to be re-executed, just because its input has been checked out a bit later.
In such cases, it is advisable to set the file modification dates to the last commit date after an update has been pulled.
One solution is to add the following lines to your ``.bashrc`` (or similar):

.. code-block:: bash
Expand All @@ -557,7 +550,8 @@ When you checkout a git repository, the modification times of updated files are
done
}
(inspired by the answer `here <https://stackoverflow.com/questions/2458042/restore-files-modification-time-in-git/22638823#22638823>`_). You can then run ``gitmodtimes`` to update the modification times of all tracked files on the current branch to their last commit time in git; BE CAREFUL--this does not account for local changes that have not been commited.
(inspired by the answer `here <https://stackoverflow.com/questions/2458042/restore-files-modification-time-in-git/22638823#22638823>`_).
You can then run ``gitmodtimes`` to update the modification times of all tracked files on the current branch to their last commit time in git; BE CAREFUL--this does not account for local changes that have not been commited.

How do I exit a running Snakemake workflow?
-------------------------------------------
Expand Down
10 changes: 9 additions & 1 deletion docs/snakefiles/rules.rst
Expand Up @@ -1004,7 +1004,15 @@ Further, an output file marked as ``temp`` is deleted after all rules that use i
Directories as outputs
----------------------

Sometimes it can be convenient to have directories, rather than files, as outputs of a rule. As of version 5.2.0, directories as outputs have to be explicitly marked with ``directory``. This is primarily for safety reasons; since all outputs are deleted before a job is executed, we don't want to risk deleting important directories if the user makes some mistake. Marking the output as ``directory`` makes the intent clear, and the output can be safely removed. Another reason comes down to how modification time for directories work. The modification time on a directory changes when a file or a subdirectory is added, removed or renamed. This can easily happen in not-quite-intended ways, such as when Apple macOS or MS Windows add ``.DS_Store`` or ``thumbs.db`` files to store parameters for how the directory contents should be displayed. When the ``directory`` flag is used a hidden file called ``.snakemake_timestamp`` is created in the output directory, and the modification time of that file is used when determining whether the rule output is up to date or if it needs to be rerun. Always consider if you can't formulate your workflow using normal files before resorting to using ``directory()``.
Sometimes it can be convenient to have directories, rather than files, as outputs of a rule.
As of version 5.2.0, directories as outputs have to be explicitly marked with ``directory``.
This is primarily for safety reasons; since all outputs are deleted before a job is executed, we don't want to risk deleting important directories if the user makes some mistake.
Marking the output as ``directory`` makes the intent clear, and the output can be safely removed.
Another reason comes down to how modification time for directories work.
The modification time on a directory changes when a file or a subdirectory is added, removed or renamed.
This can easily happen in not-quite-intended ways, such as when Apple macOS or MS Windows add ``.DS_Store`` or ``thumbs.db`` files to store parameters for how the directory contents should be displayed.
When the ``directory`` flag is used a hidden file called ``.snakemake_timestamp`` is created in the output directory, and the modification time of that file is used when determining whether the rule output is up to date or if it needs to be rerun.
Always consider if you can't formulate your workflow using normal files before resorting to using ``directory()``.

.. code-block:: python
Expand Down
2 changes: 1 addition & 1 deletion docs/tutorial/advanced.rst
Expand Up @@ -176,7 +176,7 @@ Input functions are evaluated once the wildcard values of a job are determined.
Exercise
........

* In the ``data/samples`` folder, there is an additional sample ``C.fastq``. Add that sample to the config file and see how Snakemake wants to recompute the part of the workflow belonging to the new sample, when invoking with ``snakemake -n --reason --forcerun bcftools_call``.
* In the ``data/samples`` folder, there is an additional sample ``C.fastq``. Add that sample to the config file and see how Snakemake wants to recompute the part of the workflow belonging to the new sample, when invoking with ``snakemake -n --forcerun bcftools_call``.

Step 4: Rule parameters
:::::::::::::::::::::::
Expand Down
2 changes: 1 addition & 1 deletion docs/tutorial/basics.rst
Expand Up @@ -492,7 +492,7 @@ Exercise
* Create the DAG of jobs for the complete workflow.
* Execute the complete workflow and have a look at the resulting ``plots/quals.svg``.
* Snakemake provides handy flags for forcing re-execution of parts of the workflow. Have a look at the command line help with ``snakemake --help`` and search for the flag ``--forcerun``. Then, use this flag to re-execute the rule ``samtools_sort`` and see what happens.
* With ``--reason`` it is possible to display the execution reason for each job. Try this flag together with a dry-run and the ``--forcerun`` flag to understand the decisions of Snakemake.
* Snakemake displays the reason for each job (under ``reason:``). Perform a dry-run that forces some rules to be reexecuted (using the ``--forcerun`` flag in combination with some rulename) to understand the decisions of Snakemake.

Summary
:::::::
Expand Down
22 changes: 19 additions & 3 deletions snakemake/__init__.py
Expand Up @@ -37,6 +37,8 @@
"workflow/snakefile",
]

RERUN_TRIGGERS = ["mtime", "params", "input", "software-env", "code"]


def snakemake(
snakefile,
Expand Down Expand Up @@ -72,7 +74,7 @@ def snakemake(
omit_from=[],
prioritytargets=[],
stats=None,
printreason=False,
printreason=True,
printshellcmds=False,
debug_dag=False,
printdag=False,
Expand All @@ -82,6 +84,7 @@ def snakemake(
nocolor=False,
quiet=False,
keepgoing=False,
rerun_triggers=RERUN_TRIGGERS,
cluster=None,
cluster_config=None,
cluster_sync=None,
Expand Down Expand Up @@ -552,6 +555,7 @@ def snakemake(

workflow = Workflow(
snakefile=snakefile,
rerun_triggers=rerun_triggers,
jobscript=jobscript,
overwrite_shellcmd=overwrite_shellcmd,
overwrite_config=overwrite_config,
Expand Down Expand Up @@ -1304,6 +1308,17 @@ def get_argument_parser(profile=None):
action="store_true",
help="Go on with independent jobs if a job fails.",
)
group_exec.add_argument(
"--rerun-triggers",
nargs="+",
choices=RERUN_TRIGGERS,
default=RERUN_TRIGGERS,
help="Define what triggers the rerunning of a job. By default, "
"all triggers are used, which guarantees that results are "
"consistent with the workflow code and configuration. If you "
"rather prefer the traditional way of just considering "
"file modification dates, use '--rerun-trigger mtime'.",
)
group_exec.add_argument(
"--force",
"-f",
Expand Down Expand Up @@ -1767,7 +1782,7 @@ def get_argument_parser(profile=None):
"--reason",
"-r",
action="store_true",
help="Print the reason for each executed rule.",
help="Print the reason for each executed rule (deprecated, always true now).",
)
group_output.add_argument(
"--gui",
Expand Down Expand Up @@ -2837,7 +2852,7 @@ def open_browser():
targets=args.target,
dryrun=args.dryrun,
printshellcmds=args.printshellcmds,
printreason=args.reason,
printreason=True, # always display reason
debug_dag=args.debug_dag,
printdag=args.dag,
printrulegraph=args.rulegraph,
Expand All @@ -2854,6 +2869,7 @@ def open_browser():
nocolor=args.nocolor,
quiet=args.quiet,
keepgoing=args.keep_going,
rerun_triggers=args.rerun_triggers,
cluster=args.cluster,
cluster_config=args.cluster_config,
cluster_sync=args.cluster_sync,
Expand Down
69 changes: 45 additions & 24 deletions snakemake/dag.py
Expand Up @@ -1082,6 +1082,7 @@ def update_needrun(job):
reason.missing_output.update(job.missing_output(files))
if not reason:
output_mintime_ = output_mintime.get(job)
updated_input = None
if output_mintime_:
# Input is updated if it is newer that the oldest output file
# and does not have the same checksum as the one previously recorded.
Expand All @@ -1093,8 +1094,28 @@ def update_needrun(job):
and not is_same_checksum(f, job)
]
reason.updated_input.update(updated_input)
if not updated_input:
# check for other changes like parameters, set of input files, or code
if "params" in self.workflow.rerun_triggers:
reason.params_changed = any(
self.workflow.persistence.params_changed(job)
)
if "input" in self.workflow.rerun_triggers:
reason.input_changed = any(
self.workflow.persistence.input_changed(job)
)
if "code" in self.workflow.rerun_triggers:
reason.code_changed = any(
job.outputs_older_than_script_or_notebook()
) or any(self.workflow.persistence.code_changed(job))
if "software-env" in self.workflow.rerun_triggers:
reason.software_stack_changed = any(
self.workflow.persistence.conda_env_changed(job)
) or any(self.workflow.persistence.container_changed(job))

if noinitreason and reason:
reason.derived = False
return reason

reason = self.reason
_needrun = self._needrun
Expand All @@ -1105,23 +1126,39 @@ def update_needrun(job):
_needrun.clear()
_n_until_ready.clear()
self._ready_jobs.clear()
candidates = list(self.jobs)

candidates = list(self.toposorted())

# Update the output mintime of all jobs.
# We traverse them in BFS (level order) starting from target jobs.
# Then, we check output mintime of job itself and all direct descendants,
# which have already been visited in the level before.
# This way, we achieve a linear runtime.
for job in candidates:
update_output_mintime(job)
for level in reversed(candidates):
for job in level:
update_output_mintime(job)

# Update prior reason for all candidate jobs
# Move from the first level to the last of the toposorted candidates.
# If a job is needrun, mask all downstream jobs, they will below
# in the bi-directional BFS
# be determined as needrun because they depend on them.
masked = set()
queue = deque()
for level in candidates:
for job in level:
if job in masked:
# depending jobs of jobs that are needrun as a prior
# can be skipped
continue

# update prior reason for all candidate jobs
for job in candidates:
update_needrun(job)
if update_needrun(job):
queue.append(job)
masked.update(self.bfs(self.depending, job))

queue = deque(filter(reason, candidates))
# bi-directional BFS to determine further needrun jobs
visited = set(queue)
candidates_set = set(candidates)
candidates_set = set(job for level in candidates for job in level)
while queue:
job = queue.popleft()
_needrun.add(job)
Expand Down Expand Up @@ -2372,22 +2409,6 @@ def get_outputs_with_changes(self, change_type, include_needrun=True):
changed.extend(list(job.outputs_older_than_script_or_notebook()))
return changed

def warn_about_changes(self, quiet=False):
if not quiet:
for change_type in ["code", "input", "params"]:
changed = self.get_outputs_with_changes(
change_type, include_needrun=False
)
if changed:
rerun_trigger = ""
if not ON_WINDOWS:
rerun_trigger = f"\n To trigger a re-run, use 'snakemake -R $(snakemake --list-{change_type}-changes)'."
logger.warning(
f"The {change_type} used to generate one or several output files has changed:\n"
f" To inspect which output files have changes, run 'snakemake --list-{change_type}-changes'."
f"{rerun_trigger}"
)

def __str__(self):
return self.dot()

Expand Down
39 changes: 39 additions & 0 deletions snakemake/jobs.py
Expand Up @@ -1553,6 +1553,10 @@ class Reason:
"_updated_input_run",
"_missing_output",
"_incomplete_output",
"input_changed",
"code_changed",
"params_changed",
"software_stack_changed",
"forced",
"noio",
"nooutput",
Expand All @@ -1561,6 +1565,7 @@ class Reason:
"service",
"target",
"finished",
"cleanup_metadata_instructions",
]

def __init__(self):
Expand All @@ -1569,12 +1574,32 @@ def __init__(self):
self._updated_input_run = None
self._missing_output = None
self._incomplete_output = None
self.params_changed = False
self.code_changed = False
self.software_stack_changed = False
self.input_changed = False
self.forced = False
self.noio = False
self.nooutput = False
self.derived = True
self.pipe = False
self.service = False
self.cleanup_metadata_instructions = None

def set_cleanup_metadata_instructions(self, job):
self.cleanup_metadata_instructions = (
"To ignore these changes, run snakemake "
f"--cleanup-metadata {' '.join(job.expanded_output)}"
)

def is_provenance_triggered(self):
"""Return True if reason is triggered by provenance information."""
return (
self.params_changed
or self.code_changed
or self.software_stack_changed
or self.input_changed
)

@lazy_property
def updated_input(self):
Expand Down Expand Up @@ -1643,6 +1668,16 @@ def format_files(files):
s.append(
"Job provides a service which has to be kept active until all consumers are finished."
)

if self.input_changed:
s.append("Set of input files has changed since last execution")
if self.code_changed:
s.append("Code has changed since last execution")
if self.params_changed:
s.append("Params have changed since last execution")
if self.software_stack_changed:
s.append("Software stack has changed since last execution")

s = "; ".join(s)
if self.finished:
return f"Finished (was: {s})"
Expand All @@ -1658,5 +1693,9 @@ def __bool__(self):
or self.nooutput
or self.pipe
or self.service
or self.code_changed
or self.params_changed
or self.software_stack_changed
or self.input_changed
)
return v and not self.finished
24 changes: 24 additions & 0 deletions snakemake/persistence.py
Expand Up @@ -331,6 +331,12 @@ def params(self, path):
def code(self, path):
return self.metadata(path).get("code")

def conda_env(self, path):
return self.metadata(path).get("conda_env")

def container_img_url(self, path):
return self.metadata(path).get("container_img_url")

def input_checksums(self, job, input_path):
"""Return all checksums of the given input file
recorded for the output of the given job.
Expand All @@ -356,6 +362,14 @@ def params_changed(self, job, file=None):
"""Yields output files with changed params or bool if file given."""
return _bool_or_gen(self._params_changed, job, file=file)

def conda_env_changed(self, job, file=None):
"""Yields output files with changed conda env or bool if file given."""
return _bool_or_gen(self._conda_env_changed, job, file=file)

def container_changed(self, job, file=None):
"""Yields output files with changed container img or bool if file given."""
return _bool_or_gen(self._container_changed, job, file=file)

def _version_changed(self, job, file=None):
assert file is not None
recorded = self.version(file)
Expand All @@ -376,6 +390,16 @@ def _params_changed(self, job, file=None):
recorded = self.params(file)
return recorded is not None and recorded != self._params(job)

def _conda_env_changed(self, job, file=None):
assert file is not None
recorded = self.conda_env(file)
return recorded is not None and recorded != self._conda_env(job)

def _container_changed(self, job, file=None):
assert file is not None
recorded = self.container_img_url(file)
return recorded is not None and recorded != job.container_img_url

def noop(self, *args):
pass

Expand Down

0 comments on commit 4c11893

Please sign in to comment.