From 3f324091ffc97f181e14dbc1b82accc28b19663f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Tue, 5 Apr 2022 16:06:30 +0200 Subject: [PATCH] refactor: cleaner caching persistence detection in sourcecache module (#1567) * fix: properly use retry mechanism in source cache * fix: persistence detection robustness * flag sourcecache entries obtained by source_path * show cached path or uri * improved reason printing * fix: properly infer source file persistence information when caching via source_file * fmt * cleanup --- snakemake/io.py | 4 ++++ snakemake/jobs.py | 28 +++++++++++++++++----------- snakemake/sourcecache.py | 5 ++--- snakemake/workflow.py | 19 +++++++++++++++---- 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/snakemake/io.py b/snakemake/io.py index 77699c3a2..7c53a9a45 100755 --- a/snakemake/io.py +++ b/snakemake/io.py @@ -1082,6 +1082,10 @@ def checkpoint_target(value): return flag(value, "checkpoint_target") +def sourcecache_entry(value, orig_path_or_uri): + return flag(value, "sourcecache_entry", orig_path_or_uri) + + ReportObject = collections.namedtuple( "ReportObject", ["caption", "category", "subcategory", "labels", "patterns", "htmlindex"], diff --git a/snakemake/jobs.py b/snakemake/jobs.py index e8d202403..d8699e2af 100644 --- a/snakemake/jobs.py +++ b/snakemake/jobs.py @@ -50,6 +50,9 @@ def format_files(job, io, dynamicio): yield f"{f} (service)" elif is_flagged(f, "checkpoint_target"): yield TBDString() + elif is_flagged(f, "sourcecache_entry"): + orig_path_or_uri = get_flag_value(f, "sourcecache_entry") + yield f"{orig_path_or_uri} (cached)" else: yield f @@ -1594,6 +1597,15 @@ def mark_finished(self): self.finished = True def __str__(self): + def format_file(f): + if is_flagged(f, "sourcecache_entry"): + return f"{get_flag_value(f, 'sourcecache_entry')} (cached)" + else: + return f + + def format_files(files): + return ", ".join(map(format_file, files)) + s = list() if self.forced: s.append("Forced execution") @@ -1610,24 +1622,18 @@ def __str__(self): else: if self._missing_output: s.append( - "Missing output files: {}".format( - ", ".join(self.missing_output) - ) + f"Missing output files: {format_files(self.missing_output)}" ) if self._incomplete_output: s.append( - "Incomplete output files: {}".format( - ", ".join(self.incomplete_output) - ) + f"Incomplete output files: {format_files(self.incomplete_output)}" ) if self._updated_input: updated_input = self.updated_input - self.updated_input_run - s.append("Updated input files: {}".format(", ".join(updated_input))) + s.append(f"Updated input files: {format_files(updated_input)}") if self._updated_input_run: s.append( - "Input files updated by another job: {}".format( - ", ".join(self.updated_input_run) - ) + f"Input files updated by another job: {format_files(self.updated_input_run)}" ) if self.pipe: s.append( @@ -1639,7 +1645,7 @@ def __str__(self): ) s = "; ".join(s) if self.finished: - return "Finished (was: {s})".format(s=s) + return f"Finished (was: {s})" return s def __bool__(self): diff --git a/snakemake/sourcecache.py b/snakemake/sourcecache.py index 8ee479d7e..2efcd9d96 100644 --- a/snakemake/sourcecache.py +++ b/snakemake/sourcecache.py @@ -223,7 +223,7 @@ def __init__( self.path = path.strip("/") def is_persistently_cacheable(self): - return self.tag or self.commit + return bool(self.tag or self.commit) def get_filename(self): return os.path.basename(self.path) @@ -331,7 +331,6 @@ def __init__(self, runtime_cache_path=None): runtime_cache_parent = self.cache / "runtime-cache" os.makedirs(runtime_cache_parent, exist_ok=True) self.runtime_cache = tempfile.TemporaryDirectory( - suffix="snakemake-runtime-source-cache", dir=runtime_cache_parent, ) self._runtime_cache_path = None @@ -357,7 +356,7 @@ def exists(self, source_file): def get_path(self, source_file, mode="r"): cache_entry = self._cache(source_file) - return cache_entry + return str(cache_entry) def _cache_entry(self, source_file): urihash = source_file.get_uri_hash() diff --git a/snakemake/workflow.py b/snakemake/workflow.py index 488b8dda7..cf6bad0bd 100644 --- a/snakemake/workflow.py +++ b/snakemake/workflow.py @@ -16,7 +16,7 @@ from operator import attrgetter import copy import subprocess -from pathlib import Path +from pathlib import Path, PosixPath from urllib.request import pathname2url, url2pathname @@ -56,6 +56,7 @@ report, multiext, IOFile, + sourcecache_entry, ) from snakemake.persistence import Persistence from snakemake.utils import update_config @@ -1154,9 +1155,19 @@ def source_path(self, rel_path): frame = inspect.currentframe().f_back calling_file = frame.f_code.co_filename - calling_dir = os.path.dirname(calling_file) - path = smart_join(calling_dir, rel_path) - return self.sourcecache.get_path(infer_source_file(path)) + + if calling_file == self.included_stack[-1].get_path_or_uri(): + # called from current snakefile, we can try to keep the original source + # file annotation + path = self.current_basedir.join(rel_path) + else: + # heuristically determine path + calling_dir = os.path.dirname(calling_file) + path = smart_join(calling_dir, rel_path) + + return sourcecache_entry( + self.sourcecache.get_path(infer_source_file(path)), path + ) @property def snakefile(self):