hash.py
__authors__ = "Johannes Köster, Sven Nahnsen"
__copyright__ = "Copyright 2022, Johannes Köster, Sven Nahnsen"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

import hashlib
import json

from snakemake.jobs import Job
from snakemake import script
from snakemake import wrapper
from snakemake.exceptions import WorkflowError

# ATTENTION: increase version number whenever the hashing algorithm below changes!
__version__ = "0.1"

class ProvenanceHashMap:
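    """Compute and memoize blockchain-style provenance hashes for jobs."""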
    def __init__(self):
        self._hashes = dict()

    def get_provenance_hash(self, job: Job):
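        """Return the provenance hash of the given job, combined with this module's version."""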
        versioned_hash = hashlib.sha256()
        # Ensure that semantic version changes in this module are reflected in the hash.
        versioned_hash.update(self._get_provenance_hash(job).encode())
        versioned_hash.update(__version__.encode())
        return versioned_hash.hexdigest()

    def _get_provenance_hash(self, job: Job):
        """
        Recursively calculate hash for the output of the given job
        and all upstream jobs in a blockchain fashion.

        This is based on an idea of Sven Nahnsen.

        Fails if job has more than one output file. The reason is that there
        is no way to generate a per-output file hash without generating the files.
        This hash, however, shall work without having to generate the files,
        just by describing all steps down to a given job.
        """
        if job in self._hashes:
            return self._hashes[job]

        workflow = job.dag.workflow
        h = hashlib.sha256()

        # Hash shell command or script.
        if job.is_shell:
            # We cannot use the formatted shell command, because it also contains threads,
            # resources, and filenames (which shall be irrelevant for the hash).
            h.update(job.rule.shellcmd.encode())
        elif job.is_script:
            _, source, _, _ = script.get_source(
                job.rule.script,
                job.rule.workflow.sourcecache,
                basedir=job.rule.basedir,
                wildcards=job.wildcards,
                params=job.params,
            )
            h.update(source.encode())
        elif job.is_notebook:
            _, source, _, _ = script.get_source(
                job.rule.notebook,
                job.rule.workflow.sourcecache,
                basedir=job.rule.basedir,
                wildcards=job.wildcards,
                params=job.params,
            )
            h.update(source.encode())
        elif job.is_wrapper:
            _, source, _, _ = script.get_source(
                wrapper.get_script(
                    job.rule.wrapper,
                    sourcecache=job.rule.workflow.sourcecache,
                    prefix=workflow.wrapper_prefix,
                ),
                job.rule.workflow.sourcecache,
                basedir=job.rule.basedir,
                wildcards=job.wildcards,
                params=job.params,
            )
            h.update(source.encode())
        # Hash params.
        for key, value in sorted(job.params._allitems()):
            if key is not None:
                h.update(key.encode())
            # If this raises a TypeError, we cannot calculate a reliable hash.
            try:
                h.update(json.dumps(value, sort_keys=True).encode())
            except TypeError as e:
                raise WorkflowError(
                    "Rule {} cannot be cached, because params "
                    "are not JSON serializable. "
                    "Consider converting them into a suitable format "
                    "if you are sure that caching is necessary. "
                    "Otherwise, deactivate caching for this rule "
                    "by removing it from the --cache command line argument "
                    "or removing the cache: true directive from the rule itself.".format(
                        job.rule.name
                    ),
                    e,
                )
        # Hash input files that are not generated by other jobs (sorted by hash value).
        for file_hash in sorted(
            hash_file(f)
            for f in job.input
            if not any(
                f in depfiles for depfiles in job.dag.dependencies[job].values()
            )
        ):
            h.update(file_hash.encode())
        # Hash used containers or conda environments.
        if workflow.use_conda and job.conda_env:
            if workflow.use_singularity and job.conda_env.container_img_url:
                h.update(job.conda_env.container_img_url.encode())
            h.update(job.conda_env.content)
        elif workflow.use_singularity and job.container_img_url:
            h.update(job.container_img_url.encode())
        # Generate hashes of dependencies, and add them in a blockchain fashion
        # (as input to the current hash, sorted by hash value).
        for dep_hash in sorted(
            self._get_provenance_hash(dep)
            for dep in set(job.dag.dependencies[job].keys())
        ):
            h.update(dep_hash.encode())

        provenance_hash = h.hexdigest()

        # Store for re-use.
        self._hashes[job] = provenance_hash

        return provenance_hash

def hash_file(f):
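    """Return the SHA-256 hex digest of the contents of the given file."""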
    h = hashlib.sha256()
    with open(f, "rb") as f:
        # Read the file and update the hash in 4 KB blocks.
        for byte_block in iter(lambda: f.read(4096), b""):
            h.update(byte_block)
    return h.hexdigest()
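

# Usage sketch (hypothetical, outside of Snakemake's own call sites): given a
# scheduled Job object `job`, a stable cache key for its output can be obtained via
#
#     provenance_hashes = ProvenanceHashMap()
#     cache_key = provenance_hashes.get_provenance_hash(job)
#
# Snakemake's output caching can use such a key to look up previously computed
# results instead of re-running the job.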