Skip to content

Commit

Permalink
feat: for small files, compare checksums to determine if job needs to…
Browse files Browse the repository at this point in the history
… run if input file is newer than output file (#1568)

* feat: for small files, compare checksums to determine if job needs to run if input file is newer than output file

* check local existence

* read file in binary mode

* fixed rewrite, now being output file based

* do not calculate checksum of directories

* fix: fix order of checksum eligibility test

* fixes

* skip reading of empty file for checksum creation

* allow forcing checksum generation
  • Loading branch information
johanneskoester committed May 12, 2022
1 parent c8d81d0 commit 1ae85c6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 5 deletions.
32 changes: 31 additions & 1 deletion snakemake/dag.py
Expand Up @@ -954,6 +954,30 @@ def update_output_mintime(job):
return
output_mintime[job] = None

is_same_checksum_cache = dict()

def is_same_checksum(f, job):
try:
return is_same_checksum_cache[(f, job)]
except KeyError:
if not f.is_checksum_eligible():
# no chance to compute checksum, cannot be assumed the same
is_same = False
else:
# obtain the input checksums for the given file for all output files of the job
checksums = self.workflow.persistence.input_checksums(job, f)
if len(checksums) > 1:
# more than one checksum recorded, cannot be all the same
is_same = False
elif not checksums:
# no checksums recorded, we cannot assume them to be the same
is_same = False
else:
is_same = f.is_same_checksum(checksums.pop())

is_same_checksum_cache[(f, job)] = is_same
return is_same

def update_needrun(job):
reason = self.reason(job)
noinitreason = not reason
Expand Down Expand Up @@ -1000,8 +1024,14 @@ def update_needrun(job):
if not reason:
output_mintime_ = output_mintime.get(job)
if output_mintime_:
# Input is updated if it is newer that the oldest output file
# and does not have the same checksum as the one previously recorded.
updated_input = [
f for f in job.input if f.exists and f.is_newer(output_mintime_)
f
for f in job.input
if f.exists
and f.is_newer(output_mintime_)
and not is_same_checksum(f, job)
]
reason.updated_input.update(updated_input)
if noinitreason and reason:
Expand Down
53 changes: 49 additions & 4 deletions snakemake/io.py
Expand Up @@ -4,6 +4,7 @@
__license__ = "MIT"

import collections
from hashlib import sha256
import os
import shutil
from pathlib import Path
Expand Down Expand Up @@ -561,6 +562,10 @@ def get_dir_mtime():
def flags(self):
return getattr(self._file, "flags", {})

def is_fifo(self):
"""Return True if file is a FIFO according to the filesystem."""
return stat.S_ISFIFO(os.stat(self).st_mode)

@property
@iocache
@_refer_to_remote
Expand All @@ -573,12 +578,52 @@ def size_local(self):
self.check_broken_symlink()
return os.path.getsize(self.file)

def is_checksum_eligible(self):
return (
self.exists_local
and not os.path.isdir(self.file)
and self.size < 100000
and not self.is_fifo()
)

def checksum(self, force=False):
"""Return checksum if file is small enough, else None.
Returns None if file does not exist. If force is True,
omit eligibility check."""
if force or self.is_checksum_eligible(): # less than 100000 bytes
checksum = sha256()
if self.size > 0:
# only read if file is bigger than zero
# otherwise the checksum is the same as taking hexdigest
# from the empty sha256 as initialized above
# This helps endless reading in case the input
# is a named pipe or a socket or a symlink to a device like
# /dev/random.
with open(self.file, "rb") as f:
checksum.update(f.read())
return checksum.hexdigest()
else:
return None

def is_same_checksum(self, other_checksum):
checksum = self.checksum()
if checksum is None or other_checksum is None:
# if no checksum available or files too large, not the same
return False
else:
return checksum == other_checksum

def check_broken_symlink(self):
"""Raise WorkflowError if file is a broken symlink."""
if not self.exists_local and os.lstat(self.file):
raise WorkflowError(
"File {} seems to be a broken symlink.".format(self.file)
)
if not self.exists_local:
try:
if os.lstat(self.file):
raise WorkflowError(
"File {} seems to be a broken symlink.".format(self.file)
)
except FileNotFoundError as e:
# there is no broken symlink present, hence all fine
return

@_refer_to_remote
def is_newer(self, time):
Expand Down
17 changes: 17 additions & 0 deletions snakemake/persistence.py
Expand Up @@ -241,6 +241,9 @@ def finished(self, job, keep_metadata=True):
"starttime", None
)
endtime = f.mtime.local_or_remote() if f.exists else fallback_time

checksums = ((infile, infile.checksum()) for infile in job.input)

self._record(
self._metadata_path,
{
Expand All @@ -257,6 +260,11 @@ def finished(self, job, keep_metadata=True):
"job_hash": hash(job),
"conda_env": conda_env,
"container_img_url": job.container_img_url,
"input_checksums": {
infile: checksum
for infile, checksum in checksums
if checksum is not None
},
},
f,
)
Expand Down Expand Up @@ -323,6 +331,15 @@ def params(self, path):
def code(self, path):
return self.metadata(path).get("code")

def input_checksums(self, job, input_path):
"""Return all checksums of the given input file
recorded for the output of the given job.
"""
return set(
self.metadata(output_path).get("input_checksums", {}).get(input_path)
for output_path in job.output
)

def version_changed(self, job, file=None):
"""Yields output files with changed versions or bool if file given."""
return _bool_or_gen(self._version_changed, job, file=file)
Expand Down

0 comments on commit 1ae85c6

Please sign in to comment.