from abc import ABC, abstractmethod
import warnings

import numpy as np
import mxnet as mx
from mxnet import autograd, nd

from utils.model_utils import batch_data
from baseline_constants import INPUT_SIZE


class Model(ABC):

    def __init__(self, seed, lr, ctx, optimizer=None):
        mx.random.seed(123 + seed)
        np.random.seed(seed)
        self.lr = lr
        self.seed = seed
        self._optimizer = optimizer
        self.ctx = ctx
        self.net, self.loss, self.trainer = self.create_model()
        self.flops_per_sample = self.flops  # per-sample FLOPs, see `flops`

    @property
    def optimizer(self):
        """Optimizer to be used by the model."""
        if self._optimizer is None:
            self._optimizer = "sgd"
        return self._optimizer

    @abstractmethod
    def create_model(self):
        """Creates the model for the task.

        Returns:
            A 3-tuple consisting of:
                net: The neural network (a Gluon block).
                loss: A loss function that, given predictions and labels,
                    computes the loss value.
                trainer: A trainer that, once gradients have been computed,
                    updates the model parameters.
        """
        return None, None, None
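    # A concrete create_model might look like the following sketch (the
    # layer and sizes here are illustrative assumptions, not the repo's
    # actual model):
    #
    #     net = mx.gluon.nn.Dense(NUM_CLASSES, in_units=NUM_FEATURES)
    #     net.initialize(mx.init.Xavier(), ctx=self.ctx)
    #     loss = mx.gluon.loss.SoftmaxCrossEntropyLoss()
    #     trainer = mx.gluon.Trainer(net.collect_params(), self.optimizer,
    #                                {'learning_rate': self.lr})
    #     return net, loss, trainer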
    def train(self, data, my_round, num_epochs, batch_size, lr_factor=1.0):
        """Trains the model for num_epochs epochs.

        Args:
            data: Dict of the form {'x': NDArray, 'y': NDArray}.
            my_round: The current training round, used for learning rate
                decay.
            num_epochs: Number of epochs to train.
            batch_size: Size of training batches.
            lr_factor: Decay factor for the learning rate.
        Returns:
            comp: Number of FLOPs computed while training on the given data.
            update: The trained model parameters.
        """
        # Decay the learning rate.
        self.trainer.set_learning_rate(self.lr * (lr_factor ** my_round))

        # Train on the data for num_epochs epochs.
        for i in range(num_epochs):
            seed = my_round * 11 + i
            self.run_epochs(seed, data, batch_size)

        # Wait to avoid running out of GPU memory.
        nd.waitall()

        update = self.get_params()
        comp = num_epochs * len(data["y"]) * self.flops_per_sample
        return comp, update
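    # For example, with lr=0.1, lr_factor=0.99 and my_round=10, train() sets
    # the effective learning rate to 0.1 * 0.99**10 ≈ 0.0904.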
    def run_epochs(self, seed, data, batch_size):
        for batched_x, batched_y in batch_data(data, batch_size, seed):
            input_data = self.preprocess_x(batched_x)
            target_data = self.preprocess_y(batched_y)
            num_batch = len(batched_y)
            # Set MXNET_ENFORCE_DETERMINISM=1 to avoid differences in
            # calculation precision.
            with autograd.record():
                y_hats = self.net(input_data)
                ls = self.loss(y_hats, target_data)
            ls.backward()
            self.trainer.step(num_batch)
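    # Note: Gluon's Trainer.step(batch_size) rescales the accumulated
    # gradients by 1/batch_size, so passing len(batched_y) averages the
    # gradient over the batch.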
    def __num_elems(self, shape):
        """Returns the number of elements in the given shape.

        Args:
            shape: Parameter shape.
        Returns:
            tot_elems: Total number of elements (int).
        """
        tot_elems = 1
        for s in shape:
            tot_elems *= int(s)
        return tot_elems

    @property
    def size(self):
        """Returns the size of the network in bytes.

        The size of the network is calculated by summing up the sizes of all
        trainable variables. The size of a variable is the number of bytes in
        its dtype multiplied by the number of elements captured in its shape
        attribute.

        Returns:
            tot_size: Integer representing the size of the neural network
                (in bytes).
        """
        if not hasattr(self, "_size"):
            params = self.net.collect_params().values()
            tot_size = 0
            for p in params:
                tot_elems = self.__num_elems(p.shape)
                dtype_size = np.dtype(p.dtype).itemsize
                var_size = tot_elems * dtype_size
                tot_size += var_size
            self._size = tot_size
        return self._size
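    # For example, a float32 Dense layer mapping 784 inputs to 10 outputs
    # (hypothetical, for illustration only) has 784 * 10 + 10 = 7,850
    # parameters, so `size` would report 7,850 * 4 = 31,400 bytes.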
    @property
    def flops(self):
        """Returns the number of FLOPs needed to propagate one sample
        through the network.

        The MXOP package is required:
        https://github.com/hey-yahei/OpSummary.MXNet

        If MXOP is not installed, 0 is returned directly. Note that
            pip install --index-url https://pypi.org/simple/ mxop
        may change the version of the dependent package.

        Since MXOP runs on the CPU, the context is temporarily set to the
        CPU and then reset to the specified device.

        Returns:
            flops: Integer representing the number of FLOPs.
        """
        try:
            from mxop.gluon import count_ops
            self.set_context(mx.cpu())
            op_counter = count_ops(self.net, (1, *INPUT_SIZE))
            self.set_context(self.ctx)
            return sum(op_counter.values())
        except ModuleNotFoundError:
            warnings.warn("MXOP is not installed, num_flops=0 is returned.")
            return 0

    def set_params(self, model_params):
        """Sets the current model parameters to the given parameters.

        Args:
            model_params: The parameters to copy into the model. If empty,
                the model parameters are reset to zeros.
        """
        source_params = list(model_params)
        target_params = list(self.get_params())
        num_params = len(target_params)
        for p in range(num_params):
            if source_params:
                data = source_params[p].data()
            else:
                data = nd.zeros(target_params[p].shape, ctx=self.ctx)
            target_params[p].set_data(data)
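    # Typical use (sketch): copy the server model into a client model with
    #     client_model.set_params(server_model.get_params())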
    def get_params(self):
        """Returns the current model parameters.

        Returns:
            params: The current model parameters.
        """
        return self.net.collect_params().values()

    def set_context(self, ctx):
        """Moves the current model to the specified context.

        Args:
            ctx: The specified CPU or GPU context.
        """
        self.net.collect_params().reset_ctx(ctx)

    @abstractmethod
    def test(self, data):
        """Tests the current model on the given data.

        Args:
            data: Dict of the form {'x': NDArray, 'y': NDArray}.
        Returns:
            stat_metrics: Dict of metrics that will be recorded by the
                simulation.
        """
        return None

    @abstractmethod
    def preprocess_x(self, raw_x_batch):
        """Pre-processes each batch of train data before it is fed to the
        model."""
        return None

    @abstractmethod
    def preprocess_y(self, raw_y_batch):
        """Pre-processes each batch of labels before it is fed to the
        model."""
        return None
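
# ---------------------------------------------------------------------------
# A minimal usage sketch, not part of the library API. ExampleModel and
# fake_data are illustrative assumptions: the sketch assumes a flat
# 784-dimensional input, 10 output classes, and that the `batch_data` helper
# and `INPUT_SIZE` imported above resolve in this environment.
if __name__ == "__main__":

    class ExampleModel(Model):
        """Toy softmax classifier implementing the abstract interface."""

        def create_model(self):
            # in_units is fixed so parameter shapes are known immediately.
            net = mx.gluon.nn.Dense(10, in_units=784)
            net.initialize(mx.init.Xavier(), ctx=self.ctx)
            loss = mx.gluon.loss.SoftmaxCrossEntropyLoss()
            trainer = mx.gluon.Trainer(
                net.collect_params(), self.optimizer,
                {'learning_rate': self.lr})
            return net, loss, trainer

        def test(self, data):
            x = self.preprocess_x(data['x'])
            y = self.preprocess_y(data['y'])
            preds = self.net(x).argmax(axis=1)
            return {'accuracy': (preds == y).mean().asscalar()}

        def preprocess_x(self, raw_x_batch):
            return nd.array(raw_x_batch, ctx=self.ctx)

        def preprocess_y(self, raw_y_batch):
            return nd.array(raw_y_batch, ctx=self.ctx)

    model = ExampleModel(seed=0, lr=0.05, ctx=mx.cpu())
    fake_data = {'x': np.random.rand(32, 784).astype('float32'),
                 'y': np.random.randint(0, 10, size=32)}
    comp, update = model.train(fake_data, my_round=0, num_epochs=1,
                               batch_size=8)
    print('FLOPs:', comp, '; model size (bytes):', model.size)
    print(model.test(fake_data))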