-
Notifications
You must be signed in to change notification settings - Fork 32
/
compute.py
219 lines (191 loc) · 9.1 KB
/
compute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import os
from ..taxbrain.helpers import package_up_vars, arrange_totals_by_row
import json
import requests
from requests.exceptions import Timeout, RequestException
import requests_mock
import taxcalc
from ..taxbrain.compute import DropqCompute, MockCompute
from .models import OGUSAWorkerNodesCounter
from .helpers import filter_ogusa_only
from ..constants import START_YEAR
# Version of the locally installed taxcalc package.
dropq_version = taxcalc.__version__

# Number of budget years, overridable through the environment.
NUM_BUDGET_YEARS = int(os.environ.get('NUM_BUDGET_YEARS', 10))

# Comma-separated worker host lists read from the environment.
# NOTE: a missing or empty variable does NOT fail here: ''.split(",") yields
# [''], so the resulting list always has at least one (possibly empty) entry.
dropq_workers = os.environ.get('DROPQ_WORKERS', '')
DROPQ_WORKERS = dropq_workers.split(",")
ogusa_workers = os.environ.get('OGUSA_WORKERS', '')
OGUSA_WORKERS = ogusa_workers.split(",")

# True only when ENFORCE_VERSION is exactly the string 'True'.
ENFORCE_REMOTE_VERSION_CHECK = os.environ.get('ENFORCE_VERSION', 'False') == 'True'

# Per-request timeout and the retry budget used when submitting jobs.
TIMEOUT_IN_SECONDS = 1.0
MAX_ATTEMPTS_SUBMIT_JOB = 20

# Row-name tables re-exported from taxcalc.tbi.
AGG_ROW_NAMES = taxcalc.tbi.AGG_ROW_NAMES
GDP_ELAST_ROW_NAMES = taxcalc.tbi.GDP_ELAST_ROW_NAMES

