From fa9bc3943df55bc0d077ba9b02101ae792a6fb57 Mon Sep 17 00:00:00 2001
From: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com>
Date: Tue, 18 May 2021 18:01:28 -0400
Subject: [PATCH] feat: add tensorboard support to custom job and
 hyperparameter tuning job (#404)

---
 google/cloud/aiplatform/jobs.py          |  60 +++++++++-
 tests/unit/aiplatform/test_custom_job.py |  71 ++++++++++-
 .../test_hyperparameter_tuning_job.py    | 113 ++++++++++++++++--
 3 files changed, 234 insertions(+), 10 deletions(-)

diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py
index 7b1f5cccc5..1b06289ec8 100644
--- a/google/cloud/aiplatform/jobs.py
+++ b/google/cloud/aiplatform/jobs.py
@@ -45,11 +45,13 @@
     batch_prediction_job_v1 as gca_bp_job_v1,
     batch_prediction_job_v1beta1 as gca_bp_job_v1beta1,
     custom_job as gca_custom_job_compat,
+    custom_job_v1beta1 as gca_custom_job_v1beta1,
     explanation_v1beta1 as gca_explanation_v1beta1,
     io as gca_io_compat,
     io_v1beta1 as gca_io_v1beta1,
     job_state as gca_job_state,
     hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat,
+    hyperparameter_tuning_job_v1beta1 as gca_hyperparameter_tuning_job_v1beta1,
     machine_resources as gca_machine_resources_compat,
     machine_resources_v1beta1 as gca_machine_resources_v1beta1,
     study as gca_study_compat,
@@ -1132,6 +1134,7 @@ def run(
         network: Optional[str] = None,
         timeout: Optional[int] = None,
         restart_job_on_worker_restart: bool = False,
+        tensorboard: Optional[str] = None,
         sync: bool = True,
     ) -> None:
         """Run this configured CustomJob.
@@ -1152,6 +1155,21 @@ def run(
                 gets restarted. This feature can be used by
                 distributed training jobs that are not resilient
                 to workers leaving and joining a job.
+            tensorboard (str):
+                Optional. The name of an AI Platform
+                [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
+                resource to which this CustomJob will upload Tensorboard
+                logs. Format:
+                ``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
+
+                The training script should write Tensorboard logs to the directory
+                given by the following AI Platform environment variable:
+
+                AIP_TENSORBOARD_LOG_DIR
+
+                `service_account` is required when `tensorboard` is provided.
+                For more information on configuring your service account, please visit:
+                https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
             sync (bool):
                 Whether to execute this method synchronously. If False, this method
                 will unblock and it will be executed in a concurrent Future.
@@ -1170,9 +1188,18 @@ def run(
             restart_job_on_worker_restart=restart_job_on_worker_restart,
         )
 
+        if tensorboard:
+            v1beta1_gca_resource = gca_custom_job_v1beta1.CustomJob()
+            v1beta1_gca_resource._pb.MergeFromString(
+                self._gca_resource._pb.SerializeToString()
+            )
+            self._gca_resource = v1beta1_gca_resource
+            self._gca_resource.job_spec.tensorboard = tensorboard
+
         _LOGGER.log_create_with_lro(self.__class__)
 
-        self._gca_resource = self.api_client.create_custom_job(
+        version = "v1beta1" if tensorboard else "v1"
+        self._gca_resource = self.api_client.select_version(version).create_custom_job(
             parent=self._parent, custom_job=self._gca_resource
         )
 
@@ -1415,6 +1442,7 @@ def run(
         network: Optional[str] = None,
         timeout: Optional[int] = None,  # seconds
         restart_job_on_worker_restart: bool = False,
+        tensorboard: Optional[str] = None,
         sync: bool = True,
     ) -> None:
         """Run this configured CustomJob.
@@ -1435,6 +1463,21 @@ def run(
                 gets restarted. This feature can be used by
                 distributed training jobs that are not resilient
                 to workers leaving and joining a job.
+            tensorboard (str):
+                Optional. The name of an AI Platform
+                [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
+                resource to which this CustomJob will upload Tensorboard
+                logs. Format:
+                ``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
+
+                The training script should write Tensorboard logs to the directory
+                given by the following AI Platform environment variable:
+
+                AIP_TENSORBOARD_LOG_DIR
+
+                `service_account` is required when `tensorboard` is provided.
+                For more information on configuring your service account, please visit:
+                https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
             sync (bool):
                 Whether to execute this method synchronously. If False, this method
                 will unblock and it will be executed in a concurrent Future.
@@ -1453,9 +1496,22 @@ def run(
             restart_job_on_worker_restart=restart_job_on_worker_restart,
         )
 
+        if tensorboard:
+            v1beta1_gca_resource = (
+                gca_hyperparameter_tuning_job_v1beta1.HyperparameterTuningJob()
+            )
+            v1beta1_gca_resource._pb.MergeFromString(
+                self._gca_resource._pb.SerializeToString()
+            )
+            self._gca_resource = v1beta1_gca_resource
+            self._gca_resource.trial_job_spec.tensorboard = tensorboard
+
         _LOGGER.log_create_with_lro(self.__class__)
 
-        self._gca_resource = self.api_client.create_hyperparameter_tuning_job(
+        version = "v1beta1" if tensorboard else "v1"
+        self._gca_resource = self.api_client.select_version(
+            version
+        ).create_hyperparameter_tuning_job(
             parent=self._parent, hyperparameter_tuning_job=self._gca_resource
         )
 
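The docstrings above instruct the training code to write its TensorBoard event files to the directory named by the AIP_TENSORBOARD_LOG_DIR environment variable. A minimal sketch of a training script that honors that variable is shown below; it assumes TensorFlow is available inside the training container (any library that writes TensorBoard event files works the same way), and the logged loss values are placeholders.

    # Training-script side of the feature: read AIP_TENSORBOARD_LOG_DIR and
    # write TensorBoard event files there. Sketch only; assumes TensorFlow.
    import os

    import tensorflow as tf

    # Vertex AI injects this variable when the job is created with `tensorboard`;
    # fall back to a local directory so the script also runs outside the service.
    log_dir = os.environ.get("AIP_TENSORBOARD_LOG_DIR", "logs")

    writer = tf.summary.create_file_writer(log_dir)

    for step in range(100):
        loss = 1.0 / (step + 1)  # placeholder for a real training loss
        with writer.as_default():
            tf.summary.scalar("loss", loss, step=step)

    writer.flush()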
diff --git a/tests/unit/aiplatform/test_custom_job.py b/tests/unit/aiplatform/test_custom_job.py
index 37c2ac3df0..7797e0edef 100644
--- a/tests/unit/aiplatform/test_custom_job.py
+++ b/tests/unit/aiplatform/test_custom_job.py
@@ -29,12 +29,18 @@
 from google.cloud import aiplatform
 from google.cloud.aiplatform.compat.types import custom_job as gca_custom_job_compat
+from google.cloud.aiplatform.compat.types import (
+    custom_job_v1beta1 as gca_custom_job_v1beta1,
+)
 from google.cloud.aiplatform.compat.types import io as gca_io_compat
 from google.cloud.aiplatform.compat.types import job_state as gca_job_state_compat
 from google.cloud.aiplatform.compat.types import (
     encryption_spec as gca_encryption_spec_compat,
 )
 from google.cloud.aiplatform_v1.services.job_service import client as job_service_client
+from google.cloud.aiplatform_v1beta1.services.job_service import (
+    client as job_service_client_v1beta1,
+)
 
 
 _TEST_PROJECT = "test-project"
 _TEST_LOCATION = "us-central1"
@@ -44,6 +50,7 @@
 _TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}"
 
 _TEST_CUSTOM_JOB_NAME = f"{_TEST_PARENT}/customJobs/{_TEST_ID}"
+_TEST_TENSORBOARD_NAME = f"{_TEST_PARENT}/tensorboards/{_TEST_ID}"
 
 _TEST_TRAINING_CONTAINER_IMAGE = "gcr.io/test-training/container:image"
 
@@ -97,11 +104,20 @@
 )
 
 
-def _get_custom_job_proto(state=None, name=None, error=None):
+def _get_custom_job_proto(state=None, name=None, error=None, version="v1"):
     custom_job_proto = copy.deepcopy(_TEST_BASE_CUSTOM_JOB_PROTO)
     custom_job_proto.name = name
     custom_job_proto.state = state
     custom_job_proto.error = error
+
+    if version == "v1beta1":
+        v1beta1_custom_job_proto = gca_custom_job_v1beta1.CustomJob()
+        v1beta1_custom_job_proto._pb.MergeFromString(
+            custom_job_proto._pb.SerializeToString()
+        )
+        custom_job_proto = v1beta1_custom_job_proto
+        custom_job_proto.job_spec.tensorboard = _TEST_TENSORBOARD_NAME
+
     return custom_job_proto
 
 
@@ -162,6 +178,19 @@ def create_custom_job_mock():
         yield create_custom_job_mock
 
 
+@pytest.fixture
+def create_custom_job_v1beta1_mock():
+    with mock.patch.object(
+        job_service_client_v1beta1.JobServiceClient, "create_custom_job"
+    ) as create_custom_job_mock:
+        create_custom_job_mock.return_value = _get_custom_job_proto(
+            name=_TEST_CUSTOM_JOB_NAME,
+            state=gca_job_state_compat.JobState.JOB_STATE_PENDING,
+            version="v1beta1",
+        )
+        yield create_custom_job_mock
+
+
 class TestCustomJob:
     def setup_method(self):
         reload(aiplatform.initializer)
@@ -321,3 +350,43 @@ def test_create_from_local_script_raises_with_no_staging_bucket(
                 script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME,
                 container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
             )
+
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_create_custom_job_with_tensorboard(
+        self, create_custom_job_v1beta1_mock, get_custom_job_mock, sync
+    ):
+
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            location=_TEST_LOCATION,
+            staging_bucket=_TEST_STAGING_BUCKET,
+            encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
+        )
+
+        job = aiplatform.CustomJob(
+            display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC
+        )
+
+        job.run(
+            service_account=_TEST_SERVICE_ACCOUNT,
+            tensorboard=_TEST_TENSORBOARD_NAME,
+            network=_TEST_NETWORK,
+            timeout=_TEST_TIMEOUT,
+            restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART,
+            sync=sync,
+        )
+
+        job.wait()
+
+        expected_custom_job = _get_custom_job_proto(version="v1beta1")
+
+        create_custom_job_v1beta1_mock.assert_called_once_with(
+            parent=_TEST_PARENT, custom_job=expected_custom_job
+        )
+
+        expected_custom_job = _get_custom_job_proto()
+
+        assert job.job_spec == expected_custom_job.job_spec
+        assert (
+            job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
+        )
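Both the jobs.py changes and the _get_custom_job_proto test helper above promote a v1 message to its v1beta1 counterpart by round-tripping through the protobuf wire format: the two versions share field numbers, so serializing the v1 proto and merging the bytes into a v1beta1 proto preserves every common field while exposing the v1beta1-only tensorboard field. A self-contained sketch of that pattern, using the same compat imports as the diff (the Tensorboard resource name and display name are placeholders):

    # v1 -> v1beta1 upgrade via a wire-format round-trip, as done in jobs.py above.
    from google.cloud.aiplatform.compat.types import custom_job as gca_custom_job_compat
    from google.cloud.aiplatform.compat.types import (
        custom_job_v1beta1 as gca_custom_job_v1beta1,
    )

    v1_job = gca_custom_job_compat.CustomJob(display_name="example-job")

    # Serialize the v1 message and merge the bytes into an empty v1beta1 message.
    v1beta1_job = gca_custom_job_v1beta1.CustomJob()
    v1beta1_job._pb.MergeFromString(v1_job._pb.SerializeToString())

    # The v1beta1 message carries the tensorboard field that v1 lacks.
    v1beta1_job.job_spec.tensorboard = (
        "projects/my-project/locations/us-central1/tensorboards/my-tensorboard"
    )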
diff --git a/tests/unit/aiplatform/test_hyperparameter_tuning_job.py b/tests/unit/aiplatform/test_hyperparameter_tuning_job.py
index fcd15f93ac..f4102fc3bb 100644
--- a/tests/unit/aiplatform/test_hyperparameter_tuning_job.py
+++ b/tests/unit/aiplatform/test_hyperparameter_tuning_job.py
@@ -31,9 +31,13 @@
 )
 from google.cloud.aiplatform.compat.types import (
     hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat,
+    hyperparameter_tuning_job_v1beta1 as gca_hyperparameter_tuning_job_v1beta1,
 )
 from google.cloud.aiplatform.compat.types import study as gca_study_compat
 from google.cloud.aiplatform_v1.services.job_service import client as job_service_client
+from google.cloud.aiplatform_v1beta1.services.job_service import (
+    client as job_service_client_v1beta1,
+)
 
 import test_custom_job
 
@@ -122,12 +126,29 @@
 )
 
 
-def _get_hyperparameter_tuning_job_proto(state=None, name=None, error=None):
-    custom_job_proto = copy.deepcopy(_TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO)
-    custom_job_proto.name = name
-    custom_job_proto.state = state
-    custom_job_proto.error = error
-    return custom_job_proto
+def _get_hyperparameter_tuning_job_proto(
+    state=None, name=None, error=None, version="v1"
+):
+    hyperparameter_tuning_job_proto = copy.deepcopy(
+        _TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO
+    )
+    hyperparameter_tuning_job_proto.name = name
+    hyperparameter_tuning_job_proto.state = state
+    hyperparameter_tuning_job_proto.error = error
+
+    if version == "v1beta1":
+        v1beta1_hyperparameter_tuning_job_proto = (
+            gca_hyperparameter_tuning_job_v1beta1.HyperparameterTuningJob()
+        )
+        v1beta1_hyperparameter_tuning_job_proto._pb.MergeFromString(
+            hyperparameter_tuning_job_proto._pb.SerializeToString()
+        )
+        hyperparameter_tuning_job_proto = v1beta1_hyperparameter_tuning_job_proto
+        hyperparameter_tuning_job_proto.trial_job_spec.tensorboard = (
+            test_custom_job._TEST_TENSORBOARD_NAME
+        )
+
+    return hyperparameter_tuning_job_proto
 
 
 @pytest.fixture
@@ -187,7 +208,20 @@ def create_hyperparameter_tuning_job_mock():
         yield create_hyperparameter_tuning_job_mock
 
 
-class TestCustomJob:
+@pytest.fixture
+def create_hyperparameter_tuning_job_v1beta1_mock():
+    with mock.patch.object(
+        job_service_client_v1beta1.JobServiceClient, "create_hyperparameter_tuning_job"
+    ) as create_hyperparameter_tuning_job_mock:
+        create_hyperparameter_tuning_job_mock.return_value = _get_hyperparameter_tuning_job_proto(
+            name=_TEST_HYPERPARAMETERTUNING_JOB_NAME,
+            state=gca_job_state_compat.JobState.JOB_STATE_PENDING,
+            version="v1beta1",
+        )
+        yield create_hyperparameter_tuning_job_mock
+
+
+class TestHyperparameterTuningJob:
     def setup_method(self):
         reload(aiplatform.initializer)
         reload(aiplatform)
@@ -366,3 +400,68 @@ def test_get_hyperparameter_tuning_job(self, get_hyperparameter_tuning_job_mock)
         assert (
             job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_PENDING
         )
+
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_create_hyperparameter_tuning_job_with_tensorboard(
+        self,
+        create_hyperparameter_tuning_job_v1beta1_mock,
+        get_hyperparameter_tuning_job_mock,
+        sync,
+    ):
+
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            location=_TEST_LOCATION,
+            staging_bucket=_TEST_STAGING_BUCKET,
+            encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
+        )
+
+        custom_job = aiplatform.CustomJob(
+            display_name=test_custom_job._TEST_DISPLAY_NAME,
+            worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC,
+        )
+
+        job = aiplatform.HyperparameterTuningJob(
+            display_name=_TEST_DISPLAY_NAME,
+            custom_job=custom_job,
+            metric_spec={_TEST_METRIC_SPEC_KEY: _TEST_METRIC_SPEC_VALUE},
+            parameter_spec={
+                "lr": hpt.DoubleParameterSpec(min=0.001, max=0.1, scale="log"),
+                "units": hpt.IntegerParameterSpec(min=4, max=1028, scale="linear"),
+                "activation": hpt.CategoricalParameterSpec(
+                    values=["relu", "sigmoid", "elu", "selu", "tanh"]
+                ),
+                "batch_size": hpt.DiscreteParameterSpec(
+                    values=[16, 32], scale="linear"
+                ),
+            },
+            parallel_trial_count=_TEST_PARALLEL_TRIAL_COUNT,
+            max_trial_count=_TEST_MAX_TRIAL_COUNT,
+            max_failed_trial_count=_TEST_MAX_FAILED_TRIAL_COUNT,
+            search_algorithm=_TEST_SEARCH_ALGORITHM,
+            measurement_selection=_TEST_MEASUREMENT_SELECTION,
+        )
+
+        job.run(
+            service_account=_TEST_SERVICE_ACCOUNT,
+            network=_TEST_NETWORK,
+            timeout=_TEST_TIMEOUT,
+            restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART,
+            tensorboard=test_custom_job._TEST_TENSORBOARD_NAME,
+            sync=sync,
+        )
+
+        job.wait()
+
+        expected_hyperparameter_tuning_job = _get_hyperparameter_tuning_job_proto(
+            version="v1beta1"
+        )
+
+        create_hyperparameter_tuning_job_v1beta1_mock.assert_called_once_with(
+            parent=_TEST_PARENT,
+            hyperparameter_tuning_job=expected_hyperparameter_tuning_job,
+        )
+
+        assert (
+            job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
+        )
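For reference, the user-facing flow exercised by the new tests looks roughly like the sketch below. Every resource name (project, bucket, container image, service account, Tensorboard instance) is a placeholder, and the worker pool spec mirrors the dict-based spec used in test_custom_job.py; HyperparameterTuningJob.run accepts the same tensorboard argument for its trial jobs.

    # User-facing sketch of the new `tensorboard` argument; all names are placeholders.
    from google.cloud import aiplatform

    aiplatform.init(
        project="my-project",
        location="us-central1",
        staging_bucket="gs://my-staging-bucket",
    )

    worker_pool_specs = [
        {
            "machine_spec": {"machine_type": "n1-standard-4"},
            "replica_count": 1,
            "container_spec": {"image_uri": "gcr.io/my-project/my-trainer:latest"},
        }
    ]

    job = aiplatform.CustomJob(
        display_name="my-custom-job", worker_pool_specs=worker_pool_specs
    )

    # Passing `tensorboard` routes the request through the v1beta1 API, as in the
    # patch above; `service_account` must be supplied alongside it.
    job.run(
        service_account="my-sa@my-project.iam.gserviceaccount.com",
        tensorboard="projects/my-project/locations/us-central1/tensorboards/1234567890",
    )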