/
export.py
395 lines (329 loc) · 16.9 KB
/
export.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Configuration and utilities for receiving inputs at serving time."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import os
import time
import six
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_shape
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import parsing_ops
from tensorflow.python.platform import gfile
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.saved_model import signature_constants
from tensorflow.python.saved_model import signature_def_utils
from tensorflow.python.util import compat
from tensorflow.python.util.tf_export import tf_export
# Default dict keys used when a bare (non-dict) feature or receiver tensor is
# wrapped into the dict representation that the SavedModel export requires.
_SINGLE_FEATURE_DEFAULT_NAME = 'feature'
_SINGLE_RECEIVER_DEFAULT_NAME = 'input'
@tf_export('estimator.export.ServingInputReceiver')
class ServingInputReceiver(collections.namedtuple(
    'ServingInputReceiver',
    ['features', 'receiver_tensors', 'receiver_tensors_alternatives'])):
  """A return type for a serving_input_receiver_fn.

  The expected return values are:
    features: A `Tensor`, `SparseTensor`, or dict of string to `Tensor` or
      `SparseTensor`, specifying the features to be passed to the model.
    receiver_tensors: a `Tensor`, or a dict of string to `Tensor`, specifying
      input nodes where this receiver expects to be fed by default. Typically,
      this is a single placeholder expecting serialized `tf.Example` protos.
    receiver_tensors_alternatives: a dict of string to additional
      groups of receiver tensors, each of which may be a `Tensor` or a dict of
      string to `Tensor`. These named receiver tensor alternatives generate
      additional serving signatures, which may be used to feed inputs at
      different points within the input receiver subgraph. A typical usage is
      to allow feeding raw feature `Tensor`s *downstream* of the
      tf.parse_example() op. Defaults to None.
  """

  def __new__(cls, features, receiver_tensors,
              receiver_tensors_alternatives=None):
    """Creates a validated `ServingInputReceiver`.

    A bare (non-dict) `features` or `receiver_tensors` argument is wrapped in
    a dict under a default key, so downstream export code can rely on the
    dict representation.  All keys must be strings and all values must be
    tensors of the appropriate kind.

    Raises:
      ValueError: if any argument is None, has a non-string key, or has a
        value of the wrong type.
    """
    if features is None:
      raise ValueError('features must be defined.')
    if not isinstance(features, dict):
      features = {_SINGLE_FEATURE_DEFAULT_NAME: features}
    for name, tensor in features.items():
      if not isinstance(name, six.string_types):
        raise ValueError('feature keys must be strings: {}.'.format(name))
      # isinstance accepts a tuple of types; this replaces the chained `or`.
      if not isinstance(tensor, (ops.Tensor, sparse_tensor.SparseTensor)):
        raise ValueError(
            'feature {} must be a Tensor or SparseTensor.'.format(name))

    if receiver_tensors is None:
      raise ValueError('receiver_tensors must be defined.')
    if not isinstance(receiver_tensors, dict):
      receiver_tensors = {_SINGLE_RECEIVER_DEFAULT_NAME: receiver_tensors}
    for name, tensor in receiver_tensors.items():
      if not isinstance(name, six.string_types):
        raise ValueError(
            'receiver_tensors keys must be strings: {}.'.format(name))
      # Receiver tensors are fed directly, so SparseTensor is not allowed.
      if not isinstance(tensor, ops.Tensor):
        raise ValueError(
            'receiver_tensor {} must be a Tensor.'.format(name))

    if receiver_tensors_alternatives is not None:
      if not isinstance(receiver_tensors_alternatives, dict):
        raise ValueError(
            'receiver_tensors_alternatives must be a dict: {}.'.format(
                receiver_tensors_alternatives))
      for alternative_name, receiver_tensors_alt in (
          six.iteritems(receiver_tensors_alternatives)):
        if not isinstance(receiver_tensors_alt, dict):
          receiver_tensors_alt = {_SINGLE_RECEIVER_DEFAULT_NAME:
                                  receiver_tensors_alt}
          # Updating dict during iteration is OK in this case: only the value
          # for the current key is replaced; the key set is unchanged.
          receiver_tensors_alternatives[alternative_name] = (
              receiver_tensors_alt)
        for name, tensor in receiver_tensors_alt.items():
          if not isinstance(name, six.string_types):
            raise ValueError(
                'receiver_tensors keys must be strings: {}.'.format(name))
          if not isinstance(tensor, (ops.Tensor, sparse_tensor.SparseTensor)):
            raise ValueError(
                'receiver_tensor {} must be a Tensor or SparseTensor.'.format(
                    name))

    return super(ServingInputReceiver, cls).__new__(
        cls,
        features=features,
        receiver_tensors=receiver_tensors,
        receiver_tensors_alternatives=receiver_tensors_alternatives)
