/
start_varscan.py
349 lines (285 loc) · 12.1 KB
/
start_varscan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
"""Create and sbatch varscan command files if all realigned bamfiles have been created.
If a single sample per pool_name:
- set min freq to 0
If multiple samps per pool_name:
- set min freq to 1/(total_ploidy_across_samples_in_a_pool)
# usage
# python start_varscan.py parentdir pool
#
# fix
# check_seff does not handle slurm timeouts
"""
import sys, os, time, random, subprocess, shutil
from os import path as op
from datetime import datetime as dt
from coadaptree import makedir, fs, pklload, get_email_info
from balance_queue import getsq
def gettimestamp(f):
    """Return the last-modified time of *f* as a ctime-formatted string."""
    mtime = op.getmtime(f)
    return time.ctime(mtime)
def getmostrecent(files):
    """Determine the most recent file from a list of files.

    Positional arguments:
    files - a single path or a list of paths

    Returns:
    the path with the latest modification time, or None if *files* is empty
    """
    if not isinstance(files, list):
        files = [files]
    if not files:
        # if len(files) == 0
        return None
    # compare modification times directly; the previous round-trip through
    # ctime strings + strptime was locale-dependent and second-precision only
    return max(files, key=op.getmtime)
def getfiles(samps, shdir, grep):
    """Determine if all realign bam jobs have been created and sbatched.

    Positional arguments:
    samps - list of sample names (length is number of expected shfiles)
    shdir - directory where .sh and .out files are
    grep - program name - keyword used to find correct files

    Returns:
    files - dictionary where key = sh file, val = most recent outfile
    """
    found = [f for f in fs(shdir) if grep in f and f.endswith(".sh")]
    outs = [f for f in fs(shdir) if grep in f and f.endswith('.out')]
    if len(found) != len(samps):
        print('not all shfiles have been created, exiting %s' % sys.argv[0])
        exit()
    files = {}
    for sh in found:
        # match each .sh file to the outfiles sharing its basename stem
        stem = op.basename(sh).replace(".sh", "")
        files[sh] = getmostrecent([out for out in outs if stem in out])
    if None in files.values():
        # a None value means an .sh file has no .out file, i.e. never sbatched
        print('not all shfiles have been sbatched, exiting %s' % sys.argv[0])
        exit()
    return files
def check_seff(outs):
    """Execute slurm seff command on each outfile's slurm_job_id to ensure it ran without error.

    Exit otherwise.

    Positional arguments:
    outs - iterable of .out file paths whose names end in _<slurm_job_id>.out
    """
    print('checking seff')
    jobid = os.environ['SLURM_JOB_ID']
    for i, f in enumerate(outs):
        pid = f.split("_")[-1].replace(".out", "")
        if not pid == jobid:  # no need to seff the currently-running job
            seff, seffcount = '', 0
            while isinstance(seff, list) is False:
                # sometimes slurm sucks: seff can fail transiently (e.g. on
                # slurm timeouts), so catch failures and retry instead of
                # letting the exception kill this script
                try:
                    seff = subprocess.check_output([shutil.which('seff'), pid]).decode('utf-8').split('\n')
                except (subprocess.CalledProcessError, OSError):
                    pass
                if seffcount == 10:
                    print('slurm is screwing something up with seff, exiting %s' % sys.argv[0])
                    exit()
                if not isinstance(seff, list):
                    # only pause when we actually have to retry
                    time.sleep(1)
                seffcount += 1
            state = [x.lower() for x in seff if 'State' in x][0]
            if 'exit code 0' not in state:
                status = 'died' if 'running' not in state else 'is running'
                print('cannot proceed with %s' % sys.argv[0])
                print('job %s (%s) for %s' % (status, state, f))
                print('exiting %s' % sys.argv[0])
                exit()
        if (i+1) % 10 == 0:
            # progress indicator for long lists of outfiles
            print('\t%s/%s' % (i+1, len(outs)))
