-
Notifications
You must be signed in to change notification settings - Fork 1
/
phist.py
154 lines (128 loc) · 4.54 KB
/
phist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python3
"""A tool to predict prokaryotic hosts for phage (meta)genomic sequences.
PHIST links viruses to hosts based on the number of k-mers shared between
their sequences.
Copyright (C) 2021 A. Zielezinski, S. Deorowicz, and A. Gudys
https://github.com/refresh-bio/PHIST
"""
from __future__ import annotations
import argparse
import multiprocessing
import platform
from pathlib import Path
import subprocess
import sys
__version__ = '1.1.0'
def get_parser() -> argparse.ArgumentParser:
    """Builds the command-line parser for PHIST.

    If the script was started without any command-line arguments, the help
    message is printed and the process exits instead of returning.

    Returns:
        Parser with the positional arguments (virus_dir, host_dir, out_dir)
        and the options (-k, -t, --version) registered.
    """
    parser = argparse.ArgumentParser(
        description='PHIST predicts hosts from phage (meta)genomic data')

    # (name or flag, add_argument keyword options), in registration order.
    argument_specs = [
        ('virus_dir', dict(
            metavar='virus_dir',
            help='Input directory w/ virus FASTA files (plain or gzip)')),
        ('host_dir', dict(
            metavar='host_dir',
            help='Input directory w/ host FASTA files (plain or gzip)')),
        ('out_dir', dict(
            metavar='out_dir',
            nargs='+',
            help='Output directory (will be created if it does not exist)')),
        ('-k', dict(
            dest='k',
            type=int,
            default=25,
            help='k-mer length [default = %(default)s]')),
        ('-t', dict(
            dest='num_threads',
            type=int,
            default=multiprocessing.cpu_count(),
            help='Number of threads [default = %(default)s]')),
        ('--version', dict(
            action='version',
            version=__version__,
            help="Show tool's version number and exit")),
    ]
    for name_or_flag, options in argument_specs:
        parser.add_argument(name_or_flag, **options)

    # Display help if the script is run without arguments.
    if not sys.argv[1:]:
        parser.print_help()
        parser.exit()
    return parser
def validate_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
    """Parses the command line and validates the arguments provided by the user.

    On success the returned namespace is extended with derived paths:
    ``vdir_path`` and ``hdir_path`` (input directories), ``outtable_path``
    and ``outpred_path`` (output files), and ``out_dir`` is replaced by the
    ``Path`` of the output directory.

    Args:
        parser: Parser whose ``parse_args()`` result provides ``k``,
            ``virus_dir``, ``host_dir`` and ``out_dir`` (a list of one or
            more paths).

    Returns:
        Arguments provided by the user, extended with the derived paths.

    Raises:
        SystemExit: Raised through ``parser.error`` if arguments are invalid.
    """
    args = parser.parse_args()

    # Validate k-mer length.
    if not 3 <= args.k <= 30:
        parser.error('K-mer length should be in range 3-30.')

    # Validate input directories. is_dir() is already False for a path that
    # does not exist, so a separate exists() check is not needed.
    vdir_path = Path(args.virus_dir)
    hdir_path = Path(args.host_dir)
    for directory in (vdir_path, hdir_path):
        if not directory.is_dir():
            parser.error(f'Input directory does not exist: {directory}')
    args.vdir_path = vdir_path
    args.hdir_path = hdir_path

    # Validate output paths: either a single output directory, or exactly two
    # output files (table first, predictions second). Anything beyond the
    # second path would never be used, so reject it instead of ignoring it.
    out_paths = [Path(p) for p in args.out_dir]
    if len(out_paths) > 2:
        parser.error(
            'Too many output paths: expected one output directory or two '
            f'output files, got {len(out_paths)}.')
    if len(out_paths) == 2:
        args.outtable_path, args.outpred_path = out_paths
        # Temporary files are placed next to the output table.
        args.out_dir = args.outtable_path.parent
    else:
        args.out_dir = out_paths[0]
        args.outtable_path = args.out_dir / 'common_kmers.csv'
        args.outpred_path = args.out_dir / 'predictions.csv'
    return args
if __name__ == '__main__':
    # Locate the helper executables relative to this script.
    PHIST_DIR = Path(__file__).resolve().parent
    if platform.system() == "Windows":
        kmer_exec = PHIST_DIR.joinpath(
            'kmer-db', 'src', 'x64', 'Release', 'kmer-db.exe')
        util_exec = PHIST_DIR.joinpath('utils', 'x64', 'Release', 'phist.exe')
    else:
        kmer_exec = PHIST_DIR.joinpath('kmer-db', 'kmer-db')
        util_exec = PHIST_DIR.joinpath('utils', 'phist')

    parser = get_parser()
    args = validate_args(parser)

    # sep='' keeps the second banner line from starting with a stray space.
    print(
        f'PHIST v{__version__}\n',
        'A. Zielezinski, S. Deorowicz, A. Gudys (c) 2021\n\n',
        sep='')

    vdir_path = args.vdir_path
    hdir_path = args.hdir_path
    out_dir = args.out_dir
    out_dir.mkdir(parents=True, exist_ok=True)

    # Paths to temp files.
    vlst_path = out_dir / 'vir.list'
    hlst_path = out_dir / 'host.list'
    db_path = out_dir / 'vir.db'

    try:
        # Create vir.list and host.list: one directory entry per line,
        # in sorted order.
        for lst_path, dir_path in [(vlst_path, vdir_path),
                                   (hlst_path, hdir_path)]:
            with open(lst_path, 'w') as oh:
                oh.writelines(f"{f}\n" for f in sorted(dir_path.iterdir()))

        # Kmer-db build: creates vir.db from the files listed in vir.list.
        cmd = [
            f'{kmer_exec}',
            'build',
            '-k',
            f'{args.k}',
            '-t',
            f'{args.num_threads}',
            f'{vlst_path}',
            f'{db_path}',
        ]
        # check=True: a failed step must not fall through to the next one.
        subprocess.run(cmd, check=True)

        # Kmer-db new2all: runs the files listed in host.list against vir.db
        # and writes the result to the output table.
        cmd = [
            f'{kmer_exec}',
            'new2all',
            '-sparse',
            '-t',
            f'{args.num_threads}',
            f'{db_path}',
            f'{hlst_path}',
            f'{args.outtable_path}',
        ]
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as err:
        sys.exit(
            f'Error: {err.cmd[0]} {err.cmd[1]} failed '
            f'with exit status {err.returncode}.')
    finally:
        # Remove temp files, including after a failure; a file that was
        # never created is not an error.
        for tmp_path in (vlst_path, hlst_path, db_path):
            try:
                tmp_path.unlink()
            except FileNotFoundError:
                pass

    # Postprocessing: the helper reads the output table and writes the
    # predictions file.
    cmd = [
        f'{util_exec}',
        f'{args.outtable_path}',
        f'{args.outpred_path}',
    ]
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as err:
        sys.exit(
            f'Error: {util_exec} failed with exit status {err.returncode}.')