# Host (and port) used to build the job-finished callback URL.
CALLBACK_HOSTNAME = os.environ.get('CALLBACK_HOSTNAME', 'localhost:8000')
class DynamicCompute(DropqCompute):
    """Submit ogusa jobs to the hosts in ``OGUSA_WORKERS`` and fetch results.

    Extends ``DropqCompute``; ``remote_submit_job`` and
    ``remote_retrieve_results`` are not defined here and are expected to be
    inherited from that base class.
    """

    def remote_register_job(self, theurl, data, timeout=TIMEOUT_IN_SECONDS):
        """POST ``data`` to ``theurl`` and return the ``requests`` response."""
        response = requests.post(theurl, data=data, timeout=timeout)
        return response

    def submit_json_ogusa_calculation(self, ogusa_mods, first_budget_year,
                                      microsim_data, pack_up_user_mods):
        """Submit an ogusa job without packing up ``microsim_data``.

        ``pack_up_user_mods`` is accepted for signature compatibility but is
        ignored: the call is always forwarded with ``pack_up_user_mods=False``.
        Returns whatever ``submit_ogusa_calculation`` returns.
        """
        return self.submit_ogusa_calculation(ogusa_mods, first_budget_year,
                                             microsim_data,
                                             pack_up_user_mods=False)

    def submit_ogusa_calculation(self, ogusa_mods, first_budget_year,
                                 microsim_data, pack_up_user_mods=True):
        """Submit one ogusa job to a worker, then register its callback.

        Args:
            ogusa_mods: modifications passed through ``filter_ogusa_only``
                and sent JSON-encoded as ``ogusa_params``.
            first_budget_year: sent as ``first_year`` (and additionally as
                ``first_budget_year`` when ``pack_up_user_mods`` is false).
            microsim_data: microsimulation inputs, sent JSON-encoded as
                ``user_mods``.
            pack_up_user_mods: when true, ``microsim_data`` is first run
                through ``package_up_vars`` and keyed by
                ``first_budget_year``; when false it is sent unchanged and
                ``taxio_format`` is set in the payload.

        Returns:
            ``(job_ids, guids)`` where ``job_ids`` is a list of
            ``(job_id, hostname)`` tuples and ``guids`` is a list of
            ``(job_id, guid)`` tuples (``guid`` defaults to ``'None'``).

        Raises:
            IOError: when the number of failed attempts exceeds
                ``MAX_ATTEMPTS_SUBMIT_JOB``. The counter is shared between
                the submit phase and the register phase.
        """
        print("mods is ", ogusa_mods)
        ogusa_params = filter_ogusa_only(ogusa_mods)
        data = {}
        if pack_up_user_mods:
            microsim_params = package_up_vars(microsim_data, first_budget_year)
            microsim_params = {first_budget_year: microsim_params}
            print("microsim data is", microsim_params)
        else:
            data['taxio_format'] = True
            data['first_budget_year'] = first_budget_year
            microsim_params = microsim_data

        print("submit dynamic work")
        hostnames = OGUSA_WORKERS
        DEFAULT_PARAMS = {
            'callback': "http://{}/dynamic/dynamic_finished".format(CALLBACK_HOSTNAME),
        }
        data['ogusa_params'] = json.dumps(ogusa_params)
        data['user_mods'] = json.dumps(microsim_params)
        data['first_year'] = first_budget_year
        job_ids = []
        guids = []

        onc, _ = OGUSAWorkerNodesCounter.objects.get_or_create(singleton_enforce=1)
        # Get the current value mod'd to the current worker set
        ogusa_worker_idx = onc.current_idx % len(OGUSA_WORKERS)
        # Increment for next time
        onc.current_idx = (ogusa_worker_idx + 1) % len(OGUSA_WORKERS)
        onc.save()
        hostname_idx = ogusa_worker_idx
        print("hostname_idx is", hostname_idx)

        attempts = 0

        # Phase 1: submit the job. Every retry targets the same host, since
        # hostname_idx is never advanced inside the loop.
        # NOTE(review): the error paths write ``onc.current_offset`` while the
        # rest of this method uses ``onc.current_idx``; confirm against the
        # OGUSAWorkerNodesCounter model which field is intended.
        submitted = False
        while not submitted:
            theurl = "http://{hn}/ogusa_start_job".format(hn=hostnames[hostname_idx])
            try:
                response = self.remote_submit_job(theurl, data=data,
                                                  timeout=TIMEOUT_IN_SECONDS)
                if response.status_code == 200:
                    print("submitted: ", hostnames[hostname_idx])
                    submitted = True
                    resp_data = json.loads(response.text)
                    job_ids.append((resp_data['job_id'], hostnames[hostname_idx]))
                    guids.append((resp_data['job_id'], resp_data.get('guid', 'None')))
                else:
                    print("FAILED: ", hostnames[hostname_idx])
                    attempts += 1
            except Timeout:
                print("Couldn't submit to: ", hostnames[hostname_idx])
                # Increment to next ogusa node
                onc.current_offset = (ogusa_worker_idx + 1) % len(OGUSA_WORKERS)
                onc.save()
                attempts += 1
            except RequestException as exc:
                print("Something unexpected happened: ", exc)
                # Increment to next ogusa node
                onc.current_offset = (ogusa_worker_idx + 1) % len(OGUSA_WORKERS)
                onc.save()
                attempts += 1
            if attempts > MAX_ATTEMPTS_SUBMIT_JOB:
                print("Exceeded max attempts. Bailing out.")
                # Increment to next ogusa node
                onc.current_offset = (ogusa_worker_idx + 1) % len(OGUSA_WORKERS)
                onc.save()
                raise IOError()

        # Phase 2: register the callback for the submitted job on the same
        # host. URL and payload do not change between retries.
        reg_url = "http://{hn}/register_job".format(hn=hostnames[hostname_idx])
        params = DEFAULT_PARAMS.copy()
        params['job_id'] = job_ids[0][0]
        registered = False
        while not registered:
            try:
                register = self.remote_register_job(reg_url, data=params,
                                                    timeout=TIMEOUT_IN_SECONDS)
                # Check the registration response itself; the submit response
                # is already known to be 200 at this point.
                if register.status_code == 200:
                    print("registered: ", hostnames[hostname_idx])
                    registered = True
                else:
                    print("FAILED: ", hostnames[hostname_idx])
                    attempts += 1
            except Timeout:
                print("Couldn't submit to: ", hostnames[hostname_idx])
                attempts += 1
            except RequestException as exc:
                print("Something unexpected happened: ", exc)
                attempts += 1
            if attempts > MAX_ATTEMPTS_SUBMIT_JOB:
                print("Exceeded max attempts. Bailing out.")
                raise IOError()

        return job_ids, guids

    def ogusa_get_results(self, job_ids, status):
        '''
        Fetch the result of a finished ogusa job from its worker.

        job_ids = celery ID and hostname of job; only the first
                  (id, hostname) entry is used
        status = either "SUCCESS" or "FAILURE"

        Returns {'df_ogusa': ...} built from the JSON response on "SUCCESS",
        or {'job_fail': <response text>} on "FAILURE".

        Raises ValueError for any other status, and IOError when the worker
        answers with a non-200 status code or a version check fails.
        '''
        id_hostname = job_ids[0]
        id_, hostname = id_hostname
        result_url = "http://{hn}/dropq_get_result".format(hn=hostname)
        job_response = self.remote_retrieve_results(result_url,
                                                    params={'job_id': id_})
        if job_response.status_code == 200:  # Valid response
            if status == "SUCCESS":
                response = job_response.json()
                df_ogusa = {}
                df_ogusa.update(response['df_ogusa'])
                results = {'df_ogusa': df_ogusa}
            elif status == "FAILURE":
                results = {'job_fail': job_response.text}
            else:
                raise ValueError("only know 'SUCCESS' or 'FAILURE' status")
        else:
            msg = "Don't know how to handle response: {0}"
            msg = msg.format(job_response.status_code)
            raise IOError(msg)

        if ENFORCE_REMOTE_VERSION_CHECK:
            # NOTE(review): ``ans``, ``ogusa_version`` and ``same_version`` are
            # not defined or imported in the visible part of this module. If
            # they are not provided elsewhere, this branch raises NameError
            # whenever ENFORCE_VERSION is 'True' -- confirm and fix upstream.
            versions = [r.get('ogusa_version', None) for r in ans]
            if not all([ver == ogusa_version for ver in versions]):
                msg = "Got different taxcalc versions from workers. Bailing out"
                print(msg)
                raise IOError(msg)
            versions = [r.get('dropq_version', None) for r in ans]
            if not all([same_version(ver, dropq_version) for ver in versions]):
                msg = "Got different dropq versions from workers. Bailing out"
                print(msg)
                raise IOError(msg)

        return results