def checkpids(outs, queue):
    """If any of the other varscan jobs are pending or running, exit."""
    print('checking pids')
    queued = {q[0] for q in queue}
    myjob = os.environ['SLURM_JOB_ID']
    for out in outs:
        pid = out.split("_")[-1].replace(".out", "")
        # a queued pid that is not this job means another varscan job is alive
        if pid != myjob and pid in queued:
            print('the following file is still in the queue - exiting %s' % sys.argv[0],
                  '\n', '\t%(out)s' % locals())
            exit()
def check_queue(outs, pooldir):
    """Get jobs from the queue, except those that are closing (assumes jobs haven't failed)."""
    print('checking queue')
    sq = getsq(grepping=['bedfile', op.basename(pooldir)])
    # an empty queue means nothing to collide with, so only check pids
    # when something matched
    if sq:
        checkpids(outs, sq)
def get_bamfiles(samps, pooldir):
    """Using a list of sample names, find the realigned bamfiles.

    Returns:
    files - dictionary with key = samp_name, val = /path/to/bamfile
    """
    print('getting bamfiles')
    found = fs(op.join(pooldir, '04_realign'))
    files = {}
    for samp in samps:
        for f in found:
            # a .bai index implies the matching .bam exists and is complete
            if samp in f and f.endswith('.bai'):
                files[samp] = f.replace(".bai", ".bam")
    if len(files) != len(samps):
        print('len(files) != len(samps)')
        print('files = ', files)
        print('samps = ', samps)
        exit()
    return files
def checkfiles(pooldir):
    """Call get_bamfiles once all indelRealign jobs are confirmed finished."""
    # get the list of file names
    print('checking files')
    pool = op.basename(pooldir)
    parent = op.dirname(pooldir)
    samps = pklload(op.join(parent, 'poolsamps.pkl'))[pool]
    shdir = op.join(pooldir, 'shfiles/05_indelRealign_shfiles')
    files = getfiles(samps, shdir, 'indelRealign')
    outs = files.values()
    check_queue(outs, pooldir)  # make sure job isn't in the queue (running or pending)
    check_seff(outs)  # make sure the jobs didn't die
    return get_bamfiles(samps, pooldir)
def create_reservation(pooldir, exitneeded=False):
    """Create a file so that other realign jobs can't start varscan too.

    Positional arguments:
    pooldir - path to the pool's directory (basename is the pool name)
    exitneeded - internal flag; forced True when another job already wrote
                 the reservation file

    Returns:
    shdir - the varscan shfile directory; exits instead when another job
            holds the reservation
    """
    print('creating reservation')
    # derive the pool name from pooldir instead of relying on the
    # module-level `pool` global, which only exists when run as a script
    pool = op.basename(pooldir)
    shdir = makedir(op.join(pooldir, 'shfiles/varscan'))
    file = op.join(shdir, '%s_varscan_reservation.sh' % pool)
    jobid = os.environ['SLURM_JOB_ID']
    if not op.exists(file):
        with open(file, 'w') as o:
            o.write("%s" % jobid)
    else:
        exitneeded = True
    # random sleep so jobs that raced to write settle before reading back
    time.sleep(random.random()*15)
    with open(file, 'r') as o:
        fjobid = o.read().split()[0]
    if not fjobid == jobid or exitneeded is True:
        # just in case two jobs try at nearly the same time
        print('\tanother job has already created varscan_reservation.sh for %s' % pool)
        exit()
    return shdir
def get_prereqs(bedfile, parentdir, pool, program):
    """Get object names.

    Returns:
    (num, ref, vcf) - bedfile number, reference fasta path, and the vcf path
    """
    # bedfiles are named ..._<num>.bed; recover <num> for the vcf name
    num = bedfile.split("_")[-1].split(".bed")[0]
    ref = pklload(op.join(parentdir, 'poolref.pkl'))[pool]
    pooldir = op.join(parentdir, pool)
    outdir = makedir(op.join(pooldir, program))
    vcf = op.join(outdir, f'{pool}_{program}_bedfile_{num}.vcf')
    return (num, ref, vcf)
