/
base.py
287 lines (237 loc) · 10.8 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""Base class for the various assembler wrappers."""
from os.path import basename, exists, getsize, join, splitext, abspath
import datetime
from subprocess import CalledProcessError
import lib.db as db
import lib.log as log
import lib.bio as bio
import lib.util as util
class BaseAssembler: # pylint: disable=too-many-public-methods
"""A base class for the assemblers."""
def __init__(self, args, cxn):
"""Build the assembler."""
self.args = args # Parsed command line arguments
self.blast_only = False # Used to short-circuit the assembler
self.steps = [] # Assembler steps setup by the assembler
self.file = {} # Files and record counts
# We need to pass these variables to child processes.
# So they cannot be directly attached to an object.
self.state = {
'iteration': 0, # Current iteration
'query_target': '', # Original name of the query sequence
'query_file': '', # Current query file name
'blast_db': '', # Current blast DB name
'iter_dir': '', # Name of the temp dir for this iteration
'cxn': cxn} # Save the DB connection
def init_iteration(self, blast_db, query_file, iteration):
"""Make file names used by the assembler."""
self.state['blast_db'] = blast_db
self.state['query_file'] = query_file
self.state['iteration'] = iteration
if iteration == 1:
self.state['query_target'] = query_file
def setup_files(self, iter_dir):
"""Build the file names and counts for the iteration."""
self.state['iter_dir'] = iter_dir
self.file['long_reads'] = '' # Set up in atram.py for now
names = 'output paired_1 paired_2 single_1 single_2 single_any'.split()
for name in names:
self.file[name] = self.iter_file(name + '.fasta')
# paired = paired_1_count + paired_2_count
for name in 'paired single_1 single_2 single_any'.split():
self.file[name + '_count'] = 0
def file_prefix(self):
"""Build a prefix for the iteration's work directory."""
return '{}_{}_{:02d}_'.format(
basename(self.state['blast_db']),
basename(self.state['query_target']),
self.state['iteration'])
def iter_file(self, file_name):
"""Put files into the temp dir."""
rel_path = join(self.state['iter_dir'], file_name)
# return rel_path
return abspath(rel_path)
def work_path(self):
"""Assembler output directory name may have unique requirements."""
return self.state['iter_dir']
def run(self):
"""Try to assemble the input."""
try:
log.info('Assembling shards with {}: iteration {}'.format(
self.args['assembler'], self.state['iteration']))
self.assemble()
except TimeoutError:
msg = 'Time ran out for the assembler after {} (HH:MM:SS)'.format(
datetime.timedelta(seconds=self.args['timeout']))
log.error(msg)
raise TimeoutError(msg)
except CalledProcessError as cpe:
msg = 'The assembler failed with error: ' + str(cpe)
log.error(msg)
raise RuntimeError(msg)
def count_blast_hits(self):
"""Make sure we have blast hits."""
count = db.sra_blast_hits_count(
self.state['cxn'], self.state['iteration'])
log.info('{} blast hits in iteration {}'.format(
count, self.state['iteration']))
return count
def nothing_assembled(self):
"""Make there is assembler output."""
if not exists(self.file['output']) \
or not getsize(self.file['output']):
log.info('No new assemblies in iteration {}'.format(
self.state['iteration']))
return True
return False
def assembled_contigs_count(self, high_score):
"""How many contigs were assembled and are above the thresholds."""
count = db.assembled_contigs_count(
self.state['cxn'],
self.state['iteration'],
self.args['bit_score'],
self.args['contig_length'])
if not count:
log.info('No contigs had a bit score greater than {} and are at '
'least {} bp long in iteration {}. The highest score for '
'this iteration is {}'.format(
self.args['bit_score'],
self.args['contig_length'],
self.state['iteration'],
high_score))
return count
def no_new_contigs(self, count):
"""Make the are new contigs in the assembler output."""
if count == db.iteration_overlap_count(
self.state['cxn'],
self.state['iteration'],
self.args['bit_score'],
self.args['contig_length']):
log.info('No new contigs were found in iteration {}'.format(
self.state['iteration']))
return True
return False
def assemble(self):
"""
Use the assembler to build up the contigs.
We take and array of subprocess steps and execute them in order. We
bracket this with pre and post assembly steps.
"""
for step in self.steps:
cmd = step()
log.subcommand(cmd, self.args['temp_dir'], self.args['timeout'])
self.post_assembly()
def post_assembly(self):
"""Handle unique post assembly steps."""
@staticmethod
def parse_contig_id(header):
"""Given a fasta header line from the assembler return contig ID."""
return header.split()[0]
def write_input_files(self):
"""Write blast hits and matching ends to fasta files."""
log.info('Writing assembler input files: iteration {}'.format(
self.state['iteration']))
self.write_paired_input_files()
self.write_single_input_files()
def write_paired_input_files(self):
"""Write blast hits and matching ends to fasta files."""
with open(self.file['paired_1'], 'w') as end_1, \
open(self.file['paired_2'], 'w') as end_2:
for row in db.get_blast_hits_by_end_count(
self.state['cxn'], self.state['iteration'], 2):
self.file['paired_count'] += 1
out_file = end_1 if row['seq_end'] == '1' else end_2
util.write_fasta_record(
out_file, row['seq_name'], row['seq'], row['seq_end'])
def write_single_input_files(self):
"""Write blast hits and matching ends to fasta files."""
with open(self.file['single_1'], 'w') as end_1, \
open(self.file['single_2'], 'w') as end_2, \
open(self.file['single_any'], 'w') as end_any:
for row in db.get_blast_hits_by_end_count(
self.state['cxn'], self.state['iteration'], 1):
if row['seq_end'] == '1':
out_file = end_1
seq_end = '/1'
self.file['single_1_count'] += 1
elif row['seq_end'] == '2':
out_file = end_2
seq_end = '/2'
self.file['single_2_count'] += 1
else:
out_file = end_any
seq_end = ''
self.file['single_any_count'] += 1
util.write_fasta_record(
out_file, row['seq_name'], row['seq'], seq_end)
def final_output_prefix(self, blast_db, query):
"""Build the prefix for the name of the final output file."""
blast_db = basename(blast_db)
query = splitext(basename(query))[0]
return '{}.{}_{}'.format(self.args['output_prefix'], blast_db, query)
def write_final_output(self, blast_db, query):
"""Write the assembler results file.
In this default case we're writing assembled contigs to fasta files.
"""
prefix = self.final_output_prefix(blast_db, query)
self.write_filtered_contigs(prefix)
self.write_all_contigs(prefix)
def write_filtered_contigs(self, prefix):
"""Write to the filtered contigs to a final output file."""
if self.args['no_filter']:
return
count = db.all_assembled_contigs_count(
self.state['cxn'],
self.args['bit_score'],
self.args['contig_length'])
if not count:
return
file_name = '{}.{}'.format(prefix, 'filtered_contigs.fasta')
contigs = db.get_all_assembled_contigs(
self.state['cxn'],
self.args['bit_score'],
self.args['contig_length'])
with open(file_name, 'w') as output_file:
for contig in contigs:
self.output_assembled_contig(output_file, contig)
def write_all_contigs(self, prefix):
"""Write all contigs to a final ouput file."""
count = db.all_assembled_contigs_count(self.state['cxn'])
if not count:
return
file_name = '{}.{}'.format(prefix, 'all_contigs.fasta')
contigs = db.get_all_assembled_contigs(self.state['cxn'])
with open(file_name, 'w') as output_file:
for contig in contigs:
self.output_assembled_contig(output_file, contig)
@staticmethod
def output_assembled_contig(output_file, contig):
"""Write one assembled contig to the output fasta file."""
seq = contig['seq']
suffix = ''
if contig['query_strand'] and contig['hit_strand'] and \
contig['query_strand'] != contig['hit_strand']:
seq = bio.reverse_complement(seq)
suffix = '_REV'
header = '{}_{}_{} iteration={} contig_id={} score={}'.format(
contig['iteration'], contig['contig_id'], suffix,
contig['iteration'], contig['contig_id'], contig['bit_score'])
util.write_fasta_record(output_file, header, seq)
def get_single_ends(self):
"""Gather single ends files for the assembly command."""
single_ends = []
if self.file['single_1_count']:
single_ends.append(self.file['single_1'])
if self.file['single_2_count']:
single_ends.append(self.file['single_2'])
if self.file['single_any_count']:
single_ends.append(self.file['single_any'])
return single_ends
def simple_state(self):
"""Save the state passed to subprocesses."""
return {
'blast_db': self.state['blast_db'],
'iteration': self.state['iteration'],
'query_file': self.state['query_file'],
'query_target': self.state['query_target'],
'iter_dir': self.state['iter_dir']}