Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tensorboard support to custom job and hyperparameter tuning job #404

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 56 additions & 2 deletions google/cloud/aiplatform/jobs.py
Expand Up @@ -45,11 +45,13 @@
batch_prediction_job_v1 as gca_bp_job_v1,
batch_prediction_job_v1beta1 as gca_bp_job_v1beta1,
custom_job as gca_custom_job_compat,
custom_job_v1beta1 as gca_custom_job_v1beta1,
explanation_v1beta1 as gca_explanation_v1beta1,
io as gca_io_compat,
io_v1beta1 as gca_io_v1beta1,
job_state as gca_job_state,
hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat,
hyperparameter_tuning_job_v1beta1 as gca_hyperparameter_tuning_job_v1beta1,
machine_resources as gca_machine_resources_compat,
machine_resources_v1beta1 as gca_machine_resources_v1beta1,
study as gca_study_compat,
Expand Down Expand Up @@ -1132,6 +1134,7 @@ def run(
network: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
tensorboard: Optional[str] = None,
sync: bool = True,
) -> None:
"""Run this configured CustomJob.
Expand All @@ -1152,6 +1155,20 @@ def run(
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
tensorboard (str):
Optional. The name of an AI Platform
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``

The training script should write Tensorboard logs to the following AI Platform
environment variable:

AIP_TENSORBOARD_LOG_DIR

`service_account` is required when `tensorboard` is provided.
(TODO: add documentation when released)
sasha-gitg marked this conversation as resolved.
Show resolved Hide resolved
sync (bool):
Whether to execute this method synchronously. If False, this method
will unblock and it will be executed in a concurrent Future.
Expand All @@ -1170,9 +1187,18 @@ def run(
restart_job_on_worker_restart=restart_job_on_worker_restart,
)

if tensorboard:
v1beta1_gca_resource = gca_custom_job_v1beta1.CustomJob()
v1beta1_gca_resource._pb.MergeFromString(
sasha-gitg marked this conversation as resolved.
Show resolved Hide resolved
self._gca_resource._pb.SerializeToString()
)
self._gca_resource = v1beta1_gca_resource
self._gca_resource.job_spec.tensorboard = tensorboard

_LOGGER.log_create_with_lro(self.__class__)

self._gca_resource = self.api_client.create_custom_job(
version = "v1beta1" if tensorboard else "v1"
self._gca_resource = self.api_client.select_version(version).create_custom_job(
parent=self._parent, custom_job=self._gca_resource
)

Expand Down Expand Up @@ -1415,6 +1441,7 @@ def run(
network: Optional[str] = None,
timeout: Optional[int] = None, # seconds
restart_job_on_worker_restart: bool = False,
tensorboard: Optional[str] = None,
sync: bool = True,
) -> None:
"""Run this configured CustomJob.
Expand All @@ -1435,6 +1462,20 @@ def run(
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
tensorboard (str):
Optional. The name of an AI Platform
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``

The training script should write Tensorboard logs to the following AI Platform
environment variable:

AIP_TENSORBOARD_LOG_DIR

`service_account` is required when `tensorboard` is provided.
(TODO: add documentation when released)
sync (bool):
Whether to execute this method synchronously. If False, this method
will unblock and it will be executed in a concurrent Future.
Expand All @@ -1453,9 +1494,22 @@ def run(
restart_job_on_worker_restart=restart_job_on_worker_restart,
)

if tensorboard:
sasha-gitg marked this conversation as resolved.
Show resolved Hide resolved
v1beta1_gca_resource = (
ivanmkc marked this conversation as resolved.
Show resolved Hide resolved
gca_hyperparameter_tuning_job_v1beta1.HyperparameterTuningJob()
)
v1beta1_gca_resource._pb.MergeFromString(
self._gca_resource._pb.SerializeToString()
)
self._gca_resource = v1beta1_gca_resource
self._gca_resource.trial_job_spec.tensorboard = tensorboard

_LOGGER.log_create_with_lro(self.__class__)

self._gca_resource = self.api_client.create_hyperparameter_tuning_job(
version = "v1beta1" if tensorboard else "v1"
self._gca_resource = self.api_client.select_version(
version
).create_hyperparameter_tuning_job(
parent=self._parent, hyperparameter_tuning_job=self._gca_resource
)

Expand Down
71 changes: 70 additions & 1 deletion tests/unit/aiplatform/test_custom_job.py
Expand Up @@ -29,12 +29,18 @@

from google.cloud import aiplatform
from google.cloud.aiplatform.compat.types import custom_job as gca_custom_job_compat
from google.cloud.aiplatform.compat.types import (
custom_job_v1beta1 as gca_custom_job_v1beta1,
)
from google.cloud.aiplatform.compat.types import io as gca_io_compat
from google.cloud.aiplatform.compat.types import job_state as gca_job_state_compat
from google.cloud.aiplatform.compat.types import (
encryption_spec as gca_encryption_spec_compat,
)
from google.cloud.aiplatform_v1.services.job_service import client as job_service_client
from google.cloud.aiplatform_v1beta1.services.job_service import (
client as job_service_client_v1beta1,
)

_TEST_PROJECT = "test-project"
_TEST_LOCATION = "us-central1"
Expand All @@ -44,6 +50,7 @@
_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}"

_TEST_CUSTOM_JOB_NAME = f"{_TEST_PARENT}/customJobs/{_TEST_ID}"
_TEST_TENSORBOARD_NAME = f"{_TEST_PARENT}/tensorboards/{_TEST_ID}"

_TEST_TRAINING_CONTAINER_IMAGE = "gcr.io/test-training/container:image"

Expand Down Expand Up @@ -97,11 +104,20 @@
)