def get_small_bam_cmds(bamfiles, bednum, bedfile):
    """Get samtools commands to reduce a bamfile to intervals in the bedfile.

    Returns:
    (smallbams, cmds) - list of $SLURM_TMPDIR bam paths, and the shell
                        commands that create them
    """
    header = 'module load java\nmodule load samtools/1.9\n'
    smallbams = []
    viewcmds = []
    for bam in bamfiles:
        # the sample/pool name is everything before "_realigned" in the bam name
        samp = op.basename(bam).split("_realigned")[0]
        small = f'$SLURM_TMPDIR/{samp}_realigned_{bednum}.bam'
        viewcmds.append(f'''samtools view -b -L {bedfile} {bam} > {small}\n''')
        smallbams.append(small)
    return (smallbams, header + ''.join(viewcmds))
def get_varscan_cmd(bamfiles, bedfile, bednum, vcf, ref, pooldir, program):
    """Create command to call varscan.

    Returns:
    (cmds, finalvcf) - the shell command text and the path to the output vcf
    """
    smallbams, smallcmds = get_small_bam_cmds(bamfiles, bednum, bedfile)
    smallbams = ' '.join(smallbams)
    # derive parentdir/pool from the pooldir parameter instead of relying on
    # module-level globals that only exist when run as a script
    parentdir = op.dirname(pooldir)
    pool = op.basename(pooldir)
    ploidy = pklload(op.join(parentdir, 'ploidy.pkl'))[pool]
    # if single-sample then set minfreq to 0, else use min possible allele freq
    minfreq = 1/sum(ploidy.values()) if len(ploidy.keys()) > 1 else 0
    cmd = f'''samtools mpileup -B -f {ref} {smallbams} | java -Xmx15g -jar \
$VARSCAN_DIR/VarScan.v2.4.3.jar mpileup2cns --min-coverage 8 --p-value 0.05 \
--min-var-freq {minfreq} --strand-filter 1 --min-freq-for-hom 0.80 \
--min-avg-qual 20 --output-vcf 1 > {vcf}
module unload samtools
'''
    # final vcf
    outdir = makedir(op.join(pooldir, program))
    finalvcf = op.join(outdir, op.basename(vcf))
    cmds = smallcmds + cmd
    return (cmds, finalvcf)
def make_sh(bamfiles, bedfile, shdir, pool, pooldir, program, parentdir):
    """Create sh file for varscan command.

    Positional arguments:
    bamfiles - iterable of /path/to/realigned_bamfile
    bedfile - path to the bed interval file this job will cover
    shdir - directory where the .sh file is written
    pool - pool name
    pooldir - pool directory (op.join(parentdir, pool))
    program - program name (e.g. 'varscan'), used in file naming
    parentdir - parent directory of pooldir

    Returns:
    file - path to the written sbatch .sh file
    """
    num, ref, vcf = get_prereqs(bedfile, parentdir, pool, program)
    cmd, finalvcf = get_varscan_cmd(bamfiles, bedfile, num,
                                    vcf, ref, pooldir, program)
    # columns requested from gatk VariantsToTable (site fields + genotype fields)
    fields = '''-F ADP -F WT -F HET -F HOM -F NC -GF GT -GF GQ -GF SDP -GF DP \
-GF FREQ -GF PVAL -GF AD -GF RD'''
    tablefile = finalvcf.replace(".vcf", "_table.txt")
    bash_variables = op.join(parentdir, 'bash_variables')
    # the sbatch script: run varscan, convert the vcf to a table, bgzip
    # the vcf, then rebalance the queue
    text = f'''#!/bin/bash
#SBATCH --ntasks=1
#SBATCH --job-name={pool}-{program}_bedfile_{num}
#SBATCH --time='7-00:00:00'
#SBATCH --mem=2000M
#SBATCH --output={pool}-{program}_bedfile_{num}_%j.out
# run VarScan (v.2.4.2)
{cmd}
# vcf -> table (multiallelic to multiple lines, filtered in combine_varscan.py
module load gatk/4.1.0.0
gatk VariantsToTable --variant {finalvcf} -F CHROM -F POS -F REF -F ALT -F AF -F QUAL \
-F TYPE -F FILTER {fields} -O {tablefile} --split-multi-allelic
module unload gatk
# gzip outfiles to save space
module load nixpkgs/16.09 gcc/7.3.0 htslib/1.9
cd $(dirname {finalvcf})
bgzip -f {finalvcf}
# if any other varscan jobs are hanging due to priority, change the account
source {bash_variables}
python $HOME/pipeline/balance_queue.py {program} {parentdir}
'''
    file = op.join(shdir, f'{pool}-{program}_bedfile_{num}.sh')
    with open(file, 'w') as o:
        o.write("%s" % text)
    return file
