/
scheduler.py
175 lines (151 loc) · 5.8 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
###
# usage: python scheduler.py /path/to/fastq.gz/folder/
###
### imports
import os, sys, time, random, subprocess, shutil
from os import path as op
from coadaptree import *
from balance_queue import getsq
###
### args
# exactly one argument is expected: the folder holding the fastq.gz files
if len(sys.argv) != 2:
    sys.exit('usage: python scheduler.py /path/to/fastq.gz/folder/')
thisfile, pooldir = sys.argv
###
### reqs
# Strip every trailing "/" (e.g. added by tab-completion on the command line);
# with a trailing slash op.dirname() would return pooldir itself, not its parent.
pooldir = pooldir.rstrip("/")
parentdir = op.dirname(pooldir)
scheddir = op.join(parentdir, 'shfiles/gvcf_shfiles')
print("scheddir=", scheddir)
scheduler = op.join(scheddir, 'scheduler.txt')  # lock file: exists while a scheduler runs
os.chdir(scheddir)
cluster = os.environ['CC_CLUSTER']  # which compute canada cluster is this job running on?
qthresh = 1000 if cluster == 'cedar' else 950  # max queue length before we stop submitting
user = os.environ['USER']
###
### defs
print('running scheduler.py')
def checksq(rt):
    """Sanity-check squeue-style output lines; abort the scheduler if they look wrong.

    Every line of ``rt`` is expected to start with an integer (presumably a job id).
    If ``rt`` is not a list, any line mentions 'socket' (seemingly a slurm
    connection error), any line does not start with an integer, or no line at all
    could be parsed, the scheduler lock file is removed and the process exits.

    Args:
        rt: list of non-empty output lines.
    """
    exitneeded = False
    if not isinstance(rt, list):
        print("type(sq) != list, exiting rescheduler.py", flush=True)
        exitneeded = True
        rt = []  # do not iterate over something that is not a list of lines
    count = 0
    for s in rt:
        if 'socket' in s.lower():
            print("socket in sq return, exiting rescheduler.py", flush=True)
            exitneeded = True
        fields = s.split()
        first = fields[0] if fields else ''
        try:
            # int() rejects non-integers such as '12.5' or 'JOBID'. No assert is
            # used, so the check still runs under `python -O`.
            int(first)
        except ValueError:
            # print() instead of os.system('echo ...'): the line is not passed
            # through a shell, so quotes/$()/backticks in it are harmless
            print("could not assert int == float, %s %s" % (first, s), flush=True)
            exitneeded = True
        else:
            count += 1
    if count == 0 and len(rt) > 0:
        print("never asserted pid, exiting rescheduler.py", flush=True)
        exitneeded = True
    if exitneeded:
        delsched(scheduler)
        sys.exit()
def sq(command):
    """Run the shell ``command`` and return how many non-empty lines it printed.

    The comment in the original marks this as counting running jobs, so
    ``command`` is presumably an squeue invocation. The lines are validated with
    checksq(), which exits the scheduler if they do not look like a job listing.
    """
    # `with` closes the pipe and waits for the child, so no zombie is left behind
    with os.popen(str(command)) as pipe:
        lines = [line for line in pipe.read().split("\n") if line != '']
    checksq(lines)
    return len(lines)
def delsched(scheduler):
    """Release the scheduler lock by deleting the file at ``scheduler``.

    Best effort: any OSError (for example, the file is already gone) is ignored.
    """
    try:
        os.unlink(scheduler)
    except OSError:
        pass  # nothing left to release
def getpids():
    """Return the SLURM job ids (squeue ``%i`` field) listed for the current ``user``.

    ``user`` is the module-level value read from $USER (previously the user name
    was hard-coded). If squeue lists the same id twice, the scheduler lock file
    is removed and the process exits.

    Returns:
        list of job-id strings, empty if squeue printed nothing or could not run.
    """
    try:
        # list-form argv: the user name is never interpreted by a shell
        out = subprocess.run(['squeue', '-u', user, '-h', '-o', '%i'],
                             stdout=subprocess.PIPE,
                             universal_newlines=True).stdout
    except OSError:
        out = ''  # squeue could not be launched: same empty result as the old shell pipe
    pids = [p for p in out.split("\n") if p != '']
    if len(pids) != len(set(pids)):
        print('len != luni pids')
        delsched(scheduler)
        sys.exit()
    return pids
def startscheduler(scheduler):
    """Claim the scheduler lock file for this SLURM job, or exit on conflict.

    Writes ``scheduler id = <SLURM_JOB_ID>`` to ``scheduler`` and reads it back.
    If the file then names a different job id, or is empty, another scheduler
    wrote to it in the meantime. In that case this process exits so the other
    one can proceed.

    Raises:
        KeyError: if SLURM_JOB_ID is unset. This is raised before the file is touched.
    """
    # Look the job id up first. Failing after open(..., 'w') would leave an empty
    # lock file, which bigbrother() cannot parse.
    jobid = os.environ['SLURM_JOB_ID']
    with open(scheduler, 'w') as o:
        # write the job id in case I want to cancel the process
        o.write("scheduler id = %s" % jobid)
    # double check that the scheduler is correct
    with open(scheduler, 'r') as o:
        words = o.read().split()
    owner = words[-1] if words else None  # None: truncated by another scheduler
    if owner != '=' and owner != jobid:
        os.system('echo another scheduler is in conflict. Allowing other scheduler to proceed. Exiting')
        sys.exit()
def sbatchjobs(files):
    """Submit the job scripts that the symlinks in ``files`` point to.

    Processing steps for each symlink:
    1. Resolve its target.
    2. Remove the symlink, so no other process picks up the same job.
    3. ``sbatch`` the target.

    If sbatch cannot be run or fails, the symlink is recreated so the job stays
    available for a later attempt, and no further files are submitted. After each
    successful submission the function sleeps 5 s.

    Args:
        files: paths of symlinks in the scheduler directory.
    """
    for f in files:
        realp = op.realpath(f)  # find the file to which the symlink file is linked
        if not op.exists(f):  # symlink already gone (or its target is missing)
            continue
        try:
            os.unlink(f)  # first try to remove the symlink from the scheddir
            print('unlinked %s' % f)
        except OSError:  # e.g. gvcf_helper has already removed it
            print('unable to unlink symlink %s' % f)
            continue
        # then sbatch the real sh file if & only if the symlink was successfully unlinked
        sbatch = shutil.which('sbatch')
        print('realp = ', realp)
        print('shutil.which(sbatch) =', sbatch)
        try:
            if sbatch is None:
                raise FileNotFoundError('sbatch not found on PATH')
            stdout = subprocess.check_output([sbatch, realp]).decode('utf-8')
        except (subprocess.CalledProcessError, OSError) as e:
            print("couldn't sbatch. Here is the error:\n%s" % e)
            os.symlink(realp, f)  # restore the symlink so the job is not lost
            print(f'relinked {op.basename(f)} to file: {realp}')
            return
        words = stdout.replace("\n", "").split()
        output = words[-1] if words else ''
        # sbatch is expected to end its reply with the numeric job ID
        if not output.isdigit():
            print('got an sbatch error: %s' % output)
            return
        time.sleep(5)
def main(scheddir):
    """Reserve the scheduler, submit queued jobs if there is room, then release it.

    Steps:
    1. Claim the lock file ``scheduler`` (module-level path).
    2. Compare the queue length from ``getsq()`` with ``qthresh``.
    3. If there is room, submit up to ``qthresh - len(queue)`` of the job
       symlinks listed by ``fs(scheddir)``.
    4. Run ``balance_queue.py scatter`` on ``parentdir``.
    5. Remove the lock file. This also happens if steps 2-4 raise or exit.

    Args:
        scheddir: directory holding the job symlinks and the lock file.
    """
    # Reserve right away; startscheduler() exits if another scheduler owns the
    # lock. It stays outside the try so a conflict exit never deletes the other
    # scheduler's lock file.
    startscheduler(scheduler)
    try:
        x = len(getsq())
        print('queue length = ', x)
        if x < qthresh:  # if there is room in the queue
            print('scheduler not running')
            print('queue length less than thresh')
            nsbatch = qthresh - x  # how many should I submit?
            print('nsbatch =', nsbatch)
            candidates = fs(scheddir)  # list the directory once
            print(len(candidates))
            files = [f for f in candidates
                     if 'scheduler.txt' not in f
                     and '.out' not in f
                     and 'workingdir' not in f][0:nsbatch]
            if files:
                print('submitting %s jobs' % len(files))
                print(files)
                sbatchjobs(files)
            else:
                print('no files to sbatch')
        else:
            print('scheduler was not running, but no room in queue')
        balance_queue = op.join(os.environ['HOME'], 'gatk_pipeline/balance_queue.py')
        subprocess.call([sys.executable, balance_queue, 'scatter', parentdir])
    finally:
        # Always release the lock, even if a step above crashed. Otherwise later
        # invocations stay blocked until bigbrother() sees this job has ended.
        delsched(scheduler)
def bigbrother(scheduler, scheddir):
    """Recover from a stale scheduler lock file.

    Reads the job id recorded in the lock file ``scheduler``. If that job is no
    longer listed by getpids(), the lock is treated as stale: it is removed and
    main() runs in this process. If the lock file has already disappeared,
    main() is run directly.
    """
    try:
        with open(scheduler, 'r') as o:
            text = o.read()
    except FileNotFoundError:
        # The lock was released after the caller checked that it exists, so no
        # scheduler is running any more.
        print('scheduler file disappeared, running scheduler')
        main(scheddir)
        return
    words = text.split()
    if not words:
        # lock exists but is empty: presumably another scheduler is mid-write
        print('scheduler file is empty, allowing the other scheduler to proceed')
        return
    pid = words[-1]
    if pid == '=':
        return  # no job id recorded, so there is nothing to check
    if pid not in getpids():
        print(f'controller ({pid}) was not running, so the scheduler was destroyed')
        delsched(scheduler)
        main(scheddir)
    else:
        print('controller is running, allowing it to proceed')
###
# main
# Random sub-second delay so that near-simultaneous launches of scheduler.py
# are less likely to all find the lock file missing at the same instant.
time.sleep(random.random())
if op.exists(scheduler):
    # A lock file exists: bigbrother decides whether its owner is still alive.
    print('scheduler was running')
    bigbrother(scheduler, scheddir)
else:
    # No lock file, so no scheduler is running.
    main(scheddir)