@tf_export('estimator.export.TensorServingInputReceiver')
class TensorServingInputReceiver(collections.namedtuple(
    'TensorServingInputReceiver',
    ['features', 'receiver_tensors', 'receiver_tensors_alternatives'])):
  """A return type for a serving_input_receiver_fn.

  This is for use with models that expect a single `Tensor` or `SparseTensor`
  as an input feature, as opposed to a dict of features.

  The normal `ServingInputReceiver` always returns a feature dict, even if it
  contains only one entry, and so can be used only with models that accept such
  a dict. For models that accept only a single raw feature, the
  `serving_input_receiver_fn` provided to `Estimator.export_savedmodel()` should
  return this `TensorServingInputReceiver` instead. See:
  https://github.com/tensorflow/tensorflow/issues/11674

  Note that the receiver_tensors and receiver_tensor_alternatives arguments
  will be automatically converted to the dict representation in either case,
  because the SavedModel format requires each input `Tensor` to have a name
  (provided by the dict key).

  The expected return values are:
    features: A single `Tensor` or `SparseTensor`, representing the feature
      to be passed to the model.
    receiver_tensors: a `Tensor`, or a dict of string to `Tensor`, specifying
      input nodes where this receiver expects to be fed by default. Typically,
      this is a single placeholder expecting serialized `tf.Example` protos.
    receiver_tensors_alternatives: a dict of string to additional
      groups of receiver tensors, each of which may be a `Tensor` or a dict of
      string to `Tensor`. These named receiver tensor alternatives generate
      additional serving signatures, which may be used to feed inputs at
      different points within the input receiver subgraph. A typical usage is
      to allow feeding raw feature `Tensor`s *downstream* of the
      tf.parse_example() op. Defaults to None.
  """

  def __new__(cls, features, receiver_tensors,
              receiver_tensors_alternatives=None):
    """Creates a validated `TensorServingInputReceiver`.

    Validation and dict-wrapping of the receiver tensors are delegated to
    `ServingInputReceiver`; the single feature is then unwrapped again so
    that `features` is the bare `Tensor` or `SparseTensor` the model expects.

    Raises:
      ValueError: if features is None or is not a single Tensor/SparseTensor,
        or if the receiver tensor arguments fail `ServingInputReceiver`
        validation.
    """
    if features is None:
      raise ValueError('features must be defined.')
    # isinstance accepts a tuple of types; this replaces the chained `or`.
    if not isinstance(features, (ops.Tensor, sparse_tensor.SparseTensor)):
      raise ValueError('feature must be a Tensor or SparseTensor.')

    # ServingInputReceiver wraps the bare feature under
    # _SINGLE_FEATURE_DEFAULT_NAME and validates the receiver tensors.
    receiver = ServingInputReceiver(
        features=features,
        receiver_tensors=receiver_tensors,
        receiver_tensors_alternatives=receiver_tensors_alternatives)

    return super(TensorServingInputReceiver, cls).__new__(
        cls,
        # Unwrap the single feature back out of the dict.
        features=receiver.features[_SINGLE_FEATURE_DEFAULT_NAME],
        receiver_tensors=receiver.receiver_tensors,
        receiver_tensors_alternatives=receiver.receiver_tensors_alternatives)
@tf_export('estimator.export.build_parsing_serving_input_receiver_fn')
def build_parsing_serving_input_receiver_fn(feature_spec,
                                            default_batch_size=None):
  """Build a serving_input_receiver_fn expecting fed tf.Examples.

  Creates a serving_input_receiver_fn that expects a serialized tf.Example fed
  into a string placeholder. The function parses the tf.Example according to
  the provided feature_spec, and returns all parsed Tensors as features.

  Args:
    feature_spec: a dict of string to `VarLenFeature`/`FixedLenFeature`.
    default_batch_size: the number of query examples expected per batch.
        Leave unset for variable batch size (recommended).

  Returns:
    A serving_input_receiver_fn suitable for use in serving.
  """
  def serving_input_receiver_fn():
    """An input_fn that expects a serialized tf.Example."""
    # A single string placeholder receives the serialized Example protos.
    serialized = array_ops.placeholder(
        dtype=dtypes.string,
        shape=[default_batch_size],
        name='input_example_tensor')
    parsed_features = parsing_ops.parse_example(serialized, feature_spec)
    return ServingInputReceiver(
        features=parsed_features,
        receiver_tensors={'examples': serialized})

  return serving_input_receiver_fn
