From 1ae85c6b57b3c6b5860214a9c0e3ab28c7c8c5dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Thu, 12 May 2022 09:19:38 +0200 Subject: [PATCH] feat: for small files, compare checksums to determine if job needs to run if input file is newer than output file (#1568) * feat: for small files, compare checksums to determine if job needs to run if input file is newer than output file * check local existence * read file in binary mode * fixed rewrite, now being output file based * do not calculate checksum of directories * fix: fix order of checksum eligibility test * fixes * skip reading of empty file for checksum creation * allow forcing checksum generation --- snakemake/dag.py | 32 +++++++++++++++++++++++- snakemake/io.py | 53 +++++++++++++++++++++++++++++++++++++--- snakemake/persistence.py | 17 +++++++++++++ 3 files changed, 97 insertions(+), 5 deletions(-) diff --git a/snakemake/dag.py b/snakemake/dag.py index ee71dbe01..aa1d017b5 100755 --- a/snakemake/dag.py +++ b/snakemake/dag.py @@ -954,6 +954,30 @@ def update_output_mintime(job): return output_mintime[job] = None + is_same_checksum_cache = dict() + + def is_same_checksum(f, job): + try: + return is_same_checksum_cache[(f, job)] + except KeyError: + if not f.is_checksum_eligible(): + # no chance to compute checksum, cannot be assumed the same + is_same = False + else: + # obtain the input checksums for the given file for all output files of the job + checksums = self.workflow.persistence.input_checksums(job, f) + if len(checksums) > 1: + # more than one checksum recorded, cannot be all the same + is_same = False + elif not checksums: + # no checksums recorded, we cannot assume them to be the same + is_same = False + else: + is_same = f.is_same_checksum(checksums.pop()) + + is_same_checksum_cache[(f, job)] = is_same + return is_same + def update_needrun(job): reason = self.reason(job) noinitreason = not reason @@ -1000,8 +1024,14 @@ def update_needrun(job): if not reason: output_mintime_ = 
output_mintime.get(job) if output_mintime_: + # Input is updated if it is newer than the oldest output file + # and does not have the same checksum as the one previously recorded. updated_input = [ - f for f in job.input if f.exists and f.is_newer(output_mintime_) + f + for f in job.input + if f.exists + and f.is_newer(output_mintime_) + and not is_same_checksum(f, job) ] reason.updated_input.update(updated_input) if noinitreason and reason: diff --git a/snakemake/io.py b/snakemake/io.py index 7c53a9a45..4d91ec5e6 100755 --- a/snakemake/io.py +++ b/snakemake/io.py @@ -4,6 +4,7 @@ __license__ = "MIT" import collections +from hashlib import sha256 import os import shutil from pathlib import Path @@ -561,6 +562,10 @@ def get_dir_mtime(): def flags(self): return getattr(self._file, "flags", {}) + def is_fifo(self): + """Return True if file is a FIFO according to the filesystem.""" + return stat.S_ISFIFO(os.stat(self).st_mode) + @property + @iocache + @_refer_to_remote @@ -573,12 +578,52 @@ def size_local(self): self.check_broken_symlink() return os.path.getsize(self.file) + def is_checksum_eligible(self): + return ( + self.exists_local + and not os.path.isdir(self.file) + and self.size < 100000 + and not self.is_fifo() + ) + + def checksum(self, force=False): + """Return checksum if file is small enough, else None. + Returns None if file does not exist. If force is True, + omit eligibility check.""" + if force or self.is_checksum_eligible(): # less than 100000 bytes + checksum = sha256() + if self.size > 0: + # only read if file is bigger than zero + # otherwise the checksum is the same as taking hexdigest + # from the empty sha256 as initialized above + # This helps avoid endless reading in case the input + # is a named pipe or a socket or a symlink to a device like + # /dev/random. 
+ with open(self.file, "rb") as f: + checksum.update(f.read()) + return checksum.hexdigest() + else: + return None + + def is_same_checksum(self, other_checksum): + checksum = self.checksum() + if checksum is None or other_checksum is None: + # if no checksum available or files too large, not the same + return False + else: + return checksum == other_checksum + def check_broken_symlink(self): """Raise WorkflowError if file is a broken symlink.""" - if not self.exists_local and os.lstat(self.file): - raise WorkflowError( - "File {} seems to be a broken symlink.".format(self.file) - ) + if not self.exists_local: + try: + if os.lstat(self.file): + raise WorkflowError( + "File {} seems to be a broken symlink.".format(self.file) + ) + except FileNotFoundError as e: + # there is no broken symlink present, hence all fine + return @_refer_to_remote def is_newer(self, time): diff --git a/snakemake/persistence.py b/snakemake/persistence.py index 4292c7c7a..216f52ec0 100755 --- a/snakemake/persistence.py +++ b/snakemake/persistence.py @@ -241,6 +241,9 @@ def finished(self, job, keep_metadata=True): "starttime", None ) endtime = f.mtime.local_or_remote() if f.exists else fallback_time + + checksums = ((infile, infile.checksum()) for infile in job.input) + self._record( self._metadata_path, { @@ -257,6 +260,11 @@ def finished(self, job, keep_metadata=True): "job_hash": hash(job), "conda_env": conda_env, "container_img_url": job.container_img_url, + "input_checksums": { + infile: checksum + for infile, checksum in checksums + if checksum is not None + }, }, f, ) @@ -323,6 +331,15 @@ def params(self, path): def code(self, path): return self.metadata(path).get("code") + def input_checksums(self, job, input_path): + """Return all checksums of the given input file + recorded for the output of the given job. 
+ """ + return set( + self.metadata(output_path).get("input_checksums", {}).get(input_path) + for output_path in job.output + ) + def version_changed(self, job, file=None): """Yields output files with changed versions or bool if file given.""" return _bool_or_gen(self._version_changed, job, file=file)