From 9520e988a32f0c5369b4f2c68fdb741f21daa1a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Fri, 11 Mar 2022 12:20:42 +0100 Subject: [PATCH] fix: implement lock-free source file caching. This avoids hangs on network file systems like NFS. (#1464) * fix: implement lock-free source file caching. This avoids hangs on network file systems like NFS. * convert prefix path to str * fix superfluous tempfile deletion * close tmp file before moving --- setup.py | 1 - snakemake/sourcecache.py | 25 +++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/setup.py b/setup.py index 46ebfafbc..facd2d1b7 100644 --- a/setup.py +++ b/setup.py @@ -74,7 +74,6 @@ "connection_pool >=0.0.3", "pulp >=2.0", "smart_open >=3.0", - "filelock", "stopit", "tabulate", "yte >=1.0,<2.0", diff --git a/snakemake/sourcecache.py b/snakemake/sourcecache.py index c3dceb43e..dc768726b 100644 --- a/snakemake/sourcecache.py +++ b/snakemake/sourcecache.py @@ -7,6 +7,7 @@ from pathlib import Path import re import os +import shutil from snakemake import utils import tempfile import io @@ -308,11 +309,6 @@ def __init__(self, runtime_cache_path=None): def runtime_cache_path(self): return self._runtime_cache_path or self.runtime_cache.name - def lock_cache(self, entry): - from filelock import FileLock - - return FileLock(entry.with_suffix(".lock")) - def open(self, source_file, mode="r"): cache_entry = self._cache(source_file) return self._open(cache_entry, mode) @@ -341,17 +337,22 @@ def _cache_entry(self, source_file): def _cache(self, source_file): cache_entry = self._cache_entry(source_file) - with self.lock_cache(cache_entry): - if not cache_entry.exists(): - self._do_cache(source_file, cache_entry) + if not cache_entry.exists(): + self._do_cache(source_file, cache_entry) return cache_entry def _do_cache(self, source_file, cache_entry): # open from origin - with self._open(source_file.get_path_or_uri(), "rb") as source, open( - cache_entry, "wb" - ) as cache_source: - cache_source.write(source.read()) + with self._open(source_file.get_path_or_uri(), "rb") as source: + tmp_source = tempfile.NamedTemporaryFile( + prefix=str(cache_entry), + delete=False, # no need to delete since we move it below + ) + tmp_source.write(source.read()) + tmp_source.close() + # Atomic move to right name. + # This way we avoid the need to lock. + shutil.move(tmp_source.name, cache_entry) mtime = source_file.mtime() if mtime is not None: