Skip to content

Commit

Permalink
fix: warn on non-file-modification-date changes like params, code, or…
Browse files Browse the repository at this point in the history
… input files (#1419)

* fix: warn on non-file modification date changes (params, code, input files)

* handle windows

* log formatting

* Handle missing notebooks

* fixes
  • Loading branch information
johanneskoester committed Feb 23, 2022
1 parent 5d45493 commit b5f53f0
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 15 deletions.
28 changes: 27 additions & 1 deletion snakemake/dag.py
Expand Up @@ -27,7 +27,7 @@
from snakemake.exceptions import RemoteFileException, WorkflowError, ChildIOException
from snakemake.exceptions import InputFunctionException
from snakemake.logging import logger
from snakemake.common import DYNAMIC_FILL, group_into_chunks
from snakemake.common import DYNAMIC_FILL, ON_WINDOWS, group_into_chunks
from snakemake.deployment import conda, singularity
from snakemake.output_index import OutputIndex
from snakemake import workflow
Expand Down Expand Up @@ -2234,6 +2234,32 @@ def stats(self):
yield tabulate(rows, headers="keys")
yield ""

def get_outputs_with_changes(self, change_type):
    """Collect the outputs reported as changed for the given change type.

    For every non-group job in ``self.jobs``, the persistence method named
    ``f"{change_type}_changed"`` is called with the job and the items it
    yields are gathered in job order. For ``change_type == "code"`` the
    items yielded by each job's ``outputs_older_than_script_or_notebook()``
    are appended afterwards.

    Args:
        change_type: Prefix of the persistence method to query; the callers
            visible in this commit pass "version", "code", "input" and
            "params".

    Returns:
        A list with one entry per reported output. Entries are not
        de-duplicated.

    Raises:
        AttributeError: If the persistence object has no
            ``<change_type>_changed`` attribute.
    """
    # The lookup does not depend on the job, so resolve it once up front.
    changed_since_last_run = getattr(
        self.workflow.persistence, f"{change_type}_changed"
    )
    # Group jobs are excluded from both checks below.
    single_jobs = [job for job in self.jobs if not job.is_group()]

    changed = list(chain.from_iterable(map(changed_since_last_run, single_jobs)))
    if change_type == "code":
        for job in single_jobs:
            changed.extend(job.outputs_older_than_script_or_notebook())
    return changed

def warn_about_changes(self):
    """Log one warning per change type that has affected outputs.

    The change types "code", "input" and "params" are checked in that
    order via ``get_outputs_with_changes``. A type with no reported
    outputs produces no message.
    """
    for change_type in ["code", "input", "params"]:
        if not self.get_outputs_with_changes(change_type):
            continue

        # The re-run hint is omitted on Windows -- presumably because the
        # $(...) command substitution is not available there.
        rerun_hint = (
            ""
            if ON_WINDOWS
            else f"\n To trigger a re-run, use 'snakemake -R $(snakemake --list-{change_type}-changes)'."
        )
        message = (
            f"The {change_type} used to generate one or several output files has changed:\n"
            f" To inspect which output files have changes, run 'snakemake --list-{change_type}-changes'."
            f"{rerun_hint}"
        )
        logger.warning(message)

def __str__(self):
    """Return whatever ``self.dot()`` returns as the string form of this object."""
    # NOTE(review): presumably dot() yields a Graphviz DOT description -- confirm.
    return self.dot()

Expand Down
3 changes: 1 addition & 2 deletions snakemake/jobs.py
Expand Up @@ -269,8 +269,7 @@ def outputs_older_than_script_or_notebook(self):
if self.rule.basedir:
# needed if rule is included from another subdirectory
path = self.rule.basedir.join(path).get_path_or_uri()
if is_local_file(path):
assert os.path.exists(path), "cannot find {0}".format(path)
if is_local_file(path) and os.path.exists(path):
script_mtime = os.lstat(path).st_mtime
for f in self.expanded_output:
if f.exists:
Expand Down
4 changes: 2 additions & 2 deletions snakemake/logging.py
Expand Up @@ -30,7 +30,7 @@ class ColorizingStreamHandler(_logging.StreamHandler):
"WARNING": YELLOW,
"INFO": GREEN,
"DEBUG": BLUE,
"CRITICAL": RED,
"CRITICAL": MAGENTA,
"ERROR": RED,
}

Expand Down Expand Up @@ -542,7 +542,7 @@ def job_error():
if level == "info" and not self.quiet:
self.logger.warning(msg["msg"])
if level == "warning":
self.logger.warning(msg["msg"])
self.logger.critical(msg["msg"])
elif level == "error":
self.logger.error(msg["msg"])
elif level == "debug":
Expand Down
23 changes: 13 additions & 10 deletions snakemake/workflow.py
Expand Up @@ -928,24 +928,22 @@ def files(items):
dag.clean(only_temp=True, dryrun=dryrun)
return True
elif list_version_changes:
items = list(chain(*map(self.persistence.version_changed, dag.jobs)))
items = dag.get_outputs_with_changes("version")
if items:
print(*items, sep="\n")
return True
elif list_code_changes:
items = list(chain(*map(self.persistence.code_changed, dag.jobs)))
for j in dag.jobs:
items.extend(list(j.outputs_older_than_script_or_notebook()))
items = dag.get_outputs_with_changes("code")
if items:
print(*items, sep="\n")
return True
elif list_input_changes:
items = list(chain(*map(self.persistence.input_changed, dag.jobs)))
items = dag.get_outputs_with_changes("input")
if items:
print(*items, sep="\n")
return True
elif list_params_changes:
items = list(chain(*map(self.persistence.params_changed, dag.jobs)))
items = dag.get_outputs_with_changes("params")
if items:
print(*items, sep="\n")
return True
Expand Down Expand Up @@ -1029,6 +1027,7 @@ def files(items):
)

if not dryrun:
dag.warn_about_changes()
if len(dag):
shell_exec = shell.get_executable()
if shell_exec is not None:
Expand Down Expand Up @@ -1074,6 +1073,7 @@ def files(items):
logger.info(NOTHING_TO_BE_DONE_MSG)
else:
# the dryrun case
dag.warn_about_changes()
if len(dag):
logger.run_info("\n".join(dag.stats()))
else:
Expand All @@ -1095,21 +1095,24 @@ def files(items):
if dryrun:
if len(dag):
logger.run_info("\n".join(dag.stats()))
logger.info(
"This was a dry-run (flag -n). The order of jobs "
"does not reflect the order of execution."
)
logger.info(
"This was a dry-run (flag -n). The order of jobs "
"does not reflect the order of execution."
)
dag.warn_about_changes()
logger.remove_logfile()
else:
if stats:
self.scheduler.stats.to_json(stats)
dag.warn_about_changes()
logger.logfile_hint()
if not dryrun and not no_hooks:
self._onsuccess(logger.get_logfile())
return True
else:
if not dryrun and not no_hooks:
self._onerror(logger.get_logfile())
dag.warn_about_changes()
logger.logfile_hint()
return False

Expand Down

0 comments on commit b5f53f0

Please sign in to comment.