@tf_export('estimator.export.build_raw_serving_input_receiver_fn')
def build_raw_serving_input_receiver_fn(features, default_batch_size=None):
  """Build a serving_input_receiver_fn expecting feature Tensors.

  Creates an serving_input_receiver_fn that expects all features to be fed
  directly.

  Args:
    features: a dict of string to `Tensor`.
    default_batch_size: the number of query examples expected per batch.
        Leave unset for variable batch size (recommended).

  Returns:
    A serving_input_receiver_fn.
  """
  def serving_input_receiver_fn():
    """A serving_input_receiver_fn that expects features to be fed directly."""
    placeholders = {}
    for name, feature in features.items():
      # Replace the batch dimension with the requested default batch size.
      batched_shape = feature.get_shape().as_list()
      batched_shape[0] = default_batch_size
      # Reuse the feature tensor's op name (feature.op.name) for the
      # placeholder, excluding the index from the tensor's name
      # (feature.name = "%s:%d" % (feature.op.name, value_index)).
      placeholders[name] = array_ops.placeholder(
          dtype=feature.dtype,
          shape=tensor_shape.TensorShape(batched_shape),
          name=feature.op.name)
    # TODO(b/34885899): remove the unnecessary copy
    # The features provided are simply the placeholders, but we defensively
    # copy the dict because it may be mutated.
    return ServingInputReceiver(placeholders, placeholders.copy())

  return serving_input_receiver_fn
### Below utilities are specific to SavedModel exports.
def build_all_signature_defs(receiver_tensors,
                             export_outputs,
                             receiver_tensors_alternatives=None):
  """Build `SignatureDef`s for all export outputs.

  One signature is produced per (receiver, export_output) pair: the default
  receiver yields signatures named after the output key, and each named
  receiver alternative yields signatures named '<receiver>:<output key>'.
  Outputs that cannot be served from a given receiver (as_signature_def
  raises ValueError) are excluded and reported via _log_signature_report.

  Args:
    receiver_tensors: a `Tensor`, or a dict of string to `Tensor`, the default
      receiver inputs.  A bare tensor is wrapped under a default key.
    export_outputs: a dict of string to `ExportOutput`-like objects providing
      `as_signature_def(receiver_tensors)`.
    receiver_tensors_alternatives: optional dict of string to alternative
      groups of receiver tensors (each a `Tensor` or dict of string to
      `Tensor`).

  Returns:
    A dict of signature name to valid `SignatureDef`.

  Raises:
    ValueError: if export_outputs is not a dict.
  """
  if not isinstance(receiver_tensors, dict):
    receiver_tensors = {_SINGLE_RECEIVER_DEFAULT_NAME: receiver_tensors}
  # None fails the isinstance check too, so no separate `is None` test is
  # needed.  (Also fixes the original message, which lacked the space before
  # the type and rendered as '...a dict and not<class ...>'.)
  if not isinstance(export_outputs, dict):
    raise ValueError('export_outputs must be a dict and not '
                     '{}'.format(type(export_outputs)))

  signature_def_map = {}
  excluded_signatures = {}

  def _collect_signatures(tensors, name_fn):
    """Adds one signature per export output fed from `tensors`.

    Signatures that cannot be built (ValueError) are recorded in
    excluded_signatures with the error message instead.
    """
    for output_key, export_output in export_outputs.items():
      signature_name = name_fn(output_key)
      try:
        signature_def_map[signature_name] = (
            export_output.as_signature_def(tensors))
      except ValueError as e:
        excluded_signatures[signature_name] = str(e)

  _collect_signatures(
      receiver_tensors,
      lambda output_key: '{}'.format(output_key or 'None'))

  if receiver_tensors_alternatives:
    for receiver_name, receiver_tensors_alt in (
        six.iteritems(receiver_tensors_alternatives)):
      if not isinstance(receiver_tensors_alt, dict):
        receiver_tensors_alt = {_SINGLE_RECEIVER_DEFAULT_NAME:
                                receiver_tensors_alt}
      # Bind receiver_name as a default arg so each lambda keeps its own
      # value rather than the loop variable's final value.
      _collect_signatures(
          receiver_tensors_alt,
          lambda output_key, _name=receiver_name: '{}:{}'.format(
              _name or 'None', output_key or 'None'))

  _log_signature_report(signature_def_map, excluded_signatures)

  # The above calls to export_output.as_signature_def should return only
  # valid signatures; if there is a validity problem, they raise ValueError,
  # which we record above. Consequently the call to is_valid_signature here
  # should not remove anything else; it's just an extra sanity check.
  return {k: v for k, v in signature_def_map.items()
          if signature_def_utils.is_valid_signature(v)}
