/
SNPInputReader.py
executable file
·449 lines (371 loc) · 16.3 KB
/
SNPInputReader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# This is a helper module for Prephix. It uses a pseudo-factory pattern to
# handle the different SNP input file formats that Prephix expects to support.
#
# Each SNP input file reader class is expected to be ITERABLE, and returns
# a tupe of (line,lineNumber,realLocus,snpBase,refBase,isIndel,isInsert,isDelete)
#
# Where:
#
# line = The raw line as read from the input file.
# lineNumber = The line number of the file line.
# realLocus = Adjusted locus value (some formats have an offset relative to the base reference)
# snpBase = The sample base at the given locus
# refBase = The reference base at the given locus
# isIndel = Boolean to indicate if this line is an indel.
# isInsert = Boolean to indicate if this line is an indel insertion.
# isDelete = Boolean to indicate if this line is an indel deletion.
#
import re
import os
import sys
#
# Public Methods
#
def getSNPFileReader(fileName,filterQuality=True,multiChrom=False):
'''
This is basically an factory method that takes
in an SNP input file name for Prephix, determines
the file format, and returns an appropriate
reader/iterator object for that file.
SNP File Reader/Iterator objects are expected
to support iteration behavior (i.e. "for...in" statements)
that returns an SNPDataLine object with each call.
Parameters:
fileName = The SNP input file name. Assumes data for one strain per file.
filterQuality = (OPTIONAL) Used by VCF file reader for filtering on quality (default is to return only PASS quality).
Setting filterQuality to False will have the reader return all lines without regard to quality.
multiChrom = (OPTIONAL) Used by VCF file read for indicating multi-Chrom parsing or not.
'''
fileFormat = "unknown"
fileEmpty = True
# Define regexes for recognizing the file type based on its contents.
k28Re = re.compile("^#(.+?)\/")
nucmerRe = re.compile("^NUCMER$")
vcfRe = re.compile("^##fileformat=VCF")
empty = re.compile("^ *$")
with open(fileName,"r") as inputFile:
for line in inputFile:
if k28Re.match(line):
fileFormat = "k28"
break
elif nucmerRe.match(line):
fileFormat = "nucmer"
break
elif vcfRe.match(line):
fileFormat = "vcf"
break
elif not empty.match(line):
fileEmpty = False
if fileFormat == "k28":
return K28FileReader(fileName)
elif fileFormat == "nucmer":
return NucmerFileReader(fileName,multiChrom)
elif fileFormat == "vcf":
return VCFFileReader(fileName,filterQuality,multiChrom)
elif fileEmpty:
raise EmptyFileError
else:
raise NotImplementedError
#
# Custom Exceptions
#
class EmptyFileError(Exception):
'''
This is a custom exception thrown when an empty or size
zero file is encountered.
'''
pass
class SNPFileReadError(Exception):
'''
This is a custom exception thrown when an error has occurred
while reading an SNP data file.
It requires a message, with optional line number and the
line text itself where the error occurred.
'''
def __init__(self,message,lineNumber=None,line=None):
super(SNPFileReadError,self).__init__(message)
self.lineNumber = lineNumber
self.lineText = line
class SNPFileUnrecognizedLineError(Exception):
'''
This is a custom exception thrown when an unrecognized line
is read from an SNP data file.
It requires a message, with optional line number and the
line text itself where the error occurred.
'''
def __init__(self,message,lineNumber=None,line=None):
super(SNPFileUnrecognizedLineError,self).__init__(message)
self.lineNumber = lineNumber
self.lineText = line
#
# Classes
#
class SNPFileReader(object):
'''
This is an abstract base class that defines the interface
that SNPFileReader objects are expected to support.
Subclasses should be iterable (i.e. override the __iter__ method).
'''
# Public Attributes.
strainID = None # The strain name associatd with the SNP data
fileName = None # The data source (typically filename)
fileFormat = None # The file format
def __init__(self,fileName):
'''
Default constructor.
'''
self.fileName = fileName
def __iter__():
raise NotImplementedError
#
# Private Classes
#
class K28FileReader(SNPFileReader):
'''
This is a concrete implementation of a file reader
for k28.out (VAAL) data.
'''
def __init__(self,fileName):
super(K28FileReader,self).__init__(fileName)
self.fileFormat = "k28"
self.lineNumber = 0
self.k28lineRe = re.compile("^[0-9]+\s+(?P<locus>[0-9]+)\s+left=[ATCG]*\s+sample=(?P<sample_base>[ATCG]*)\s+ref=(?P<ref_base>[ATCG]*)\s+right=[ATCG]*$")
# Find the strainID
fh = open(self.fileName,"r")
self._fileLines = fh.readlines()
fh.close()
# Get strain ID from header comments: #<strain_id>/<reference_genome_filename>
strainRe = re.compile ("^#(?P<strainid>.+?)\/")
for line in self._fileLines:
self.lineNumber +=1
strainMatch = strainRe.match(line)
if strainMatch:
self.strainID = str(strainMatch.group('strainid'))
break
# Skip to the first line of data.
while self.lineNumber <= len(self._fileLines):
line = self._fileLines[self.lineNumber - 1]
# Ignore other comments in the file (lines starting with #). This includes the header comments.
# Also skip the > line (don't care about genbank_id_from_ref_genome_file).
if not re.match("^(#|>)",line):
# Exit now since this IS a data line. Want to end reading right here.
break
else:
self.lineNumber += 1
def __iter__(self):
'''
Concrete implementation of iterator protocol (as a generator) to get the next line of data.
'''
while self.lineNumber <= len(self._fileLines):
# At this point, should be at a data line.
# Assuming k28 input body data to be in the format:
# 0 <snp_locus> <left flank seq> <sample> <ref> <right flank seq>
# Only care about the locus, sample, and ref columns.
#
# Regex note:
#
# Some times sample= and ref= may have no value, so match for [ATCG] and check for length 1.
# If it is not length 1, then it is either blank or have more than one base, so skip as indel.
lineNumber = self.lineNumber
line = self._fileLines[lineNumber - 1].strip(os.linesep)
self.lineNumber += 1
# Skip comments (for K28/VAAL files, there are some comments at the end, not just the beginning).
if line.startswith("#"):
continue
# Else parse the line.
lineMatch = self.k28lineRe.match(line)
if lineMatch:
# VAAL k28.out file loci is offset by +1
realLocus = int(lineMatch.group('locus')) + 1
snpBase = lineMatch.group('sample_base')
refBase = lineMatch.group('ref_base')
isIndel = False
isDelete = False
isInsert = False
# Check for indels.
if len(snpBase) != 1:
if len(snpBase) == 0:
# Deletion found.
isIndel = True
isDelete = True
else:
# Insertion found.
isIndel = True
isInsert = True
elif len(refBase) != 1:
if len(refBase) == 0:
# Insertion found.
isIndel = True
isInsert = True
else:
# Deletion found.
isIndel = True
isDelete = True
yield (line,lineNumber,realLocus,snpBase,refBase,isIndel,isInsert,isDelete)
else:
raise SNPFileUnrecognizedLineError("Unrecognized line at {}: {}".format(lineNumber,line),lineNumber,line)
class NucmerFileReader(SNPFileReader):
'''
This is a concrete implementation of a file reader
for nucmer data.
'''
def __init__(self,fileName,multiChrom = False):
super(NucmerFileReader,self).__init__(fileName)
self.fileFormat = "nucmer"
self.lineNumber = 0
self.multiChrom = multiChrom
self.nucmerlineRe = re.compile("^(?P<locus>[0-9]+)\t(?P<ref_base>[ATCG]*)\t(?P<sample_base>[ATCG]*)\t[0-9]+\t.*\t(?P<chrom>.*?)\t.*$")
# Find the strainID
fh = open(fileName,"r")
self._fileLines = fh.readlines()
fh.close()
# Get strain ID from header line: /path/to/reference/file /path/to/query/file
strainRe = re.compile("\s.+\/(?P<strainid>[^\/]+)$")
# Assuming Strain ID is the query file name
for line in self._fileLines:
self.lineNumber += 1
strainMatch = strainRe.search(line)
if strainMatch:
self.strainID = str(strainMatch.group('strainid')).strip(os.linesep)
break
# Open the file and skip to the first line of data.
while self.lineNumber <= len(self._fileLines):
line = self._fileLines[self.lineNumber - 1].strip(os.linesep)
# Keep skipping lines until we reach the data portion. This should occur after the data header line:
# [P1] [SUB] [SUB] [P2] [BUFF] [DIST] [LEN R] [LEN Q] [FRM] [TAGS]
# So look for [P1]
if re.match("^\[P1\]",line):
# Found header. Advance line number to the first data line (first line after this header) and stop.
self.lineNumber += 1
break
self.lineNumber += 1
def __iter__(self):
'''
Concrete implementation of iterator protocol (as a generator) to get the next line of data.
'''
while self.lineNumber <= len(self._fileLines):
lineNumber = self.lineNumber
line = self._fileLines[lineNumber - 1].strip(os.linesep)
self.lineNumber += 1
# At this point, should be at a data line.
# Assuming NUCMER input body data to be in the format:
# [P1] [SUB] [SUB] [P2] [BUFF] [DIST] [LEN R] [LEN Q] [FRM] [TAGS]
# Only care about the locus, ref, and sample base columns. So, P1 and the first two SUBs, respectively.
#
# Assuming fields are tab-delimited.
#
# Regex note:
#
# Some times one or another SUB may have no value, so match for [ATCG] and check for length 1.
# If it is not length 1, then it is either blank or have more than one base, so skip as indel.
lineMatch = self.nucmerlineRe.search(line)
if lineMatch:
if self.multiChrom:
realLocus = str(lineMatch.group('chrom')) + '-' + str(lineMatch.group('locus'))
else:
realLocus = int(lineMatch.group('locus'))
snpBase = lineMatch.group('sample_base')
refBase = lineMatch.group('ref_base')
isIndel = False
isDelete = False
isInsert = False
# Check for indels.
if len(snpBase) != 1:
if len(snpBase) == 0:
# Deletion found.
isIndel = True
isDelete = True
else:
# Insertion found.
isIndel = True
isInsert = True
elif len(refBase) != 1:
if len(refBase) == 0:
# Insertion found.
isIndel = True
isInsert = True
else:
# Deletion found.
isIndel = True
isDelete = True
yield (line,lineNumber,realLocus,snpBase,refBase,isIndel,isInsert,isDelete)
else:
raise SNPFileUnrecognizedLineError("Unrecognized line at {}: {}".format(lineNumber,line),lineNumber,line)
class VCFFileReader(SNPFileReader):
'''
This is a concrete implementation of a file reader
for VCF data.
'''
def __init__(self,fileName,filterQuality=True, multiChrom=False):
'''
filterQuality parameter is a flag to indicate if it should filter data lines having only PASS quality filter values.
'''
super(VCFFileReader,self).__init__(fileName)
self.fileFormat = "vcf"
self.filterQuality = filterQuality
self.lineNumber = 0
self._fileLines = []
self.multiChrom = multiChrom
# NOTE: If processing multi-chrom FASTA files, Locus becomes CHROM + POS, instead of just POS.
self.vcflineRe = re.compile("^(?P<chrom>[^\t]+)\t(?P<locus>[0-9]+)\t[^\t]+\t(?P<ref_base>[ATCGNatcgn,]+)\t(?P<sample_base>[ATCGNatcgn,]+)\t[^\t]+\t(?P<filter>[^\t]+)\t")
# Set the strainID to the filename for now.
self.strainID = os.path.basename(fileName)
# Open the file and skip to the first line of data.
# Move past the header lines (not needed since it's been determined elsewhere what this file type is).
fh = open(fileName,"r")
self._fileLines = fh.readlines()
fh.close()
for line in self._fileLines:
self.lineNumber += 1
# Keep skipping lines until we reach the data portion. This should occur after the data header line:
#CHROM POS ID REF ALT QUAL FILTER INFO
# So look for #CHROM
if re.match("^#CHROM\s+POS\s+ID",line):
# Found header! Advance one more line to actual data line and stop looking.
self.lineNumber += 1
#print "lineNumber is " + str(self.lineNumber) + " and filesLines is " + str(len(self._fileLines))
break
def __iter__(self):
'''
Concrete implementation of iterator protocol (as a generator) to get the next line of data.
'''
while self.lineNumber <= len(self._fileLines):
lineNumber = self.lineNumber
line = self._fileLines[lineNumber - 1].strip(os.linesep)
self.lineNumber += 1
#print "lineNumber is " + str(self.lineNumber) + " and filesLines is " + str(len(self._fileLines))
# At this point, should be at a data line.
# Assuming VCF input body data to be in the format:
# CHROM POS ID REF ALT QUAL FILTER INFO
# Only care about the pos (loci), ref, alt (sample), and filter columns.
#
# Assuming fields are tab-delimited.
#
lineMatch = self.vcflineRe.match(line)
if lineMatch:
if self.multiChrom:
realLocus = str(lineMatch.group('chrom')) + '-' + str(lineMatch.group('locus'))
else:
realLocus = int(lineMatch.group('locus'))
snpBase = lineMatch.group('sample_base')
refBase = lineMatch.group('ref_base')
filter = lineMatch.group('filter')
isIndel = False
isDelete = False
isInsert = False
if self.filterQuality and filter != "PASS":
# Ignore low quality line.
continue
# Check for indels.
if len(snpBase) != len(refBase):
if len(snpBase) < len(refBase):
# Deletion found.
isIndel = True
isDelete = True
else:
# Insertion found.
isIndel = True
isInsert = True
yield (line,lineNumber,realLocus,snpBase,refBase,isIndel,isInsert,isDelete)
else:
raise SNPFileUnrecognizedLineError("Unrecognized line at {}: {}".format(lineNumber,line),lineNumber,line)