class MockDynamicCompute(DynamicCompute):
    """``DynamicCompute`` variant whose HTTP calls are served by requests_mock.

    Each ``remote_*`` method registers a canned response with
    ``requests_mock.Mocker`` and then delegates to the ``DynamicCompute``
    implementation, so no real worker is contacted.
    """

    __slots__ = ('count', 'num_times_to_wait', 'increment')

    def __init__(self, **kwargs):
        """Store the optional ``increment`` kwarg and forward the rest.

        ``increment`` (default 0) is added to the base number used to build
        the mocked job id. ``count`` indexes the canned result files and
        starts at 0; it is set before delegating so a base-class initialiser
        can still override it.
        """
        self.increment = kwargs.pop('increment', 0)
        self.count = 0
        super(MockDynamicCompute, self).__init__(**kwargs)

    def remote_submit_job(self, theurl, data, timeout):
        """Mock the ``/ogusa_start_job`` POST and record the posted data."""
        with requests_mock.Mocker() as mock:
            job_id = 'ogusa' + str(424242 + self.increment)
            resp = {'job_id': job_id, 'guid': 'guia123456789'}
            resp = json.dumps(resp)
            mock.register_uri('POST', '/ogusa_start_job', text=resp)
            # Kept so callers can inspect the last payload that was posted.
            self.last_posted = data
            return DynamicCompute.remote_submit_job(self, theurl, data, timeout)

    def remote_register_job(self, theurl, data, timeout):
        """Mock the ``/register_job`` POST."""
        with requests_mock.Mocker() as mock:
            resp = {'registered': 'guia123456789'}
            resp = json.dumps(resp)
            mock.register_uri('POST', '/register_job', text=resp)
            return DynamicCompute.remote_register_job(self, theurl, data, timeout)

    def remote_retrieve_results(self, theurl, params):
        """Mock the ``/dropq_get_result`` GET from a canned JSON file.

        Reads ``tests/ogusa_results_<count>.json`` next to this module, then
        increments ``count`` so the next call reads the following file.
        """
        # Format only the file name so braces in the directory path cannot
        # interfere with str.format.
        file_name = "ogusa_results_{0}.json".format(self.count)
        mock_path = os.path.join(os.path.split(__file__)[0], "tests", file_name)
        with open(mock_path, 'r') as f:
            text = f.read()
        self.count += 1
        with requests_mock.Mocker() as mock:
            mock.register_uri('GET', '/dropq_get_result', text=text)
            return DynamicCompute.remote_retrieve_results(self, theurl, params)