# Human-readable display names for the three standard SavedModel method
# names, used when logging the signature report in _log_signature_report.
_FRIENDLY_METHOD_NAMES = {
    signature_constants.CLASSIFY_METHOD_NAME: 'Classify',
    signature_constants.REGRESS_METHOD_NAME: 'Regress',
    signature_constants.PREDICT_METHOD_NAME: 'Predict',
}
def _log_signature_report(signature_def_map, excluded_signatures):
  """Log a report of which signatures were produced.

  Args:
    signature_def_map: dict of signature name to `SignatureDef`, the
      signatures included in the export.
    excluded_signatures: dict of signature name to the error message
      explaining why that signature was excluded.
  """
  sig_names_by_method_name = collections.defaultdict(list)

  # We'll collect whatever method_names are present, but also we want to make
  # sure to output a line for each of the three standard methods even if they
  # have no signatures.
  for method_name in _FRIENDLY_METHOD_NAMES:
    sig_names_by_method_name[method_name] = []

  for signature_name, sig in signature_def_map.items():
    sig_names_by_method_name[sig.method_name].append(signature_name)

  # TODO(b/67733540): consider printing the full signatures, not just names
  for method_name, sig_names in sig_names_by_method_name.items():
    if method_name in _FRIENDLY_METHOD_NAMES:
      method_name = _FRIENDLY_METHOD_NAMES[method_name]
    logging.info('Signatures INCLUDED in export for {}: {}'.format(
        method_name, sig_names if sig_names else 'None'))

  if excluded_signatures:
    # Bug fix: the message previously read 'cannot be be served' (the word
    # 'be' was duplicated across the implicit string concatenation).
    logging.info('Signatures EXCLUDED from export because they cannot be '
                 'served via TensorFlow Serving APIs:')
    for signature_name, message in excluded_signatures.items():
      logging.info('\'{}\' : {}'.format(signature_name, message))

  if not signature_def_map:
    logging.warn('Export includes no signatures!')
  elif (signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
        not in signature_def_map):
    logging.warn('Export includes no default signature!')
# When we create a timestamped directory, there is a small chance that the
# directory already exists because another worker is also writing exports.
# In this case we just wait one second to get a new timestamp and try again.
# If this fails several times in a row, then something is seriously wrong.
# Used by get_timestamped_export_dir below.
MAX_DIRECTORY_CREATION_ATTEMPTS = 10
def get_timestamped_export_dir(export_dir_base):
  """Builds a path to a new subdirectory within the base directory.

  Each export is written into a new subdirectory named using the
  current time.  This guarantees monotonically increasing version
  numbers even across multiple runs of the pipeline.

  The timestamp used is the number of seconds since epoch UTC.

  Args:
    export_dir_base: A string containing a directory to write the exported
        graph and checkpoints.
  Returns:
    The full path of the new subdirectory (which is not actually created yet).

  Raises:
    RuntimeError: if repeated attempts fail to obtain a unique timestamped
      directory name.
  """
  for attempt in range(1, MAX_DIRECTORY_CREATION_ATTEMPTS + 1):
    timestamp = int(time.time())
    candidate_dir = os.path.join(
        compat.as_bytes(export_dir_base),
        compat.as_bytes(str(timestamp)))
    if not gfile.Exists(candidate_dir):
      # Collisions are still possible (though extremely unlikely): this
      # directory is not actually created yet, but it will be almost
      # instantly on return from this function.
      return candidate_dir
    # Another worker already wrote this timestamp; wait for a fresh one.
    time.sleep(1)
    logging.warn(
        'Export directory {} already exists; retrying (attempt {}/{})'.format(
            candidate_dir, attempt, MAX_DIRECTORY_CREATION_ATTEMPTS))
  raise RuntimeError('Failed to obtain a unique export directory name after '
                     '{} attempts.'.format(MAX_DIRECTORY_CREATION_ATTEMPTS))
def get_temp_export_dir(timestamped_export_dir):
  """Builds a directory name based on the argument but starting with 'temp-'.

  This relies on the fact that TensorFlow Serving ignores subdirectories of
  the base directory that can't be parsed as integers.

  Args:
    timestamped_export_dir: the name of the eventual export directory, e.g.
      /foo/bar/<timestamp>

  Returns:
    A sister directory prefixed with 'temp-', e.g. /foo/bar/temp-<timestamp>.
  """
  (dirname, basename) = os.path.split(timestamped_export_dir)
  if isinstance(basename, bytes):
    # Bug fix: get_timestamped_export_dir returns a bytes path (built via
    # compat.as_bytes), and str.format on a bytes object under Python 3
    # would yield "temp-b'<timestamp>'".  Decode before formatting.
    str_name = basename.decode('utf-8')
  else:
    str_name = str(basename)
  temp_export_dir = os.path.join(
      compat.as_bytes(dirname),
      compat.as_bytes('temp-{}'.format(str_name)))
  return temp_export_dir