Skip to content

Commit

Permalink
fix: implement lock-free source file caching. This avoids hangs on ne…
Browse files Browse the repository at this point in the history
…twork file systems like NFS. (#1464)

* fix: implement lock-free source file caching. This avoids hangs on network file systems like NFS.

* convert prefix path to str

* fix superfluous tempfile deletion

* close tmp file before moving
  • Loading branch information
johanneskoester committed Mar 11, 2022
1 parent 5355f5a commit 9520e98
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
1 change: 0 additions & 1 deletion setup.py
Expand Up @@ -74,7 +74,6 @@
"connection_pool >=0.0.3",
"pulp >=2.0",
"smart_open >=3.0",
"filelock",
"stopit",
"tabulate",
"yte >=1.0,<2.0",
Expand Down
25 changes: 13 additions & 12 deletions snakemake/sourcecache.py
Expand Up @@ -7,6 +7,7 @@
from pathlib import Path
import re
import os
import shutil
from snakemake import utils
import tempfile
import io
Expand Down Expand Up @@ -308,11 +309,6 @@ def __init__(self, runtime_cache_path=None):
def runtime_cache_path(self):
return self._runtime_cache_path or self.runtime_cache.name

def lock_cache(self, entry):
from filelock import FileLock

return FileLock(entry.with_suffix(".lock"))

def open(self, source_file, mode="r"):
cache_entry = self._cache(source_file)
return self._open(cache_entry, mode)
Expand Down Expand Up @@ -341,17 +337,22 @@ def _cache_entry(self, source_file):

def _cache(self, source_file):
cache_entry = self._cache_entry(source_file)
with self.lock_cache(cache_entry):
if not cache_entry.exists():
self._do_cache(source_file, cache_entry)
if not cache_entry.exists():
self._do_cache(source_file, cache_entry)
return cache_entry

def _do_cache(self, source_file, cache_entry):
# open from origin
with self._open(source_file.get_path_or_uri(), "rb") as source, open(
cache_entry, "wb"
) as cache_source:
cache_source.write(source.read())
with self._open(source_file.get_path_or_uri(), "rb") as source:
tmp_source = tempfile.NamedTemporaryFile(
prefix=str(cache_entry),
delete=False, # no need to delete since we move it below
)
tmp_source.write(source.read())
tmp_source.close()
# Atomic move to right name.
# This way we avoid the need to lock.
shutil.move(tmp_source.name, cache_entry)

mtime = source_file.mtime()
if mtime is not None:
Expand Down

0 comments on commit 9520e98

Please sign in to comment.