Skip to content

Commit

Permalink
refactor: cleaner caching persistence detection in sourcecache module (#1567)
Browse files Browse the repository at this point in the history

* fix: properly use retry mechanism in source cache

* fix: persistence detection robustness

* flag sourcecache entries obtained by source_path

* show cached path or uri

* improved reason printing

* fix: properly infer source file persistence information when caching via source_file

* fmt

* cleanup
  • Loading branch information
johanneskoester committed Apr 5, 2022
1 parent d2223d4 commit 3f32409
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 18 deletions.
4 changes: 4 additions & 0 deletions snakemake/io.py
Expand Up @@ -1082,6 +1082,10 @@ def checkpoint_target(value):
return flag(value, "checkpoint_target")


def sourcecache_entry(value, orig_path_or_uri):
    """Flag ``value`` as a source cache entry.

    The original path or URI is handed to ``flag`` as the value of the
    ``"sourcecache_entry"`` flag, so code that later inspects the flag can
    recover where the cached file came from.
    """
    flagged = flag(value, "sourcecache_entry", orig_path_or_uri)
    return flagged


ReportObject = collections.namedtuple(
"ReportObject",
["caption", "category", "subcategory", "labels", "patterns", "htmlindex"],
Expand Down
28 changes: 17 additions & 11 deletions snakemake/jobs.py
Expand Up @@ -50,6 +50,9 @@ def format_files(job, io, dynamicio):
yield f"{f} (service)"
elif is_flagged(f, "checkpoint_target"):
yield TBDString()
elif is_flagged(f, "sourcecache_entry"):
orig_path_or_uri = get_flag_value(f, "sourcecache_entry")
yield f"{orig_path_or_uri} (cached)"
else:
yield f

Expand Down Expand Up @@ -1594,6 +1597,15 @@ def mark_finished(self):
self.finished = True

def __str__(self):
def format_file(f):
    """Render one file for the reason message.

    Files flagged as ``"sourcecache_entry"`` are shown by the value stored
    in that flag followed by ``" (cached)"``; any other file is returned
    unchanged.
    """
    if not is_flagged(f, "sourcecache_entry"):
        return f
    origin = get_flag_value(f, "sourcecache_entry")
    return f"{origin} (cached)"

def format_files(files):
    """Return the given files, each rendered via ``format_file``, joined by ", "."""
    rendered = (format_file(entry) for entry in files)
    return ", ".join(rendered)

s = list()
if self.forced:
s.append("Forced execution")
Expand All @@ -1610,24 +1622,18 @@ def __str__(self):
else:
if self._missing_output:
s.append(
"Missing output files: {}".format(
", ".join(self.missing_output)
)
f"Missing output files: {format_files(self.missing_output)}"
)
if self._incomplete_output:
s.append(
"Incomplete output files: {}".format(
", ".join(self.incomplete_output)
)
f"Incomplete output files: {format_files(self.incomplete_output)}"
)
if self._updated_input:
updated_input = self.updated_input - self.updated_input_run
s.append("Updated input files: {}".format(", ".join(updated_input)))
s.append(f"Updated input files: {format_files(updated_input)}")
if self._updated_input_run:
s.append(
"Input files updated by another job: {}".format(
", ".join(self.updated_input_run)
)
f"Input files updated by another job: {format_files(self.updated_input_run)}"
)
if self.pipe:
s.append(
Expand All @@ -1639,7 +1645,7 @@ def __str__(self):
)
s = "; ".join(s)
if self.finished:
return "Finished (was: {s})".format(s=s)
return f"Finished (was: {s})"
return s

def __bool__(self):
Expand Down
5 changes: 2 additions & 3 deletions snakemake/sourcecache.py
Expand Up @@ -223,7 +223,7 @@ def __init__(
self.path = path.strip("/")

def is_persistently_cacheable(self):
return self.tag or self.commit
return bool(self.tag or self.commit)

def get_filename(self):
return os.path.basename(self.path)
Expand Down Expand Up @@ -331,7 +331,6 @@ def __init__(self, runtime_cache_path=None):
runtime_cache_parent = self.cache / "runtime-cache"
os.makedirs(runtime_cache_parent, exist_ok=True)
self.runtime_cache = tempfile.TemporaryDirectory(
suffix="snakemake-runtime-source-cache",
dir=runtime_cache_parent,
)
self._runtime_cache_path = None
Expand All @@ -357,7 +356,7 @@ def exists(self, source_file):

def get_path(self, source_file, mode="r"):
cache_entry = self._cache(source_file)
return cache_entry
return str(cache_entry)

def _cache_entry(self, source_file):
urihash = source_file.get_uri_hash()
Expand Down
19 changes: 15 additions & 4 deletions snakemake/workflow.py
Expand Up @@ -16,7 +16,7 @@
from operator import attrgetter
import copy
import subprocess
from pathlib import Path
from pathlib import Path, PosixPath
from urllib.request import pathname2url, url2pathname


Expand Down Expand Up @@ -56,6 +56,7 @@
report,
multiext,
IOFile,
sourcecache_entry,
)
from snakemake.persistence import Persistence
from snakemake.utils import update_config
Expand Down Expand Up @@ -1154,9 +1155,19 @@ def source_path(self, rel_path):

frame = inspect.currentframe().f_back
calling_file = frame.f_code.co_filename
calling_dir = os.path.dirname(calling_file)
path = smart_join(calling_dir, rel_path)
return self.sourcecache.get_path(infer_source_file(path))

if calling_file == self.included_stack[-1].get_path_or_uri():
# called from current snakefile, we can try to keep the original source
# file annotation
path = self.current_basedir.join(rel_path)
else:
# heuristically determine path
calling_dir = os.path.dirname(calling_file)
path = smart_join(calling_dir, rel_path)

return sourcecache_entry(
self.sourcecache.get_path(infer_source_file(path)), path
)

@property
def snakefile(self):
Expand Down

0 comments on commit 3f32409

Please sign in to comment.