From 4c11893d2fda5824adff44d16d7741484e63efea Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20K=C3=B6ster?=
Date: Tue, 24 May 2022 10:14:23 +0200
Subject: [PATCH] feat: automatically rerun jobs if parameters, code, input
 file set, or software stack changed (thanks to @cclienty and @timtroendle).
 This also increases the performance of DAG building by handling job
 "needrun" updates level-wise, while avoiding a full check for those jobs
 that are already downstream of a job that has been determined to require a
 rerun. (#1663)

* feat: automatically rerun jobs if parameters, code, input file set, or software stack changed

* fix script modification date check

* add command line parameters for setting rerun triggers

* fix

* fix msg

* always print reason

* help message

* remove check for non-mtime changes since that is now handled by --rerun-triggers

* speedup of needrun determination

* remove traces of old modification-time-only rerun triggering

* docs
---
 docs/project_info/faq.rst  | 20 ++++-------
 docs/snakefiles/rules.rst  | 10 +++++-
 docs/tutorial/advanced.rst |  2 +-
 docs/tutorial/basics.rst   |  2 +-
 snakemake/__init__.py      | 22 ++++++++++--
 snakemake/dag.py           | 69 +++++++++++++++++++++++++-------------
 snakemake/jobs.py          | 39 +++++++++++++++++++++
 snakemake/persistence.py   | 24 +++++++++++++
 snakemake/workflow.py      | 19 ++++++++---
 9 files changed, 159 insertions(+), 48 deletions(-)

diff --git a/docs/project_info/faq.rst b/docs/project_info/faq.rst
index 1e701bffd..ba0832602 100644
--- a/docs/project_info/faq.rst
+++ b/docs/project_info/faq.rst
@@ -473,17 +473,6 @@ Per default, Snakemake will lock a working directory by output and input files.
 With the command line option ``--nolock``, you can disable this mechanism at your own risk. With ``--unlock``, you can remove a stale lock. Stale locks can appear if your machine is powered off with a running Snakemake instance.
 
-Snakemake does not trigger re-runs if I add additional input files. What can I do?
-----------------------------------------------------------------------------------
-
-Snakemake has a kind of "lazy" policy about added input files if their modification date is older than that of the output files. One reason is that information cannot be inferred just from the input and output files. You need additional information about the last run to be stored. Since behaviour would be inconsistent between cases where that information is available and where it is not, this functionality has been encoded as an extra switch. To trigger updates for jobs with changed input files, you can use the command line argument ``--list-input-changes`` in the following way:
-
-.. code-block:: console
-
-    $ snakemake -n -R `snakemake --list-input-changes`
-
-Here, ``snakemake --list-input-changes`` returns the list of output files with changed input files, which is fed into ``-R`` to trigger a re-run.
-
 How do I trigger re-runs for rules with updated code or parameters?
 -------------------------------------------------------------------
 
@@ -541,7 +530,11 @@ If you are just interested in the final summary, you can use the ``--quiet`` fla
 Git is messing up the modification times of my input files, what can I do?
 --------------------------------------------------------------------------
 
-When you checkout a git repository, the modification times of updated files are set to the time of the checkout. If you rely on these files as input **and** output files in your workflow, this can cause trouble. For example, Snakemake could think that a certain (git-tracked) output has to be re-executed, just because its input has been checked out a bit later. In such cases, it is advisable to set the file modification dates to the last commit date after an update has been pulled. One solution is to add the following lines to your ``.bashrc`` (or similar):
For example, Snakemake could think that a certain (git-tracked) output has to be re-executed, just because its input has been checked out a bit later. In such cases, it is advisable to set the file modification dates to the last commit date after an update has been pulled. One solution is to add the following lines to your ``.bashrc`` (or similar): +When you checkout a git repository, the modification times of updated files are set to the time of the checkout. +If you rely on these files as input **and** output files in your workflow, this can cause trouble. +For example, Snakemake could think that a certain (git-tracked) output has to be re-executed, just because its input has been checked out a bit later. +In such cases, it is advisable to set the file modification dates to the last commit date after an update has been pulled. +One solution is to add the following lines to your ``.bashrc`` (or similar): .. code-block:: bash @@ -557,7 +550,8 @@ When you checkout a git repository, the modification times of updated files are done } -(inspired by the answer `here `_). You can then run ``gitmodtimes`` to update the modification times of all tracked files on the current branch to their last commit time in git; BE CAREFUL--this does not account for local changes that have not been commited. +(inspired by the answer `here `_). +You can then run ``gitmodtimes`` to update the modification times of all tracked files on the current branch to their last commit time in git; BE CAREFUL--this does not account for local changes that have not been commited. How do I exit a running Snakemake workflow? ------------------------------------------- diff --git a/docs/snakefiles/rules.rst b/docs/snakefiles/rules.rst index 4911cf3a4..62de60c28 100644 --- a/docs/snakefiles/rules.rst +++ b/docs/snakefiles/rules.rst @@ -1004,7 +1004,15 @@ Further, an output file marked as ``temp`` is deleted after all rules that use i Directories as outputs ---------------------- -Sometimes it can be convenient to have directories, rather than files, as outputs of a rule. As of version 5.2.0, directories as outputs have to be explicitly marked with ``directory``. This is primarily for safety reasons; since all outputs are deleted before a job is executed, we don't want to risk deleting important directories if the user makes some mistake. Marking the output as ``directory`` makes the intent clear, and the output can be safely removed. Another reason comes down to how modification time for directories work. The modification time on a directory changes when a file or a subdirectory is added, removed or renamed. This can easily happen in not-quite-intended ways, such as when Apple macOS or MS Windows add ``.DS_Store`` or ``thumbs.db`` files to store parameters for how the directory contents should be displayed. When the ``directory`` flag is used a hidden file called ``.snakemake_timestamp`` is created in the output directory, and the modification time of that file is used when determining whether the rule output is up to date or if it needs to be rerun. Always consider if you can't formulate your workflow using normal files before resorting to using ``directory()``. +Sometimes it can be convenient to have directories, rather than files, as outputs of a rule. +As of version 5.2.0, directories as outputs have to be explicitly marked with ``directory``. +This is primarily for safety reasons; since all outputs are deleted before a job is executed, we don't want to risk deleting important directories if the user makes some mistake. 
+Marking the output as ``directory`` makes the intent clear, and the output can be safely removed.
+Another reason comes down to how modification times for directories work.
+The modification time on a directory changes when a file or a subdirectory is added, removed or renamed.
+This can easily happen in not-quite-intended ways, such as when Apple macOS or MS Windows add ``.DS_Store`` or ``thumbs.db`` files to store parameters for how the directory contents should be displayed.
+When the ``directory`` flag is used, a hidden file called ``.snakemake_timestamp`` is created in the output directory, and the modification time of that file is used when determining whether the rule output is up to date or if it needs to be rerun.
+Always consider whether you can formulate your workflow using normal files before resorting to ``directory()``.
 
 .. code-block:: python
 
diff --git a/docs/tutorial/advanced.rst b/docs/tutorial/advanced.rst
index d3bf0982b..8fc34ab22 100644
--- a/docs/tutorial/advanced.rst
+++ b/docs/tutorial/advanced.rst
@@ -176,7 +176,7 @@ Input functions are evaluated once the wildcard values of a job are determined.
 Exercise
 ........
 
-* In the ``data/samples`` folder, there is an additional sample ``C.fastq``. Add that sample to the config file and see how Snakemake wants to recompute the part of the workflow belonging to the new sample, when invoking with ``snakemake -n --reason --forcerun bcftools_call``.
+* In the ``data/samples`` folder, there is an additional sample ``C.fastq``. Add that sample to the config file and see how Snakemake wants to recompute the part of the workflow belonging to the new sample, when invoking with ``snakemake -n --forcerun bcftools_call``.
 
 Step 4: Rule parameters
 :::::::::::::::::::::::
diff --git a/docs/tutorial/basics.rst b/docs/tutorial/basics.rst
index fc3e9d518..5b29068be 100644
--- a/docs/tutorial/basics.rst
+++ b/docs/tutorial/basics.rst
@@ -492,7 +492,7 @@ Exercise
 * Create the DAG of jobs for the complete workflow.
 * Execute the complete workflow and have a look at the resulting ``plots/quals.svg``.
 * Snakemake provides handy flags for forcing re-execution of parts of the workflow. Have a look at the command line help with ``snakemake --help`` and search for the flag ``--forcerun``. Then, use this flag to re-execute the rule ``samtools_sort`` and see what happens.
-* With ``--reason`` it is possible to display the execution reason for each job. Try this flag together with a dry-run and the ``--forcerun`` flag to understand the decisions of Snakemake.
+* Snakemake displays the reason for each job (under ``reason:``). Perform a dry-run that forces some rules to be re-executed (using the ``--forcerun`` flag in combination with a rule name) to understand the decisions of Snakemake.
 
 Summary
 :::::::
diff --git a/snakemake/__init__.py b/snakemake/__init__.py
index 879da6e45..b4d05d358 100644
--- a/snakemake/__init__.py
+++ b/snakemake/__init__.py
@@ -37,6 +37,8 @@
     "workflow/snakefile",
 ]
 
+RERUN_TRIGGERS = ["mtime", "params", "input", "software-env", "code"]
+
 
 def snakemake(
     snakefile,
@@ -72,7 +74,7 @@ def snakemake(
     omit_from=[],
     prioritytargets=[],
     stats=None,
-    printreason=False,
+    printreason=True,
     printshellcmds=False,
     debug_dag=False,
     printdag=False,
@@ -82,6 +84,7 @@ def snakemake(
     nocolor=False,
     quiet=False,
     keepgoing=False,
+    rerun_triggers=RERUN_TRIGGERS,
     cluster=None,
     cluster_config=None,
     cluster_sync=None,
@@ -552,6 +555,7 @@ def snakemake(
 
         workflow = Workflow(
             snakefile=snakefile,
+            rerun_triggers=rerun_triggers,
             jobscript=jobscript,
             overwrite_shellcmd=overwrite_shellcmd,
             overwrite_config=overwrite_config,
@@ -1304,6 +1308,17 @@ def get_argument_parser(profile=None):
         action="store_true",
         help="Go on with independent jobs if a job fails.",
     )
+    group_exec.add_argument(
+        "--rerun-triggers",
+        nargs="+",
+        choices=RERUN_TRIGGERS,
+        default=RERUN_TRIGGERS,
+        help="Define what triggers the rerunning of a job. By default, "
+        "all triggers are used, which guarantees that results are "
+        "consistent with the workflow code and configuration. If you "
+        "prefer the traditional way of just considering "
+        "file modification dates, use '--rerun-triggers mtime'.",
+    )
     group_exec.add_argument(
         "--force",
         "-f",
@@ -1767,7 +1782,7 @@ def get_argument_parser(profile=None):
         "--reason",
         "-r",
         action="store_true",
-        help="Print the reason for each executed rule.",
+        help="Print the reason for each executed rule (deprecated, always true now).",
     )
     group_output.add_argument(
         "--gui",
@@ -2837,7 +2852,7 @@ def open_browser():
             targets=args.target,
             dryrun=args.dryrun,
             printshellcmds=args.printshellcmds,
-            printreason=args.reason,
+            printreason=True,  # always display reason
             debug_dag=args.debug_dag,
             printdag=args.dag,
             printrulegraph=args.rulegraph,
@@ -2854,6 +2869,7 @@ def open_browser():
             nocolor=args.nocolor,
             quiet=args.quiet,
             keepgoing=args.keep_going,
+            rerun_triggers=args.rerun_triggers,
             cluster=args.cluster,
             cluster_config=args.cluster_config,
             cluster_sync=args.cluster_sync,
diff --git a/snakemake/dag.py b/snakemake/dag.py
index 69ceb0273..03e300ea6 100755
--- a/snakemake/dag.py
+++ b/snakemake/dag.py
@@ -1082,6 +1082,7 @@ def update_needrun(job):
                 reason.missing_output.update(job.missing_output(files))
             if not reason:
                 output_mintime_ = output_mintime.get(job)
+                updated_input = None
                 if output_mintime_:
                     # Input is updated if it is newer than the oldest output file
                     # and does not have the same checksum as the one previously recorded.
@@ -1093,8 +1094,28 @@ def update_needrun(job):
                         and not is_same_checksum(f, job)
                     ]
                     reason.updated_input.update(updated_input)
+                if not updated_input:
+                    # check for other changes like parameters, set of input files, or code
+                    if "params" in self.workflow.rerun_triggers:
+                        reason.params_changed = any(
+                            self.workflow.persistence.params_changed(job)
+                        )
+                    if "input" in self.workflow.rerun_triggers:
+                        reason.input_changed = any(
+                            self.workflow.persistence.input_changed(job)
+                        )
+                    if "code" in self.workflow.rerun_triggers:
+                        reason.code_changed = any(
+                            job.outputs_older_than_script_or_notebook()
+                        ) or any(self.workflow.persistence.code_changed(job))
+                    if "software-env" in self.workflow.rerun_triggers:
+                        reason.software_stack_changed = any(
+                            self.workflow.persistence.conda_env_changed(job)
+                        ) or any(self.workflow.persistence.container_changed(job))
+
             if noinitreason and reason:
                 reason.derived = False
+            return reason
 
         reason = self.reason
         _needrun = self._needrun
@@ -1105,23 +1126,39 @@ def update_needrun(job):
         _needrun.clear()
         _n_until_ready.clear()
         self._ready_jobs.clear()
-        candidates = list(self.jobs)
+
+        candidates = list(self.toposorted())
 
         # Update the output mintime of all jobs.
         # We traverse them in BFS (level order) starting from target jobs.
         # Then, we check the output mintime of the job itself and all direct descendants,
         # which have already been visited in the level before.
         # This way, we achieve a linear runtime.
-        for job in candidates:
-            update_output_mintime(job)
+        for level in reversed(candidates):
+            for job in level:
+                update_output_mintime(job)
+
+        # Update the prior reason of all candidate jobs, moving from the first
+        # level of the toposorted candidates to the last. If a job is needrun,
+        # mask all of its downstream jobs: the bi-directional BFS below will
+        # determine them as needrun anyway, because they depend on it.
+        masked = set()
+        queue = deque()
+        for level in candidates:
+            for job in level:
+                if job in masked:
+                    # jobs downstream of a job that is already needrun
+                    # can be skipped
+                    continue
 
-        # update prior reason for all candidate jobs
-        for job in candidates:
-            update_needrun(job)
+                if update_needrun(job):
+                    queue.append(job)
+                    masked.update(self.bfs(self.depending, job))
 
-        queue = deque(filter(reason, candidates))
+        # bi-directional BFS to determine further needrun jobs
         visited = set(queue)
-        candidates_set = set(candidates)
+        candidates_set = set(job for level in candidates for job in level)
         while queue:
             job = queue.popleft()
             _needrun.add(job)
@@ -2372,22 +2409,6 @@ def get_outputs_with_changes(self, change_type, include_needrun=True):
             changed.extend(list(job.outputs_older_than_script_or_notebook()))
         return changed
 
-    def warn_about_changes(self, quiet=False):
-        if not quiet:
-            for change_type in ["code", "input", "params"]:
-                changed = self.get_outputs_with_changes(
-                    change_type, include_needrun=False
-                )
-                if changed:
-                    rerun_trigger = ""
-                    if not ON_WINDOWS:
-                        rerun_trigger = f"\n    To trigger a re-run, use 'snakemake -R $(snakemake --list-{change_type}-changes)'."
-                    logger.warning(
-                        f"The {change_type} used to generate one or several output files has changed:\n"
-                        f"    To inspect which output files have changes, run 'snakemake --list-{change_type}-changes'."
- f"{rerun_trigger}" - ) - def __str__(self): return self.dot() diff --git a/snakemake/jobs.py b/snakemake/jobs.py index d8699e2af..d6e5a11d6 100644 --- a/snakemake/jobs.py +++ b/snakemake/jobs.py @@ -1553,6 +1553,10 @@ class Reason: "_updated_input_run", "_missing_output", "_incomplete_output", + "input_changed", + "code_changed", + "params_changed", + "software_stack_changed", "forced", "noio", "nooutput", @@ -1561,6 +1565,7 @@ class Reason: "service", "target", "finished", + "cleanup_metadata_instructions", ] def __init__(self): @@ -1569,12 +1574,32 @@ def __init__(self): self._updated_input_run = None self._missing_output = None self._incomplete_output = None + self.params_changed = False + self.code_changed = False + self.software_stack_changed = False + self.input_changed = False self.forced = False self.noio = False self.nooutput = False self.derived = True self.pipe = False self.service = False + self.cleanup_metadata_instructions = None + + def set_cleanup_metadata_instructions(self, job): + self.cleanup_metadata_instructions = ( + "To ignore these changes, run snakemake " + f"--cleanup-metadata {' '.join(job.expanded_output)}" + ) + + def is_provenance_triggered(self): + """Return True if reason is triggered by provenance information.""" + return ( + self.params_changed + or self.code_changed + or self.software_stack_changed + or self.input_changed + ) @lazy_property def updated_input(self): @@ -1643,6 +1668,16 @@ def format_files(files): s.append( "Job provides a service which has to be kept active until all consumers are finished." ) + + if self.input_changed: + s.append("Set of input files has changed since last execution") + if self.code_changed: + s.append("Code has changed since last execution") + if self.params_changed: + s.append("Params have changed since last execution") + if self.software_stack_changed: + s.append("Software stack has changed since last execution") + s = "; ".join(s) if self.finished: return f"Finished (was: {s})" @@ -1658,5 +1693,9 @@ def __bool__(self): or self.nooutput or self.pipe or self.service + or self.code_changed + or self.params_changed + or self.software_stack_changed + or self.input_changed ) return v and not self.finished diff --git a/snakemake/persistence.py b/snakemake/persistence.py index 216f52ec0..31c0c7147 100755 --- a/snakemake/persistence.py +++ b/snakemake/persistence.py @@ -331,6 +331,12 @@ def params(self, path): def code(self, path): return self.metadata(path).get("code") + def conda_env(self, path): + return self.metadata(path).get("conda_env") + + def container_img_url(self, path): + return self.metadata(path).get("container_img_url") + def input_checksums(self, job, input_path): """Return all checksums of the given input file recorded for the output of the given job. 
@@ -356,6 +362,14 @@ def params_changed(self, job, file=None):
         """Yields output files with changed params or bool if file given."""
         return _bool_or_gen(self._params_changed, job, file=file)
 
+    def conda_env_changed(self, job, file=None):
+        """Yields output files with changed conda env or bool if file given."""
+        return _bool_or_gen(self._conda_env_changed, job, file=file)
+
+    def container_changed(self, job, file=None):
+        """Yields output files with changed container img or bool if file given."""
+        return _bool_or_gen(self._container_changed, job, file=file)
+
     def _version_changed(self, job, file=None):
         assert file is not None
         recorded = self.version(file)
@@ -376,6 +390,16 @@ def _params_changed(self, job, file=None):
         recorded = self.params(file)
         return recorded is not None and recorded != self._params(job)
 
+    def _conda_env_changed(self, job, file=None):
+        assert file is not None
+        recorded = self.conda_env(file)
+        return recorded is not None and recorded != self._conda_env(job)
+
+    def _container_changed(self, job, file=None):
+        assert file is not None
+        recorded = self.container_img_url(file)
+        return recorded is not None and recorded != job.container_img_url
+
     def noop(self, *args):
         pass
 
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index ef9f515e6..b36ddce32 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -101,6 +101,7 @@ class Workflow:
     def __init__(
         self,
         snakefile=None,
+        rerun_triggers=None,
        jobscript=None,
         overwrite_shellcmd=None,
         overwrite_config=None,
@@ -162,6 +163,7 @@ def __init__(
         self.global_resources["_cores"] = cores
         self.global_resources["_nodes"] = nodes
 
+        self.rerun_triggers = frozenset(rerun_triggers)
         self._rules = OrderedDict()
         self.default_target = None
         self._workdir = None
@@ -1038,7 +1040,6 @@ def files(items):
             )
 
         if not dryrun:
-            dag.warn_about_changes(quiet)
             if len(dag):
                 shell_exec = shell.get_executable()
                 if shell_exec is not None:
@@ -1084,7 +1085,6 @@ def files(items):
                     logger.info(NOTHING_TO_BE_DONE_MSG)
         else:
             # the dryrun case
-            dag.warn_about_changes(quiet)
             if len(dag):
                 logger.run_info("\n".join(dag.stats()))
             else:
@@ -1106,16 +1106,26 @@ def files(items):
             if dryrun:
                 if len(dag):
                     logger.run_info("\n".join(dag.stats()))
+                    if any(
+                        dag.reason(job).is_provenance_triggered()
+                        for job in dag.needrun_jobs
+                    ):
+                        logger.info(
+                            "Some jobs were triggered by provenance information, "
+                            "see 'reason' section in the rule displays above. "
+                            "If you prefer that only modification time is used to "
+                            "determine whether a job shall be executed, use the command "
+                            "line option '--rerun-triggers mtime' (also see --help)."
+                        )
+                        logger.info("")
                 logger.info(
                     "This was a dry-run (flag -n). The order of jobs "
                     "does not reflect the order of execution."
                 )
-                dag.warn_about_changes(quiet)
                 logger.remove_logfile()
             else:
                 if stats:
                     self.scheduler.stats.to_json(stats)
-                dag.warn_about_changes(quiet)
                 logger.logfile_hint()
             if not dryrun and not no_hooks:
                 self._onsuccess(logger.get_logfile())
 
         else:
             if not dryrun and not no_hooks:
                 self._onerror(logger.get_logfile())
-            dag.warn_about_changes(quiet)
             logger.logfile_hint()
             return False
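
Usage sketch (illustration only; it assumes a workflow in the current directory, and the output path in the last command is a hypothetical example):

    # Dry-run with the new defaults: mtime, params, input, software-env, and
    # code changes all trigger reruns, and the reason is always printed.
    $ snakemake -n

    # Restrict rerun decisions to file modification times only, i.e. the
    # pre-change behavior:
    $ snakemake -n --rerun-triggers mtime

    # Accept recorded provenance changes for an output instead of rerunning,
    # as hinted by Reason.set_cleanup_metadata_instructions:
    $ snakemake --cleanup-metadata results/summary.txt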