
feat: add tensorboard support for CustomTrainingJob, CustomContainerTrainingJob, CustomPythonPackageTrainingJob (#462)
sasha-gitg committed Jun 9, 2021
1 parent e7bf0d8 commit 8cfd611
Showing 5 changed files with 329 additions and 22 deletions.
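The commit threads a `tensorboard` argument through the `run()` methods of CustomTrainingJob, CustomContainerTrainingJob, and CustomPythonPackageTrainingJob, and logs console links for the backing custom job and its Tensorboard once they are known. Below is a minimal usage sketch of the new argument (not part of the diff); the project, bucket, container image, service account, and Tensorboard resource names are placeholders, and the Tensorboard resource and service-account permissions are assumed to be set up already, per the docstrings further down.

from google.cloud import aiplatform

aiplatform.init(
    project="my-project",  # placeholder
    location="us-central1",
    staging_bucket="gs://my-staging-bucket",  # placeholder
)

job = aiplatform.CustomTrainingJob(
    display_name="my-training-job",
    script_path="trainer/task.py",  # placeholder local script
    container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-4:latest",  # placeholder image
)

# Per the docstrings below, `service_account` is required when `tensorboard`
# is provided; both resources are assumed to exist already.
job.run(
    replica_count=1,
    service_account="training-sa@my-project.iam.gserviceaccount.com",  # placeholder
    tensorboard="projects/123/locations/us-central1/tensorboards/456",  # placeholder
    sync=True,
)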
17 changes: 17 additions & 0 deletions google/cloud/aiplatform/jobs.py
@@ -36,6 +36,7 @@
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import hyperparameter_tuning
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.utils import console_utils
from google.cloud.aiplatform.utils import source_utils
from google.cloud.aiplatform.utils import worker_spec_utils

@@ -1209,6 +1210,14 @@ def run(

_LOGGER.info("View Custom Job:\n%s" % self._dashboard_uri())

if tensorboard:
_LOGGER.info(
"View Tensorboard:\n%s"
% console_utils.custom_job_tensorboard_console_uri(
tensorboard, self.resource_name
)
)

self._block_until_complete()

@property
@@ -1521,6 +1530,14 @@ def run(

_LOGGER.info("View HyperparameterTuningJob:\n%s" % self._dashboard_uri())

if tensorboard:
_LOGGER.info(
"View Tensorboard:\n%s"
% console_utils.custom_job_tensorboard_console_uri(
tensorboard, self.resource_name
)
)

self._block_until_complete()

@property
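The jobs.py changes above log a Tensorboard console link when a CustomJob or HyperparameterTuningJob is run with a `tensorboard` resource. As the docstrings added below note, the training code itself is expected to write its Tensorboard logs to the directory exposed through the AIP_TENSORBOARD_LOG_DIR environment variable. A minimal sketch of what that looks like inside a TensorFlow training script (illustrative only, not part of this commit; the model and data are placeholders):

import os

import tensorflow as tf

# Vertex AI sets AIP_TENSORBOARD_LOG_DIR when the job runs with a Tensorboard
# resource; fall back to a local directory so the script also runs standalone.
log_dir = os.environ.get("AIP_TENSORBOARD_LOG_DIR", "local_tensorboard_logs")

model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(4,))])
model.compile(optimizer="adam", loss="mse")

# Placeholder data; a real script would load its training dataset here.
x = tf.random.normal((32, 4))
y = tf.random.normal((32, 1))

model.fit(
    x,
    y,
    epochs=2,
    callbacks=[tf.keras.callbacks.TensorBoard(log_dir=log_dir)],
)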
163 changes: 163 additions & 0 deletions google/cloud/aiplatform/training_jobs.py
@@ -28,6 +28,7 @@
from google.cloud.aiplatform import models
from google.cloud.aiplatform import schema
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.utils import console_utils

from google.cloud.aiplatform.compat.types import (
env_var as gca_env_var,
@@ -619,6 +620,10 @@ def _get_model(self) -> Optional[models.Model]:
fields.id, project=fields.project, location=fields.location,
)

def _wait_callback(self):
"""Callback performs custom logging during _block_until_complete. Override in subclass."""
pass

def _block_until_complete(self):
"""Helper method to block and check on job until complete."""

@@ -629,6 +634,7 @@ def _block_until_complete(self):
multiplier = 2 # scale wait by 2 every iteration

previous_time = time.time()

while self.state not in _PIPELINE_COMPLETE_STATES:
current_time = time.time()
if current_time - previous_time >= log_wait:
@@ -642,6 +648,7 @@ def _block_until_complete(self):
)
log_wait = min(log_wait * multiplier, max_wait)
previous_time = current_time
self._wait_callback()
time.sleep(wait)

self._raise_failure()
@@ -997,6 +1004,11 @@ def __init__(
"set using aiplatform.init(staging_bucket='gs://my-bucket')"
)

# The backing Custom Job resource is not known until after data preprocessing.
# Once the Custom Job is known, we log the console URI and the Tensorboard URI.
# This flag keeps that state so we don't log them multiple times.
self._has_logged_custom_job = False

def _prepare_and_validate_run(
self,
model_display_name: Optional[str] = None,
@@ -1076,6 +1088,7 @@ def _prepare_training_task_inputs_and_output_dir(
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
network: Optional[str] = None,
tensorboard: Optional[str] = None,
) -> Tuple[Dict, str]:
"""Prepares training task inputs and output directory for custom job.
@@ -1093,6 +1106,21 @@
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
If left unspecified, the job is not peered with any network.
tensorboard (str):
Optional. The name of a Vertex AI
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard logs to the following Vertex AI
environment variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required when `tensorboard` is provided.
For more information on configuring your service account, please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
Returns:
Training task inputs and Output directory for custom job.
"""
@@ -1113,9 +1141,42 @@
training_task_inputs["service_account"] = service_account
if network:
training_task_inputs["network"] = network
if tensorboard:
training_task_inputs["tensorboard"] = tensorboard

return training_task_inputs, base_output_dir

def _wait_callback(self):
if (
self._gca_resource.training_task_metadata.get("backingCustomJob")
and not self._has_logged_custom_job
):
_LOGGER.info(f"View backing custom job:\n{self._custom_job_console_uri()}")

if self._gca_resource.training_task_inputs.get("tensorboard"):
_LOGGER.info(f"View tensorboard:\n{self._tensorboard_console_uri()}")

self._has_logged_custom_job = True

def _custom_job_console_uri(self) -> str:
"""Helper method to compose the dashboard uri where custom job can be viewed."""
custom_job_resource_name = self._gca_resource.training_task_metadata.get(
"backingCustomJob"
)
return console_utils.custom_job_console_uri(custom_job_resource_name)

def _tensorboard_console_uri(self) -> str:
"""Helper method to compose dashboard uri where tensorboard can be viewed."""
tensorboard_resource_name = self._gca_resource.training_task_inputs.get(
"tensorboard"
)
custom_job_resource_name = self._gca_resource.training_task_metadata.get(
"backingCustomJob"
)
return console_utils.custom_job_tensorboard_console_uri(
tensorboard_resource_name, custom_job_resource_name
)

@property
def _model_upload_fail_string(self) -> str:
"""Helper property for model upload failure."""
@@ -1372,6 +1433,7 @@ def run(
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
predefined_split_column_name: Optional[str] = None,
tensorboard: Optional[str] = None,
sync=True,
) -> Optional[models.Model]:
"""Runs the custom training job.
@@ -1512,6 +1574,21 @@
ignored by the pipeline.
Supported only for tabular and time series Datasets.
tensorboard (str):
Optional. The name of a Vertex AI
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard logs to the following Vertex AI
environment variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required when `tensorboard` is provided.
For more information on configuring your service account, please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
@@ -1550,6 +1627,7 @@
validation_fraction_split=validation_fraction_split,
test_fraction_split=test_fraction_split,
predefined_split_column_name=predefined_split_column_name,
tensorboard=tensorboard,
sync=sync,
)

@@ -1578,6 +1656,7 @@ def _run(
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
predefined_split_column_name: Optional[str] = None,
tensorboard: Optional[str] = None,
sync=True,
) -> Optional[models.Model]:
"""Packages local script and launches training_job.
@@ -1665,6 +1744,21 @@ def _run(
ignored by the pipeline.
Supported only for tabular and time series Datasets.
tensorboard (str):
Optional. The name of a Vertex AI
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard logs to the following Vertex AI
environment variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required when `tensorboard` is provided.
For more information on configuring your service account, please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
@@ -1704,6 +1798,7 @@ def _run(
base_output_dir=base_output_dir,
service_account=service_account,
network=network,
tensorboard=tensorboard,
)

model = self._run_job(
@@ -1960,6 +2055,7 @@ def run(
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
predefined_split_column_name: Optional[str] = None,
tensorboard: Optional[str] = None,
sync=True,
) -> Optional[models.Model]:
"""Runs the custom training job.
@@ -2093,6 +2189,21 @@
ignored by the pipeline.
Supported only for tabular and time series Datasets.
tensorboard (str):
Optional. The name of a Vertex AI
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard logs to the following Vertex AI
environment variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required when `tensorboard` is provided.
For more information on configuring your service account, please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
@@ -2130,6 +2241,7 @@ def run(
validation_fraction_split=validation_fraction_split,
test_fraction_split=test_fraction_split,
predefined_split_column_name=predefined_split_column_name,
tensorboard=tensorboard,
sync=sync,
)

@@ -2157,6 +2269,7 @@ def _run(
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
predefined_split_column_name: Optional[str] = None,
tensorboard: Optional[str] = None,
sync=True,
) -> Optional[models.Model]:
"""Packages local script and launches training_job.
@@ -2240,6 +2353,21 @@ def _run(
ignored by the pipeline.
Supported only for tabular and time series Datasets.
tensorboard (str):
Optional. The name of a Vertex AI
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard logs to the following Vertex AI
environment variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required when `tensorboard` is provided.
For more information on configuring your service account, please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
@@ -2273,6 +2401,7 @@ def _run(
base_output_dir=base_output_dir,
service_account=service_account,
network=network,
tensorboard=tensorboard,
)

model = self._run_job(
@@ -3791,6 +3920,7 @@ def run(
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
predefined_split_column_name: Optional[str] = None,
tensorboard: Optional[str] = None,
sync=True,
) -> Optional[models.Model]:
"""Runs the custom training job.
@@ -3924,6 +4054,21 @@
ignored by the pipeline.
Supported only for tabular and time series Datasets.
tensorboard (str):
Optional. The name of a Vertex AI
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard logs to the following Vertex AI
environment variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required when `tensorboard` is provided.
For more information on configuring your service account, please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
@@ -3956,6 +4101,7 @@ def run(
test_fraction_split=test_fraction_split,
predefined_split_column_name=predefined_split_column_name,
bigquery_destination=bigquery_destination,
tensorboard=tensorboard,
sync=sync,
)

@@ -3983,6 +4129,7 @@ def _run(
test_fraction_split: float = 0.1,
predefined_split_column_name: Optional[str] = None,
bigquery_destination: Optional[str] = None,
tensorboard: Optional[str] = None,
sync=True,
) -> Optional[models.Model]:
"""Packages local script and launches training_job.
@@ -4053,6 +4200,21 @@ def _run(
ignored by the pipeline.
Supported only for tabular and time series Datasets.
tensorboard (str):
Optional. The name of a Vertex AI
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard logs to the following Vertex AI
environment variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required when `tensorboard` is provided.
For more information on configuring your service account, please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
@@ -4086,6 +4248,7 @@ def _run(
base_output_dir=base_output_dir,
service_account=service_account,
network=network,
tensorboard=tensorboard,
)

model = self._run_job(
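The `_wait_callback` hook added to `_block_until_complete` lets the training job log the backing custom job and Tensorboard console links exactly once, as soon as the pipeline's metadata reports the backing job. A standalone sketch of that polling-hook pattern follows (illustrative names and timing, not the SDK's classes):

import time


class _PollingJob:
    """Base class: polls until done, calling an overridable hook on each pass."""

    def _wait_callback(self):
        """Hook for custom per-poll logging. Override in subclasses."""
        pass

    def _is_done(self) -> bool:
        raise NotImplementedError

    def _block_until_complete(self):
        while not self._is_done():
            self._wait_callback()
            time.sleep(0.1)


class _TrainingPipelineJob(_PollingJob):
    """Subclass: logs the backing job exactly once, when it first appears."""

    def __init__(self):
        self._backing_job = None  # becomes known partway through the run
        self._has_logged_backing_job = False
        self._polls = 0

    def _is_done(self) -> bool:
        self._polls += 1
        if self._polls == 2:
            self._backing_job = "customJobs/123"  # placeholder resource name
        return self._polls >= 4

    def _wait_callback(self):
        # Log the backing job once, the first time it appears.
        if self._backing_job and not self._has_logged_backing_job:
            print(f"View backing custom job: {self._backing_job}")
            self._has_logged_backing_job = True


_TrainingPipelineJob()._block_until_complete()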
