Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: warn on non-file-modification-date changes like params, code, or input files #1419

Merged
merged 5 commits into from Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 27 additions & 1 deletion snakemake/dag.py
Expand Up @@ -27,7 +27,7 @@
from snakemake.exceptions import RemoteFileException, WorkflowError, ChildIOException
from snakemake.exceptions import InputFunctionException
from snakemake.logging import logger
from snakemake.common import DYNAMIC_FILL, group_into_chunks
from snakemake.common import DYNAMIC_FILL, ON_WINDOWS, group_into_chunks
from snakemake.deployment import conda, singularity
from snakemake.output_index import OutputIndex
from snakemake import workflow
Expand Down Expand Up @@ -2199,6 +2199,32 @@ def stats(self):
yield tabulate(rows, headers="keys")
yield ""

def get_outputs_with_changes(self, change_type):
is_changed = lambda job: (
getattr(self.workflow.persistence, f"{change_type}_changed")(job)
if not job.is_group()
else []
)
changed = list(chain(*map(is_changed, self.jobs)))
if change_type == "code":
for job in self.jobs:
if not job.is_group():
changed.extend(list(job.outputs_older_than_script_or_notebook()))
return changed

def warn_about_changes(self):
for change_type in ["code", "input", "params"]:
changed = self.get_outputs_with_changes(change_type)
if changed:
rerun_trigger = ""
if not ON_WINDOWS:
rerun_trigger = f"\n To trigger a re-run, use 'snakemake -R $(snakemake --list-{change_type}-changes)'."
logger.warning(
f"The {change_type} used to generate one or several output files has changed:\n"
f" To inspect which output files have changes, run 'snakemake --list-{change_type}-changes'."
f"{rerun_trigger}"
)

def __str__(self):
return self.dot()

Expand Down
3 changes: 1 addition & 2 deletions snakemake/jobs.py
Expand Up @@ -246,8 +246,7 @@ def outputs_older_than_script_or_notebook(self):
if self.rule.basedir:
# needed if rule is included from another subdirectory
path = self.rule.basedir.join(path).get_path_or_uri()
if is_local_file(path):
assert os.path.exists(path), "cannot find {0}".format(path)
if is_local_file(path) and os.path.exists(path):
script_mtime = os.lstat(path).st_mtime
for f in self.expanded_output:
if f.exists:
Expand Down
4 changes: 2 additions & 2 deletions snakemake/logging.py
Expand Up @@ -30,7 +30,7 @@ class ColorizingStreamHandler(_logging.StreamHandler):
"WARNING": YELLOW,
"INFO": GREEN,
"DEBUG": BLUE,
"CRITICAL": RED,
"CRITICAL": MAGENTA,
"ERROR": RED,
}

Expand Down Expand Up @@ -542,7 +542,7 @@ def job_error():
if level == "info" and not self.quiet:
self.logger.warning(msg["msg"])
if level == "warning":
self.logger.warning(msg["msg"])
self.logger.critical(msg["msg"])
elif level == "error":
self.logger.error(msg["msg"])
elif level == "debug":
Expand Down
23 changes: 13 additions & 10 deletions snakemake/workflow.py
Expand Up @@ -926,24 +926,22 @@ def files(items):
dag.clean(only_temp=True, dryrun=dryrun)
return True
elif list_version_changes:
items = list(chain(*map(self.persistence.version_changed, dag.jobs)))
items = dag.get_outputs_with_changes("version")
if items:
print(*items, sep="\n")
return True
elif list_code_changes:
items = list(chain(*map(self.persistence.code_changed, dag.jobs)))
for j in dag.jobs:
items.extend(list(j.outputs_older_than_script_or_notebook()))
items = dag.get_outputs_with_changes("code")
if items:
print(*items, sep="\n")
return True
elif list_input_changes:
items = list(chain(*map(self.persistence.input_changed, dag.jobs)))
items = dag.get_outputs_with_changes("input")
if items:
print(*items, sep="\n")
return True
elif list_params_changes:
items = list(chain(*map(self.persistence.params_changed, dag.jobs)))
items = dag.get_outputs_with_changes("params")
if items:
print(*items, sep="\n")
return True
Expand Down Expand Up @@ -1027,6 +1025,7 @@ def files(items):
)

if not dryrun:
dag.warn_about_changes()
if len(dag):
shell_exec = shell.get_executable()
if shell_exec is not None:
Expand Down Expand Up @@ -1072,6 +1071,7 @@ def files(items):
logger.info(NOTHING_TO_BE_DONE_MSG)
else:
# the dryrun case
dag.warn_about_changes()
if len(dag):
logger.run_info("\n".join(dag.stats()))
else:
Expand All @@ -1093,21 +1093,24 @@ def files(items):
if dryrun:
if len(dag):
logger.run_info("\n".join(dag.stats()))
logger.info(
"This was a dry-run (flag -n). The order of jobs "
"does not reflect the order of execution."
)
logger.info(
"This was a dry-run (flag -n). The order of jobs "
"does not reflect the order of execution."
)
dag.warn_about_changes()
logger.remove_logfile()
else:
if stats:
self.scheduler.stats.to_json(stats)
dag.warn_about_changes()
logger.logfile_hint()
if not dryrun and not no_hooks:
self._onsuccess(logger.get_logfile())
return True
else:
if not dryrun and not no_hooks:
self._onerror(logger.get_logfile())
dag.warn_about_changes()
logger.logfile_hint()
return False

Expand Down