/
uploader_utils.py
483 lines (414 loc) · 17.5 KB
/
uploader_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
# -*- coding: utf-8 -*-
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Shared utils for tensorboard log uploader."""
import abc
import contextlib
import json
import logging
import re
import time
from typing import Callable, Dict, Generator, Optional, List, Tuple
import uuid
from tensorboard.util import tb_logging
from google.api_core import exceptions
from google.cloud import storage
from google.cloud.aiplatform.compat.types import (
tensorboard_run_v1beta1 as tensorboard_run,
)
from google.cloud.aiplatform.compat.types import (
tensorboard_service_v1beta1 as tensorboard_service,
)
from google.cloud.aiplatform.compat.types import (
tensorboard_time_series_v1beta1 as tensorboard_time_series,
)
from google.cloud.aiplatform.compat.services import tensorboard_service_client_v1beta1
# Shorter alias for the v1beta1 client class; used in the type hints below.
TensorboardServiceClient = (
    tensorboard_service_client_v1beta1.TensorboardServiceClient
)

# Logger obtained from TensorBoard's tb_logging helper. Its level is set to
# WARNING, so the logger.info(...) calls in this module are suppressed unless
# the level is lowered elsewhere.
logger = tb_logging.get_logger()
logger.setLevel(logging.WARNING)
class ExistingResourceNotFoundError(RuntimeError):
    """Raised when a resource can be neither created nor retrieved.

    The managers in this module raise it after the service reported that a
    resource already exists but no matching resource was found when listing
    the existing ones.
    """
class RequestSender(object):
    """A base class for additional request sender objects.

    Currently just used for typing. Subclasses are expected to override
    ``send_requests``. Because this class does not use ``abc.ABCMeta``, the
    ``@abc.abstractmethod`` marker is informational only: the base class can
    still be instantiated, and its ``send_requests`` is a no-op.
    """

    @abc.abstractmethod
    def send_requests(self, run_name: str):
        """Sends any request for the run.

        Args:
            run_name (str):
                Required. The name of the run whose pending requests are sent.
        """
class OnePlatformResourceManager(object):
    """Helper class managing One Platform resources.

    Keeps two caches:
      * run display name -> TensorboardRun resource name, and
      * (run display name, tag name) -> TensorboardTimeSeries resource name.

    ``get_run_resource_name`` and ``get_time_series_resource_name`` only call
    the service when the requested name is not already cached. The
    ``batch_create_*`` methods always send create requests and then populate
    the caches with the results.
    """

    # Maximum number of create requests sent per batch_create_* API call.
    CREATE_RUN_BATCH_SIZE = 1000
    CREATE_TIME_SERIES_BATCH_SIZE = 1000

    def __init__(self, experiment_resource_name: str, api: TensorboardServiceClient):
        """Constructor for OnePlatformResourceManager.

        Args:
            experiment_resource_name (str):
                Required. The resource name of the experiment with the following format
                projects/{project}/locations/{location}/tensorboards/{tensorboard}/experiments/{experiment}
            api (TensorboardServiceClient):
                Required. TensorboardServiceStub for calling various tensorboard services.
        """
        self._experiment_resource_name = experiment_resource_name
        self._api = api
        # run display name -> run resource name
        self._run_name_to_run_resource_name: Dict[str, str] = {}
        # (run display name, tag name) -> time series resource name
        self._run_tag_name_to_time_series_name: Dict[Tuple[str, str], str] = {}

    def batch_create_runs(
        self, run_names: List[str]
    ) -> List[tensorboard_run.TensorboardRun]:
        """Batch creates TensorboardRuns.

        Runs are created in chunks of at most CREATE_RUN_BATCH_SIZE, and the
        resource name of every created run is cached under its display name.

        Args:
            run_names: a list of run_names for creating the TensorboardRuns.

        Returns:
            the created TensorboardRuns
        """
        batch_size = OnePlatformResourceManager.CREATE_RUN_BATCH_SIZE
        created_runs = []
        for start in range(0, len(run_names), batch_size):
            tb_run_requests = [
                tensorboard_service.CreateTensorboardRunRequest(
                    parent=self._experiment_resource_name,
                    tensorboard_run=tensorboard_run.TensorboardRun(
                        display_name=run_name
                    ),
                    # A random ID is generated per run; runs are tracked
                    # locally by display name (see the cache update below).
                    tensorboard_run_id=str(uuid.uuid4()),
                )
                for run_name in run_names[start : start + batch_size]
            ]
            tb_runs = self._api.batch_create_tensorboard_runs(
                parent=self._experiment_resource_name, requests=tb_run_requests,
            ).tensorboard_runs
            self._run_name_to_run_resource_name.update(
                {run.display_name: run.name for run in tb_runs}
            )
            created_runs.extend(tb_runs)
        return created_runs

    def batch_create_time_series(
        self,
        run_tag_name_to_time_series: Dict[
            Tuple[str, str], tensorboard_time_series.TensorboardTimeSeries
        ],
    ) -> List[tensorboard_time_series.TensorboardTimeSeries]:
        """Batch creates TensorboardTimeSeries.

        Every run referenced in the keys must already have its resource name
        cached (e.g. via ``batch_create_runs`` or ``get_run_resource_name``);
        otherwise a KeyError is raised when building the requests.

        Args:
            run_tag_name_to_time_series: a dictionary of
                (run_name, tag_name) to TensorboardTimeSeries proto, containing
                the TensorboardTimeSeries to create.

        Returns:
            the created TensorboardTimeSeries
        """
        batch_size = OnePlatformResourceManager.CREATE_TIME_SERIES_BATCH_SIZE
        entries = list(run_tag_name_to_time_series.items())
        # Reverse lookup used to map each created time series back to its run.
        run_resource_name_to_run_name = {
            resource_name: run_name
            for run_name, resource_name in self._run_name_to_run_resource_name.items()
        }
        created_time_series = []
        for start in range(0, len(entries), batch_size):
            requests = [
                tensorboard_service.CreateTensorboardTimeSeriesRequest(
                    parent=self._run_name_to_run_resource_name[run_name],
                    tensorboard_time_series=time_series_proto,
                )
                for (run_name, _), time_series_proto in entries[
                    start : start + batch_size
                ]
            ]
            batch_result = self._api.batch_create_tensorboard_time_series(
                parent=self._experiment_resource_name, requests=requests,
            ).tensorboard_time_series
            # A time series resource name is expected to be nested under its
            # run as "<run resource name>/timeSeries...", so the prefix before
            # "/timeSeries" identifies the owning run.
            self._run_tag_name_to_time_series_name.update(
                {
                    (
                        run_resource_name_to_run_name[
                            ts.name[: ts.name.index("/timeSeries")]
                        ],
                        ts.display_name,
                    ): ts.name
                    for ts in batch_result
                }
            )
            created_time_series.extend(batch_result)
        return created_time_series

    def get_run_resource_name(self, run_name: str) -> str:
        """
        Get the resource name of the run if it exists, otherwise creates the run
        on One Platform before returning its resource name.

        Args:
            run_name (str):
                Required. The name of the run.

        Returns:
            run_resource (str):
                Resource name of the run.
        """
        if run_name not in self._run_name_to_run_resource_name:
            tb_run = self._create_or_get_run_resource(run_name)
            self._run_name_to_run_resource_name[run_name] = tb_run.name
        return self._run_name_to_run_resource_name[run_name]

    def _create_or_get_run_resource(
        self, run_name: str
    ) -> tensorboard_run.TensorboardRun:
        """Creates a new run resource in current tensorboard experiment resource.

        If the service rejects the creation because the run already exists,
        the existing runs of the experiment are listed and the one whose
        display name equals ``run_name`` is returned.

        Args:
            run_name (str):
                Required. The display name of this run.

        Returns:
            tb_run (google.cloud.aiplatform_v1beta1.types.TensorboardRun):
                The TensorboardRun given the run_name.

        Raises:
            ExistingResourceNotFoundError:
                Run name could not be found in resource list.
            exceptions.InvalidArgument:
                run_name argument is invalid (any InvalidArgument from the
                create call that is not an "already exist" error).
        """
        tb_run = tensorboard_run.TensorboardRun()
        tb_run.display_name = run_name
        try:
            return self._api.create_tensorboard_run(
                parent=self._experiment_resource_name,
                tensorboard_run=tb_run,
                tensorboard_run_id=str(uuid.uuid4()),
            )
        except exceptions.InvalidArgument as e:
            if "already exist" not in e.message:
                raise
            # The run name already exists, so retrieve it. A separate loop
            # variable is used so an empty listing cannot fall back to the
            # locally built (never saved) run above.
            for existing_run in self._api.list_tensorboard_runs(
                parent=self._experiment_resource_name
            ):
                if existing_run.display_name == run_name:
                    return existing_run
            raise ExistingResourceNotFoundError(
                "Run with name %s already exists but is not in resource list."
                % run_name
            ) from e

    def get_time_series_resource_name(
        self,
        run_name: str,
        tag_name: str,
        time_series_resource_creator: Callable[
            [], tensorboard_time_series.TensorboardTimeSeries
        ],
    ) -> str:
        """
        Get the resource name of the time series corresponding to the tag, if it
        exists, otherwise creates the time series on One Platform before
        returning its resource name.

        The owning run is resolved (and created if needed) through
        ``get_run_resource_name``.

        Args:
            run_name (str):
                Required. The name of the run.
            tag_name (str):
                Required. The name of the tag.
            time_series_resource_creator (Callable[[], tensorboard_time_series.TensorboardTimeSeries]):
                Required. A callable that produces a TimeSeries for creation on One Platform.

        Returns:
            time_series_name (str):
                Resource name of the time series
        """
        key = (run_name, tag_name)
        if key not in self._run_tag_name_to_time_series_name:
            time_series = self._create_or_get_time_series(
                self.get_run_resource_name(run_name),
                tag_name,
                time_series_resource_creator,
            )
            self._run_tag_name_to_time_series_name[key] = time_series.name
        return self._run_tag_name_to_time_series_name[key]

    def _create_or_get_time_series(
        self,
        run_resource_name: str,
        tag_name: str,
        time_series_resource_creator: Callable[
            [], tensorboard_time_series.TensorboardTimeSeries
        ],
    ) -> tensorboard_time_series.TensorboardTimeSeries:
        """
        Get a time series resource with given tag_name, and create a new one on
        OnePlatform if not present.

        Args:
            run_resource_name (str):
                Required. Resource name of the run that owns the time series.
            tag_name (str):
                Required. The tag name of the time series in the Tensorboard log dir.
            time_series_resource_creator (Callable[[], tensorboard_time_series.TensorboardTimeSeries]):
                Required. A callable that produces a TimeSeries for creation.

        Returns:
            time_series (tensorboard_time_series.TensorboardTimeSeries):
                A created or existing tensorboard_time_series.TensorboardTimeSeries.

        Raises:
            exceptions.InvalidArgument:
                Invalid run_resource_name, tag_name, or time_series_resource_creator.
            ExistingResourceNotFoundError:
                Could not find the resource given the tag name.
            ValueError:
                More than one time series with the resource name was found.
        """
        time_series = time_series_resource_creator()
        time_series.display_name = tag_name
        try:
            return self._api.create_tensorboard_time_series(
                parent=run_resource_name, tensorboard_time_series=time_series
            )
        except exceptions.InvalidArgument as e:
            if "already exist" not in e.message:
                raise
            # The time series display name already exists, so retrieve it.
            list_of_time_series = self._api.list_tensorboard_time_series(
                request=tensorboard_service.ListTensorboardTimeSeriesRequest(
                    parent=run_resource_name,
                    # json.dumps yields a double-quoted, escaped string literal.
                    filter="display_name = {}".format(json.dumps(str(tag_name))),
                )
            )
            # Stop after the second match: enough to distinguish none/one/many.
            matches = []
            for ts in list_of_time_series:
                matches.append(ts)
                if len(matches) > 1:
                    break
            # Check presence explicitly; a proto message's truthiness depends on
            # its field values, not on whether a result was found.
            if not matches:
                raise ExistingResourceNotFoundError(
                    "Could not find time series resource with display name: {}".format(
                        tag_name
                    )
                ) from e
            if len(matches) > 1:
                raise ValueError(
                    "More than one time series resource found with display_name: {}".format(
                        tag_name
                    )
                ) from e
            return matches[0]
class TimeSeriesResourceManager(object):
    """Helper class managing Time Series resources.

    Time series protos are cached per tag name, so each tag is created or
    looked up on the service at most once per manager instance.
    """

    def __init__(self, run_resource_id: str, api: TensorboardServiceClient):
        """Constructor for TimeSeriesResourceManager.

        Args:
            run_resource_id (str):
                Required. The resource id for the run with the following format.
                projects/{project}/locations/{location}/tensorboards/{tensorboard}/experiments/{experiment}/runs/{run}
            api (TensorboardServiceClient):
                Required. A TensorboardServiceStub.
        """
        self._run_resource_id = run_resource_id
        self._api = api
        # tag name -> created or retrieved TensorboardTimeSeries
        self._tag_to_time_series_proto: Dict[
            str, tensorboard_time_series.TensorboardTimeSeries
        ] = {}

    def get_or_create(
        self,
        tag_name: str,
        time_series_resource_creator: Callable[
            [], tensorboard_time_series.TensorboardTimeSeries
        ],
    ) -> tensorboard_time_series.TensorboardTimeSeries:
        """
        Get a time series resource with given tag_name, and create a new one on
        OnePlatform if not present.

        Args:
            tag_name (str):
                Required. The tag name of the time series in the Tensorboard log dir.
            time_series_resource_creator (Callable[[], tensorboard_time_series.TensorboardTimeSeries]):
                Required. A callable that produces a TimeSeries for creation.

        Returns:
            time_series (tensorboard_time_series.TensorboardTimeSeries):
                A new or existing tensorboard_time_series.TensorboardTimeSeries.

        Raises:
            exceptions.InvalidArgument:
                The tag_name or time_series_resource_creator is an invalid argument
                to create_tensorboard_time_series api call.
            ExistingResourceNotFoundError:
                Could not find the resource given the tag name.
            ValueError:
                More than one time series with the resource name was found.
        """
        if tag_name in self._tag_to_time_series_proto:
            return self._tag_to_time_series_proto[tag_name]

        time_series = time_series_resource_creator()
        time_series.display_name = tag_name
        try:
            time_series = self._api.create_tensorboard_time_series(
                parent=self._run_resource_id, tensorboard_time_series=time_series
            )
        except exceptions.InvalidArgument as e:
            if "already exist" not in e.message:
                raise
            # The time series display name already exists, so retrieve it.
            list_of_time_series = self._api.list_tensorboard_time_series(
                request=tensorboard_service.ListTensorboardTimeSeriesRequest(
                    parent=self._run_resource_id,
                    # json.dumps yields a double-quoted, escaped string literal.
                    filter="display_name = {}".format(json.dumps(str(tag_name))),
                )
            )
            # Stop after the second match: enough to distinguish none/one/many.
            matches = []
            for ts in list_of_time_series:
                matches.append(ts)
                if len(matches) > 1:
                    break
            # Check presence explicitly; a proto message's truthiness depends on
            # its field values, not on whether a result was found.
            if not matches:
                raise ExistingResourceNotFoundError(
                    "Could not find time series resource with display name: {}".format(
                        tag_name
                    )
                ) from e
            if len(matches) > 1:
                raise ValueError(
                    "More than one time series resource found with display_name: {}".format(
                        tag_name
                    )
                ) from e
            time_series = matches[0]

        self._tag_to_time_series_proto[tag_name] = time_series
        return time_series
def get_source_bucket(logdir: str) -> Optional[storage.Bucket]:
    """Returns a storage bucket object given a log directory.

    Args:
        logdir (str):
            Required. Path of the log directory.

    Returns:
        bucket (Optional[storage.Bucket]):
            A bucket if the path is a gs bucket, None otherwise. Paths such as
            "gs://" or "gs:///dir" that name no bucket also return None.
    """
    # Capture the text between "gs://" and the next "/" (or end of string).
    m = re.match(r"gs:\/\/(.*?)(?=\/|$)", logdir)
    # An empty capture means the path names no bucket; don't hand an empty
    # bucket name to the storage client.
    if not m or not m[1]:
        return None
    return storage.Client().bucket(m[1])
@contextlib.contextmanager
def request_logger(
    request: tensorboard_service.WriteTensorboardRunDataRequest,
) -> Generator[None, None, None]:
    """Log a request's serialized size and the time spent in the ``with`` body.

    On entry, the serialized byte size of ``request`` is logged at INFO level.
    When the body completes normally, the elapsed wall-clock time (measured
    from entry) is logged as well. If the body raises, the exception
    propagates and no duration is logged.

    Args:
        request (tensorboard_service.WriteTensorboardRunDataRequest):
            Required. The request whose serialized byte size is reported.

    Yields:
        Nothing; the work being timed runs inside the ``with`` body.
    """
    started_at = time.time()
    # The proto-plus wrapper exposes the raw protobuf via the private ``_pb``.
    size_in_bytes = request._pb.ByteSize()  # pylint: disable=protected-access
    logger.info("Trying request of %d bytes", size_in_bytes)
    yield
    logger.info(
        "Upload of (%d bytes) took %.3f seconds",
        size_in_bytes,
        time.time() - started_at,
    )