def _get_custom_job_proto(state=None, name=None, error=None):
def _get_custom_job_proto(state=None, name=None, error=None, version="v1"):
    """Build a CustomJob proto for tests from the shared base proto.

    Args:
        state: Value assigned to the proto's ``state`` field.
        name: Value assigned to the proto's ``name`` field.
        error: Value assigned to the proto's ``error`` field.
        version: ``"v1beta1"`` returns a v1beta1 CustomJob carrying the test
            Tensorboard name; any other value returns the copied base proto.

    Returns:
        A deep copy of ``_TEST_BASE_CUSTOM_JOB_PROTO`` with the fields above
        applied, optionally converted to the v1beta1 message type.
    """
    proto = copy.deepcopy(_TEST_BASE_CUSTOM_JOB_PROTO)
    proto.name = name
    proto.state = state
    proto.error = error

    if version != "v1beta1":
        return proto

    # Convert by round-tripping through the serialized wire format, then set
    # the tensorboard field on the v1beta1 message.
    serialized = proto._pb.SerializeToString()
    beta_proto = gca_custom_job_v1beta1.CustomJob()
    beta_proto._pb.MergeFromString(serialized)
    beta_proto.job_spec.tensorboard = _TEST_TENSORBOARD_NAME
    return beta_proto


Expand Down Expand Up @@ -162,6 +178,19 @@ def create_custom_job_mock():
yield create_custom_job_mock


@pytest.fixture
def create_custom_job_v1beta1_mock():
    """Patch the v1beta1 ``JobServiceClient.create_custom_job`` for a test.

    The patched method returns a v1beta1 CustomJob proto named
    ``_TEST_CUSTOM_JOB_NAME`` in the ``JOB_STATE_PENDING`` state.

    Yields:
        The mock standing in for ``create_custom_job``.
    """
    pending_job = _get_custom_job_proto(
        name=_TEST_CUSTOM_JOB_NAME,
        state=gca_job_state_compat.JobState.JOB_STATE_PENDING,
        version="v1beta1",
    )
    with mock.patch.object(
        job_service_client_v1beta1.JobServiceClient,
        "create_custom_job",
        return_value=pending_job,
    ) as create_custom_job_mock:
        yield create_custom_job_mock


class TestCustomJob:
def setup_method(self):
reload(aiplatform.initializer)
Expand Down Expand Up @@ -321,3 +350,43 @@ def test_create_from_local_script_raises_with_no_staging_bucket(
script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME,
container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
)

