__author__ = "Johannes Köster"
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
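
# Persistence layer of Snakemake: maintains the .snakemake directory with
# working-directory locks, per-output-file metadata records, markers for
# incomplete jobs, and storage locations for conda environments, container
# images, shadow directories, and auxiliary files.
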
import os
import shutil
import pickle
import json
import time
from base64 import urlsafe_b64encode, b64encode
from functools import lru_cache
from itertools import count
from pathlib import Path

import snakemake.exceptions
from snakemake.logging import logger
from snakemake.jobs import jobfiles
from snakemake.utils import listfiles


class Persistence:
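    """Manage the .snakemake directory of a workflow: working-directory
    locks, per-output-file metadata records, markers for incomplete jobs,
    and the storage locations for conda environments, container images,
    shadow directories, and auxiliary files."""
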
def __init__(
self,
nolock=False,
dag=None,
conda_prefix=None,
singularity_prefix=None,
shadow_prefix=None,
warn_only=False,
):
self.path = os.path.abspath(".snakemake")
if not os.path.exists(self.path):
os.mkdir(self.path)
self._lockdir = os.path.join(self.path, "locks")
if not os.path.exists(self._lockdir):
os.mkdir(self._lockdir)
        self.dag = dag
        self._files = None  # lazily populated by the files property
        self._lockfile = dict()
self._metadata_path = os.path.join(self.path, "metadata")
self._incomplete_path = os.path.join(self.path, "incomplete")
self.conda_env_archive_path = os.path.join(self.path, "conda-archive")
self.benchmark_path = os.path.join(self.path, "benchmarks")
self.source_cache = os.path.join(self.path, "source_cache")
if conda_prefix is None:
self.conda_env_path = os.path.join(self.path, "conda")
else:
self.conda_env_path = os.path.abspath(os.path.expanduser(conda_prefix))
if singularity_prefix is None:
self.container_img_path = os.path.join(self.path, "singularity")
else:
self.container_img_path = os.path.abspath(
os.path.expanduser(singularity_prefix)
)
if shadow_prefix is None:
self.shadow_path = os.path.join(self.path, "shadow")
else:
self.shadow_path = os.path.join(shadow_prefix, "shadow")
# place to store any auxiliary information needed during a run (e.g. source tarballs)
self.aux_path = os.path.join(self.path, "auxiliary")
# migration of .snakemake folder structure
migration_indicator = Path(
os.path.join(self._incomplete_path, "migration_underway")
)
if (
os.path.exists(self._metadata_path)
and not os.path.exists(self._incomplete_path)
) or migration_indicator.exists():
os.makedirs(self._incomplete_path, exist_ok=True)
migration_indicator.touch()
self.migrate_v1_to_v2()
migration_indicator.unlink()
self._incomplete_cache = None
for d in (
self._metadata_path,
self._incomplete_path,
self.shadow_path,
self.conda_env_archive_path,
self.conda_env_path,
self.container_img_path,
self.aux_path,
):
os.makedirs(d, exist_ok=True)
if nolock:
self.lock = self.noop
self.unlock = self.noop
if warn_only:
self.lock = self.lock_warn_only
self.unlock = self.noop
self._read_record = self._read_record_cached
def migrate_v1_to_v2(self):
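        """Copy records that are flagged as incomplete from the metadata
        folder into the separate incomplete folder introduced with v2 of
        the .snakemake layout."""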
logger.info("Migrating .snakemake folder to new format...")
i = 0
for path, _, filenames in os.walk(self._metadata_path):
path = Path(path)
for filename in filenames:
with open(path / filename, "r") as f:
try:
record = json.load(f)
except json.JSONDecodeError:
continue # not a properly formatted JSON file
if record.get("incomplete", False):
target_path = Path(self._incomplete_path) / path.relative_to(
self._metadata_path
)
os.makedirs(target_path, exist_ok=True)
shutil.copyfile(
path / filename,
target_path / filename,
)
i += 1
# this can take a while for large folders...
if (i % 10000) == 0 and i > 0:
logger.info("{} files migrated".format(i))
logger.info("Migration complete")
@property
def files(self):
if self._files is None:
self._files = set(self.dag.output_files)
return self._files
@property
def locked(self):
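        """Return True if an existing lock conflicts with this run, i.e.
        another instance's output locks overlap our input or output files,
        or its input locks overlap our output files."""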
inputfiles = set(self.all_inputfiles())
outputfiles = set(self.all_outputfiles())
if os.path.exists(self._lockdir):
for lockfile in self._locks("input"):
with open(lockfile) as lock:
for f in lock:
f = f.strip()
if f in outputfiles:
return True
for lockfile in self._locks("output"):
with open(lockfile) as lock:
for f in lock:
f = f.strip()
if f in outputfiles or f in inputfiles:
return True
return False
def lock_warn_only(self):
if self.locked:
logger.info(
"Error: Directory cannot be locked. This usually "
"means that another Snakemake instance is running on this directory. "
"Another possibility is that a previous run exited unexpectedly."
)
def lock(self):
if self.locked:
raise snakemake.exceptions.LockException()
self._lock(self.all_inputfiles(), "input")
self._lock(self.all_outputfiles(), "output")
def unlock(self, *args):
logger.debug("unlocking")
for lockfile in self._lockfile.values():
try:
logger.debug("removing lock")
os.remove(lockfile)
except OSError as e:
if e.errno != 2: # missing file
raise e
logger.debug("removed all locks")
def cleanup_locks(self):
shutil.rmtree(self._lockdir)
def cleanup_metadata(self, path):
return self._delete_record(self._metadata_path, path)
def cleanup_shadow(self):
if os.path.exists(self.shadow_path):
shutil.rmtree(self.shadow_path)
os.mkdir(self.shadow_path)
def conda_cleanup_envs(self):
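        """Remove conda environments and environment archives that are not
        used by any job of the current DAG."""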
        # clean up envs
in_use = set(env.hash[:8] for env in self.dag.conda_envs.values())
for d in os.listdir(self.conda_env_path):
if len(d) >= 8 and d[:8] not in in_use:
if os.path.isdir(os.path.join(self.conda_env_path, d)):
shutil.rmtree(os.path.join(self.conda_env_path, d))
else:
os.remove(os.path.join(self.conda_env_path, d))
        # clean up env archives
in_use = set(env.content_hash for env in self.dag.conda_envs.values())
for d in os.listdir(self.conda_env_archive_path):
if d not in in_use:
shutil.rmtree(os.path.join(self.conda_env_archive_path, d))
def started(self, job, external_jobid=None):
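        """Mark all output files of the given job as incomplete, recording
        the external job id (if any) alongside the marker."""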
for f in job.output:
self._record(
self._incomplete_path,
{"external_jobid": external_jobid},
f,
)
def finished(self, job, keep_metadata=True):
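        """Write a metadata record for each output file of the finished job
        and remove its incomplete marker. With keep_metadata=False, only
        the incomplete markers are removed."""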
if not keep_metadata:
for f in job.expanded_output:
self._delete_record(self._incomplete_path, f)
return
version = str(job.rule.version) if job.rule.version is not None else None
code = self._code(job.rule)
input = self._input(job)
log = self._log(job)
params = self._params(job)
shellcmd = job.shellcmd
conda_env = self._conda_env(job)
fallback_time = time.time()
for f in job.expanded_output:
rec_path = self._record_path(self._incomplete_path, f)
starttime = os.path.getmtime(rec_path) if os.path.exists(rec_path) else None
            # Sometimes finished is called twice; if so, look up the previous starttime.
if not os.path.exists(rec_path):
starttime = self._read_record(self._metadata_path, f).get(
"starttime", None
)
endtime = f.mtime.local_or_remote() if f.exists else fallback_time
checksums = ((infile, infile.checksum()) for infile in job.input)
self._record(
self._metadata_path,
{
"version": version,
"code": code,
"rule": job.rule.name,
"input": input,
"log": log,
"params": params,
"shellcmd": shellcmd,
"incomplete": False,
"starttime": starttime,
"endtime": endtime,
"job_hash": hash(job),
"conda_env": conda_env,
"container_img_url": job.container_img_url,
"input_checksums": {
infile: checksum
for infile, checksum in checksums
if checksum is not None
},
},
f,
)
self._delete_record(self._incomplete_path, f)
def cleanup(self, job):
for f in job.expanded_output:
self._delete_record(self._incomplete_path, f)
self._delete_record(self._metadata_path, f)
def incomplete(self, job):
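        """Return True if any existing output file of the job is marked
        incomplete, i.e. a previous run was interrupted while creating it."""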
if self._incomplete_cache is None:
self._cache_incomplete_folder()
if self._incomplete_cache is False: # cache deactivated
def marked_incomplete(f):
return self._exists_record(self._incomplete_path, f)
else:
def marked_incomplete(f):
rec_path = self._record_path(self._incomplete_path, f)
return rec_path in self._incomplete_cache
return any(map(lambda f: f.exists and marked_incomplete(f), job.output))
def _cache_incomplete_folder(self):
self._incomplete_cache = {
os.path.join(path, f)
for path, dirnames, filenames in os.walk(self._incomplete_path)
for f in filenames
}
def external_jobids(self, job):
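        """Return the distinct external job ids recorded for the job's
        output files (None for outputs without one)."""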
return list(
set(
self._read_record(self._incomplete_path, f).get("external_jobid", None)
for f in job.output
)
)
def metadata(self, path):
return self._read_record(self._metadata_path, path)
def version(self, path):
return self.metadata(path).get("version")
def rule(self, path):
return self.metadata(path).get("rule")
def input(self, path):
return self.metadata(path).get("input")
def log(self, path):
return self.metadata(path).get("log")
def shellcmd(self, path):
return self.metadata(path).get("shellcmd")
def params(self, path):
return self.metadata(path).get("params")
def code(self, path):
return self.metadata(path).get("code")
def conda_env(self, path):
return self.metadata(path).get("conda_env")
def container_img_url(self, path):
return self.metadata(path).get("container_img_url")
def input_checksums(self, job, input_path):
"""Return all checksums of the given input file
recorded for the output of the given job.
"""
return set(
self.metadata(output_path).get("input_checksums", {}).get(input_path)
for output_path in job.output
)
def version_changed(self, job, file=None):
"""Yields output files with changed versions or bool if file given."""
return _bool_or_gen(self._version_changed, job, file=file)
def code_changed(self, job, file=None):
"""Yields output files with changed code or bool if file given."""
return _bool_or_gen(self._code_changed, job, file=file)
def input_changed(self, job, file=None):
"""Yields output files with changed input or bool if file given."""
return _bool_or_gen(self._input_changed, job, file=file)
def params_changed(self, job, file=None):
"""Yields output files with changed params or bool if file given."""
return _bool_or_gen(self._params_changed, job, file=file)
def conda_env_changed(self, job, file=None):
"""Yields output files with changed conda env or bool if file given."""
return _bool_or_gen(self._conda_env_changed, job, file=file)
def container_changed(self, job, file=None):
"""Yields output files with changed container img or bool if file given."""
return _bool_or_gen(self._container_changed, job, file=file)
def _version_changed(self, job, file=None):
assert file is not None
recorded = self.version(file)
return recorded is not None and recorded != job.rule.version
def _code_changed(self, job, file=None):
assert file is not None
recorded = self.code(file)
return recorded is not None and recorded != self._code(job.rule)
def _input_changed(self, job, file=None):
assert file is not None
recorded = self.input(file)
return recorded is not None and recorded != self._input(job)
def _params_changed(self, job, file=None):
assert file is not None
recorded = self.params(file)
return recorded is not None and recorded != self._params(job)
def _conda_env_changed(self, job, file=None):
assert file is not None
recorded = self.conda_env(file)
return recorded is not None and recorded != self._conda_env(job)
def _container_changed(self, job, file=None):
assert file is not None
recorded = self.container_img_url(file)
return recorded is not None and recorded != job.container_img_url
def noop(self, *args):
pass
def _b64id(self, s):
return urlsafe_b64encode(str(s).encode()).decode()
@lru_cache()
def _code(self, rule):
code = rule.run_func.__code__
return b64encode(pickle_code(code)).decode()
@lru_cache()
def _conda_env(self, job):
if job.conda_env:
return b64encode(job.conda_env.content).decode()
@lru_cache()
def _input(self, job):
return sorted(job.input)
@lru_cache()
def _log(self, job):
return sorted(job.log)
@lru_cache()
def _params(self, job):
return sorted(map(repr, job.params))
@lru_cache()
def _output(self, job):
return sorted(job.output)
def _record(self, subject, json_value, id):
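        """Store json_value as a JSON record for the given id below the
        given subject folder."""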
recpath = self._record_path(subject, id)
os.makedirs(os.path.dirname(recpath), exist_ok=True)
with open(recpath, "w") as f:
json.dump(json_value, f)
def _delete_record(self, subject, id):
try:
recpath = self._record_path(subject, id)
os.remove(recpath)
            recdirs = os.path.relpath(os.path.dirname(recpath), start=subject)
            if recdirs != ".":
                # recdirs is relative to subject, so join before pruning the
                # now-empty chunk directories
                os.removedirs(os.path.join(subject, recdirs))
return True
except OSError as e:
if e.errno != 2:
# not missing
raise e
else:
# file is missing, report failure
return False
@lru_cache()
def _read_record_cached(self, subject, id):
return self._read_record_uncached(subject, id)
def _read_record_uncached(self, subject, id):
if not self._exists_record(subject, id):
return dict()
with open(self._record_path(subject, id), "r") as f:
return json.load(f)
def _exists_record(self, subject, id):
return os.path.exists(self._record_path(subject, id))
def _locks(self, type):
return (
f
for f, _ in listfiles(
os.path.join(self._lockdir, "{{n,[0-9]+}}.{}.lock".format(type))
)
if not os.path.isdir(f)
)
def _lock(self, files, type):
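        """Write the given files into the next free, consecutively numbered
        lock file of the given type ("input" or "output") and remember it
        for unlocking."""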
for i in count(0):
lockfile = os.path.join(self._lockdir, "{}.{}.lock".format(i, type))
if not os.path.exists(lockfile):
self._lockfile[type] = lockfile
with open(lockfile, "w") as lock:
print(*files, sep="\n", file=lock)
return
def _record_path(self, subject, id):
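        """Map a file path (id) to its record path below the subject folder:
        the path is urlsafe-base64 encoded and split into chunks that respect
        the filesystem's maximum file name length."""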
max_len = (
os.pathconf(subject, "PC_NAME_MAX") if os.name == "posix" else 255
) # maximum NTFS and FAT32 filename length
if max_len == 0:
max_len = 255
b64id = self._b64id(id)
# split into chunks of proper length
b64id = [b64id[i : i + max_len - 1] for i in range(0, len(b64id), max_len - 1)]
# prepend dirs with @ (does not occur in b64) to avoid conflict with b64-named files in the same dir
b64id = ["@" + s for s in b64id[:-1]] + [b64id[-1]]
path = os.path.join(subject, *b64id)
return path
def all_outputfiles(self):
# we only look at output files that will be updated
return jobfiles(self.dag.needrun_jobs, "output")
def all_inputfiles(self):
# we consider all input files, also of not running jobs
return jobfiles(self.dag.jobs, "input")
def deactivate_cache(self):
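        """Disable record caching: clear the cached reader, switch to
        uncached reads, and deactivate the incomplete-folder cache."""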
self._read_record_cached.cache_clear()
self._read_record = self._read_record_uncached
self._incomplete_cache = False


def _bool_or_gen(func, job, file=None):
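    """If file is None, yield the job's output files for which func returns
    True; otherwise return the result of func applied to that file."""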
if file is None:
return (f for f in job.expanded_output if func(job, file=f))
else:
return func(job, file=file)


def pickle_code(code):
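    """Serialize a code object, recursing into nested code objects among its
    constants, so that rule code can be fingerprinted and compared."""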
consts = [
        (pickle_code(const) if isinstance(const, type(code)) else const)
for const in code.co_consts
]
return pickle.dumps((code.co_code, code.co_varnames, consts, code.co_names))
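

# ---------------------------------------------------------------------------
# Minimal usage sketch (editorial addition, not part of the original module;
# the file name "results/sample1.txt" and the helper example_rule_body are
# hypothetical). It demonstrates the two encodings this module builds on:
# urlsafe base64 for record file names (cf. Persistence._b64id) and
# pickle_code for fingerprinting rule code objects.
if __name__ == "__main__":
    example_output = "results/sample1.txt"
    # record file name as produced for metadata/incomplete records
    print("record name:", urlsafe_b64encode(example_output.encode()).decode())

    def example_rule_body():
        return 42

    # fingerprint of the function's code, analogous to the "code" field
    # written by Persistence.finished
    fingerprint = b64encode(pickle_code(example_rule_body.__code__)).decode()
    print("code fingerprint:", fingerprint[:32], "...")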