def sbatch(file):
    """Sbatch file."""
    os.chdir(op.dirname(file))
    output = subprocess.check_output([shutil.which('sbatch'), file]).decode('utf-8')
    # sbatch prints "Submitted batch job <pid>"; the pid is the last token
    pid = output.replace("\n", "").split()[-1]
    print("sbatched %s" % file)
    # brief pause so the scheduler isn't hammered by back-to-back submissions
    time.sleep(10)
    return pid
def get_bedfiles(parentdir, pool):
    """Get a list of paths to all of the bed files for ref.fa."""
    ref = pklload(op.join(parentdir, 'poolref.pkl'))[pool]
    # bedfiles live next to the reference in a dir named after the fasta stem
    refname = op.basename(ref).split(".fa")[0]
    beddir = op.join(op.dirname(ref), 'bedfiles_%s' % refname)
    return [b for b in fs(beddir) if b.endswith('.bed')]
def create_sh(bamfiles, shdir, pool, pooldir, program, parentdir):
    """Create and sbatch shfiles, record pid to use as dependency for combine job."""
    pids = []
    # one sbatch job per bedfile interval
    for bedfile in get_bedfiles(parentdir, pool):
        shfile = make_sh(bamfiles, bedfile, shdir, pool, pooldir, program, parentdir)
        pids.append(sbatch(shfile))
    return pids
def create_combine(pids, parentdir, pool, program, shdir):
    """Create command file to combine varscan jobs once they're finished.

    Positional arguments:
    pids = list of slurm job id dependencies (the jobs that need to finish first)
    parentdir = parent directory of the pool directories
    pool = pool name
    program = program name (e.g. 'varscan')
    shdir = directory where the combine .sh file is written
    """
    pooldir = op.join(parentdir, pool)
    email_text = get_email_info(parentdir, 'final')
    joined = ','.join(pids)
    # the combine job only starts after every parallel job exits cleanly
    dependencies = '#SBATCH --dependency=afterok:' + joined
    bash_variables = op.join(parentdir, 'bash_variables')
    text = f'''#!/bin/bash
#SBATCH --job-name={pool}-combine-{program}
#SBATCH --time=12:00:00
#SBATCH --mem=20000M
#SBATCH --cpus-per-task=1
#SBATCH --output={pool}-combine-{program}_%j.out
{dependencies}
{email_text}
source {bash_variables}
python $HOME/pipeline/combine_varscan.py {pooldir} {program} {pool}
'''
    combfile = op.join(shdir, f'{pool}-combine-{program}.sh')
    with open(combfile, 'w') as o:
        o.write("%s" % text)
    sbatch(combfile)
    print(f'sbatched {program} combinefile with dependencies: ' + joined)
def main(parentdir, pool):
    """Start <program> if it's appropriate to do so."""
    pooldir = op.join(parentdir, pool)
    # check to see if all bam files have been created; if not: exit()
    bamfiles = checkfiles(pooldir)
    # create reservation so other files don't try and write files.sh, exit() if needed
    shdir = create_reservation(pooldir)
    # create .sh files
    for program in ['varscan']:
        print('starting %s commands' % program)
        # create .sh file and submit to scheduler
        pids = create_sh(bamfiles.values(), shdir, pool, pooldir, program, parentdir)
        # create .sh file to combine varscan parallels using jobIDs as dependencies
        create_combine(pids, parentdir, pool, program, shdir)
if __name__ == "__main__":
    # args
    # NOTE: this unpack also creates module-level `parentdir` and `pool`
    # globals as a side effect of running as a script
    thisfile, parentdir, pool = sys.argv
    main(parentdir, pool)