@pytest.mark.parametrize("sync", [True, False])
def test_create_custom_job_with_tensorboard(
    self, create_custom_job_v1beta1_mock, get_custom_job_mock, sync
):
    """Run a CustomJob with ``tensorboard`` set and check the create call.

    Verifies that the v1beta1 create mock is called exactly once with the
    v1beta1 proto carrying the Tensorboard name, then checks the job spec
    and final state exposed by the job after ``wait()``.
    """
    aiplatform.init(
        project=_TEST_PROJECT,
        location=_TEST_LOCATION,
        staging_bucket=_TEST_STAGING_BUCKET,
        encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
    )

    job = aiplatform.CustomJob(
        display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC
    )

    run_kwargs = dict(
        service_account=_TEST_SERVICE_ACCOUNT,
        tensorboard=_TEST_TENSORBOARD_NAME,
        network=_TEST_NETWORK,
        timeout=_TEST_TIMEOUT,
        restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART,
        sync=sync,
    )
    job.run(**run_kwargs)
    job.wait()

    # The create request must carry the v1beta1 proto with tensorboard set.
    create_custom_job_v1beta1_mock.assert_called_once_with(
        parent=_TEST_PARENT,
        custom_job=_get_custom_job_proto(version="v1beta1"),
    )

    # The job spec is compared against the default (v1) test proto.
    v1_job_spec = _get_custom_job_proto().job_spec
    assert job.job_spec == v1_job_spec

    succeeded = gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
    assert job._gca_resource.state == succeeded
113 changes: 106 additions & 7 deletions tests/unit/aiplatform/test_hyperparameter_tuning_job.py
Expand Up @@ -31,9 +31,13 @@
)
from google.cloud.aiplatform.compat.types import (
hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat,
hyperparameter_tuning_job_v1beta1 as gca_hyperparameter_tuning_job_v1beta1,
)
from google.cloud.aiplatform.compat.types import study as gca_study_compat
from google.cloud.aiplatform_v1.services.job_service import client as job_service_client
from google.cloud.aiplatform_v1beta1.services.job_service import (
client as job_service_client_v1beta1,
)

import test_custom_job

Expand Down Expand Up @@ -122,12 +126,29 @@
)


def _get_hyperparameter_tuning_job_proto(state=None, name=None, error=None):
    """Build a HyperparameterTuningJob proto for tests from the base proto.

    Args:
        state: Value assigned to the proto's ``state`` field.
        name: Value assigned to the proto's ``name`` field.
        error: Value assigned to the proto's ``error`` field.

    Returns:
        A deep copy of ``_TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO`` with the
        fields above applied.
    """
    proto = copy.deepcopy(_TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO)
    proto.name, proto.state, proto.error = name, state, error
    return proto
def _get_hyperparameter_tuning_job_proto(
    state=None, name=None, error=None, version="v1"
):
    """Build a HyperparameterTuningJob proto for tests from the base proto.

    Args:
        state: Value assigned to the proto's ``state`` field.
        name: Value assigned to the proto's ``name`` field.
        error: Value assigned to the proto's ``error`` field.
        version: ``"v1beta1"`` returns a v1beta1 HyperparameterTuningJob whose
            trial job spec carries the test Tensorboard name; any other value
            returns the copied base proto.

    Returns:
        A deep copy of ``_TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO`` with the
        fields above applied, optionally converted to the v1beta1 type.
    """
    proto = copy.deepcopy(_TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO)
    proto.name = name
    proto.state = state
    proto.error = error

    if version != "v1beta1":
        return proto

    # Convert by round-tripping through the serialized wire format, then set
    # the tensorboard field on the v1beta1 message's trial job spec.
    serialized = proto._pb.SerializeToString()
    beta_proto = gca_hyperparameter_tuning_job_v1beta1.HyperparameterTuningJob()
    beta_proto._pb.MergeFromString(serialized)
    beta_proto.trial_job_spec.tensorboard = test_custom_job._TEST_TENSORBOARD_NAME
    return beta_proto


@pytest.fixture
Expand Down Expand Up @@ -187,7 +208,20 @@ def create_hyperparameter_tuning_job_mock():
yield create_hyperparameter_tuning_job_mock


class TestCustomJob:
@pytest.fixture
def create_hyperparameter_tuning_job_v1beta1_mock():
    """Patch the v1beta1 ``create_hyperparameter_tuning_job`` for a test.

    The patched ``JobServiceClient`` method returns a v1beta1
    HyperparameterTuningJob proto named
    ``_TEST_HYPERPARAMETERTUNING_JOB_NAME`` in the ``JOB_STATE_PENDING`` state.

    Yields:
        The mock standing in for ``create_hyperparameter_tuning_job``.
    """
    pending_job = _get_hyperparameter_tuning_job_proto(
        name=_TEST_HYPERPARAMETERTUNING_JOB_NAME,
        state=gca_job_state_compat.JobState.JOB_STATE_PENDING,
        version="v1beta1",
    )
    with mock.patch.object(
        job_service_client_v1beta1.JobServiceClient,
        "create_hyperparameter_tuning_job",
        return_value=pending_job,
    ) as create_hyperparameter_tuning_job_mock:
        yield create_hyperparameter_tuning_job_mock


class TestHyperparameterTuningJob:
def setup_method(self):
reload(aiplatform.initializer)
reload(aiplatform)
Expand Down Expand Up @@ -366,3 +400,68 @@ def test_get_hyperparameter_tuning_job(self, get_hyperparameter_tuning_job_mock)
assert (
job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_PENDING
)

@pytest.mark.parametrize("sync", [True, False])
def test_create_hyperparameter_tuning_job_with_tensorboard(
    self,
    create_hyperparameter_tuning_job_v1beta1_mock,
    get_hyperparameter_tuning_job_mock,
    sync,
):
    """Run a HyperparameterTuningJob with ``tensorboard`` set.

    Verifies that the v1beta1 create mock is called exactly once with the
    v1beta1 proto carrying the Tensorboard name, then checks the state
    exposed by the job after ``wait()``.
    """
    aiplatform.init(
        project=_TEST_PROJECT,
        location=_TEST_LOCATION,
        staging_bucket=_TEST_STAGING_BUCKET,
        encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
    )

    trial_job = aiplatform.CustomJob(
        display_name=test_custom_job._TEST_DISPLAY_NAME,
        worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC,
    )

    # Search space covering each of the four parameter spec kinds.
    search_space = {
        "lr": hpt.DoubleParameterSpec(min=0.001, max=0.1, scale="log"),
        "units": hpt.IntegerParameterSpec(min=4, max=1028, scale="linear"),
        "activation": hpt.CategoricalParameterSpec(
            values=["relu", "sigmoid", "elu", "selu", "tanh"]
        ),
        "batch_size": hpt.DiscreteParameterSpec(values=[16, 32], scale="linear"),
    }

    job = aiplatform.HyperparameterTuningJob(
        display_name=_TEST_DISPLAY_NAME,
        custom_job=trial_job,
        metric_spec={_TEST_METRIC_SPEC_KEY: _TEST_METRIC_SPEC_VALUE},
        parameter_spec=search_space,
        parallel_trial_count=_TEST_PARALLEL_TRIAL_COUNT,
        max_trial_count=_TEST_MAX_TRIAL_COUNT,
        max_failed_trial_count=_TEST_MAX_FAILED_TRIAL_COUNT,
        search_algorithm=_TEST_SEARCH_ALGORITHM,
        measurement_selection=_TEST_MEASUREMENT_SELECTION,
    )

    run_kwargs = dict(
        service_account=_TEST_SERVICE_ACCOUNT,
        network=_TEST_NETWORK,
        timeout=_TEST_TIMEOUT,
        restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART,
        tensorboard=test_custom_job._TEST_TENSORBOARD_NAME,
        sync=sync,
    )
    job.run(**run_kwargs)
    job.wait()

    # The create request must carry the v1beta1 proto with tensorboard set.
    create_hyperparameter_tuning_job_v1beta1_mock.assert_called_once_with(
        parent=_TEST_PARENT,
        hyperparameter_tuning_job=_get_hyperparameter_tuning_job_proto(
            version="v1beta1"
        ),
    )

    succeeded = gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
    assert job._gca_resource.state == succeeded