diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index c37530a78f..89ea52097d 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -36,6 +36,7 @@ from google.cloud.aiplatform import initializer from google.cloud.aiplatform import hyperparameter_tuning from google.cloud.aiplatform import utils +from google.cloud.aiplatform.utils import console_utils from google.cloud.aiplatform.utils import source_utils from google.cloud.aiplatform.utils import worker_spec_utils @@ -1209,6 +1210,14 @@ def run( _LOGGER.info("View Custom Job:\n%s" % self._dashboard_uri()) + if tensorboard: + _LOGGER.info( + "View Tensorboard:\n%s" + % console_utils.custom_job_tensorboard_console_uri( + tensorboard, self.resource_name + ) + ) + self._block_until_complete() @property @@ -1521,6 +1530,14 @@ def run( _LOGGER.info("View HyperparameterTuningJob:\n%s" % self._dashboard_uri()) + if tensorboard: + _LOGGER.info( + "View Tensorboard:\n%s" + % console_utils.custom_job_tensorboard_console_uri( + tensorboard, self.resource_name + ) + ) + self._block_until_complete() @property diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 51fdb55d13..57329fe937 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -28,6 +28,7 @@ from google.cloud.aiplatform import models from google.cloud.aiplatform import schema from google.cloud.aiplatform import utils +from google.cloud.aiplatform.utils import console_utils from google.cloud.aiplatform.compat.types import ( env_var as gca_env_var, @@ -619,6 +620,10 @@ def _get_model(self) -> Optional[models.Model]: fields.id, project=fields.project, location=fields.location, ) + def _wait_callback(self): + """Callback performs custom logging during _block_until_complete. 
Override in subclass.""" + pass + def _block_until_complete(self): """Helper method to block and check on job until complete.""" @@ -629,6 +634,7 @@ def _block_until_complete(self): multiplier = 2 # scale wait by 2 every iteration previous_time = time.time() + while self.state not in _PIPELINE_COMPLETE_STATES: current_time = time.time() if current_time - previous_time >= log_wait: @@ -642,6 +648,7 @@ def _block_until_complete(self): ) log_wait = min(log_wait * multiplier, max_wait) previous_time = current_time + self._wait_callback() time.sleep(wait) self._raise_failure() @@ -997,6 +1004,11 @@ def __init__( "set using aiplatform.init(staging_bucket='gs://my-bucket')" ) + # Backing Custom Job resource is not known until after data preprocessing + # once Custom Job is known we log the console uri and the tensorboard uri + # this flag keeps that state so we don't log it multiple times + self._has_logged_custom_job = False + def _prepare_and_validate_run( self, model_display_name: Optional[str] = None, @@ -1076,6 +1088,7 @@ def _prepare_training_task_inputs_and_output_dir( base_output_dir: Optional[str] = None, service_account: Optional[str] = None, network: Optional[str] = None, + tensorboard: Optional[str] = None, ) -> Tuple[Dict, str]: """Prepares training task inputs and output directory for custom job. @@ -1093,6 +1106,21 @@ def _prepare_training_task_inputs_and_output_dir( should be peered. For example, projects/12345/global/networks/myVPC. Private services access must already be configured for the network. If left unspecified, the job is not peered with any network. + tensorboard (str): + Optional. The name of a Vertex AI + [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] + resource to which this CustomJob will upload Tensorboard + logs. 
Format: + ``projects/{project}/locations/{location}/tensorboards/{tensorboard}`` + + The training script should write Tensorboard logs to the following Vertex AI environment + variable: + + AIP_TENSORBOARD_LOG_DIR + + `service_account` is required with provided `tensorboard`. + For more information on configuring your service account please visit: + https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training Returns: Training task inputs and Output directory for custom job. """ @@ -1113,9 +1141,42 @@ def _prepare_training_task_inputs_and_output_dir( training_task_inputs["service_account"] = service_account if network: training_task_inputs["network"] = network + if tensorboard: + training_task_inputs["tensorboard"] = tensorboard return training_task_inputs, base_output_dir + def _wait_callback(self): + if ( + self._gca_resource.training_task_metadata.get("backingCustomJob") + and not self._has_logged_custom_job + ): + _LOGGER.info(f"View backing custom job:\n{self._custom_job_console_uri()}") + + if self._gca_resource.training_task_inputs.get("tensorboard"): + _LOGGER.info(f"View tensorboard:\n{self._tensorboard_console_uri()}") + + self._has_logged_custom_job = True + + def _custom_job_console_uri(self) -> str: + """Helper method to compose the dashboard uri where custom job can be viewed.""" + custom_job_resource_name = self._gca_resource.training_task_metadata.get( + "backingCustomJob" + ) + return console_utils.custom_job_console_uri(custom_job_resource_name) + + def _tensorboard_console_uri(self) -> str: + """Helper method to compose dashboard uri where tensorboard can be viewed.""" + tensorboard_resource_name = self._gca_resource.training_task_inputs.get( + "tensorboard" + ) + custom_job_resource_name = self._gca_resource.training_task_metadata.get( + "backingCustomJob" + ) + return console_utils.custom_job_tensorboard_console_uri( + tensorboard_resource_name, custom_job_resource_name + ) + @property def _model_upload_fail_string(self) -> str: """Helper 
property for model upload failure.""" @@ -1372,6 +1433,7 @@ def run( validation_fraction_split: float = 0.1, test_fraction_split: float = 0.1, predefined_split_column_name: Optional[str] = None, + tensorboard: Optional[str] = None, sync=True, ) -> Optional[models.Model]: """Runs the custom training job. @@ -1512,6 +1574,21 @@ def run( ignored by the pipeline. Supported only for tabular and time series Datasets. + tensorboard (str): + Optional. The name of a Vertex AI + [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] + resource to which this CustomJob will upload Tensorboard + logs. Format: + ``projects/{project}/locations/{location}/tensorboards/{tensorboard}`` + + The training script should write Tensorboard logs to the following Vertex AI environment + variable: + + AIP_TENSORBOARD_LOG_DIR + + `service_account` is required with provided `tensorboard`. + For more information on configuring your service account please visit: + https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -1550,6 +1627,7 @@ def run( validation_fraction_split=validation_fraction_split, test_fraction_split=test_fraction_split, predefined_split_column_name=predefined_split_column_name, + tensorboard=tensorboard, sync=sync, ) @@ -1578,6 +1656,7 @@ def _run( validation_fraction_split: float = 0.1, test_fraction_split: float = 0.1, predefined_split_column_name: Optional[str] = None, + tensorboard: Optional[str] = None, sync=True, ) -> Optional[models.Model]: """Packages local script and launches training_job. @@ -1665,6 +1744,21 @@ def _run( ignored by the pipeline. Supported only for tabular and time series Datasets. + tensorboard (str): + Optional. The name of a Vertex AI + [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] + resource to which this CustomJob will upload Tensorboard + logs. 
Format: + ``projects/{project}/locations/{location}/tensorboards/{tensorboard}`` + + The training script should write Tensorboard logs to the following Vertex AI environment + variable: + + AIP_TENSORBOARD_LOG_DIR + + `service_account` is required with provided `tensorboard`. + For more information on configuring your service account please visit: + https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -1704,6 +1798,7 @@ def _run( base_output_dir=base_output_dir, service_account=service_account, network=network, + tensorboard=tensorboard, ) model = self._run_job( @@ -1960,6 +2055,7 @@ def run( validation_fraction_split: float = 0.1, test_fraction_split: float = 0.1, predefined_split_column_name: Optional[str] = None, + tensorboard: Optional[str] = None, sync=True, ) -> Optional[models.Model]: """Runs the custom training job. @@ -2093,6 +2189,21 @@ def run( ignored by the pipeline. Supported only for tabular and time series Datasets. + tensorboard (str): + Optional. The name of a Vertex AI + [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] + resource to which this CustomJob will upload Tensorboard + logs. Format: + ``projects/{project}/locations/{location}/tensorboards/{tensorboard}`` + + The training script should write Tensorboard logs to the following Vertex AI environment + variable: + + AIP_TENSORBOARD_LOG_DIR + + `service_account` is required with provided `tensorboard`. + For more information on configuring your service account please visit: + https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training sync (bool): Whether to execute this method synchronously. 
If False, this method will be executed in concurrent Future and any downstream object will @@ -2130,6 +2241,7 @@ def run( validation_fraction_split=validation_fraction_split, test_fraction_split=test_fraction_split, predefined_split_column_name=predefined_split_column_name, + tensorboard=tensorboard, sync=sync, ) @@ -2157,6 +2269,7 @@ def _run( validation_fraction_split: float = 0.1, test_fraction_split: float = 0.1, predefined_split_column_name: Optional[str] = None, + tensorboard: Optional[str] = None, sync=True, ) -> Optional[models.Model]: """Packages local script and launches training_job. @@ -2240,6 +2353,21 @@ def _run( ignored by the pipeline. Supported only for tabular and time series Datasets. + tensorboard (str): + Optional. The name of a Vertex AI + [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] + resource to which this CustomJob will upload Tensorboard + logs. Format: + ``projects/{project}/locations/{location}/tensorboards/{tensorboard}`` + + The training script should write Tensorboard logs to the following Vertex AI environment + variable: + + AIP_TENSORBOARD_LOG_DIR + + `service_account` is required with provided `tensorboard`. + For more information on configuring your service account please visit: + https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -2273,6 +2401,7 @@ def _run( base_output_dir=base_output_dir, service_account=service_account, network=network, + tensorboard=tensorboard, ) model = self._run_job( @@ -3791,6 +3920,7 @@ def run( validation_fraction_split: float = 0.1, test_fraction_split: float = 0.1, predefined_split_column_name: Optional[str] = None, + tensorboard: Optional[str] = None, sync=True, ) -> Optional[models.Model]: """Runs the custom training job. @@ -3924,6 +4054,21 @@ def run( ignored by the pipeline. 
Supported only for tabular and time series Datasets. + tensorboard (str): + Optional. The name of a Vertex AI + [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] + resource to which this CustomJob will upload Tensorboard + logs. Format: + ``projects/{project}/locations/{location}/tensorboards/{tensorboard}`` + + The training script should write Tensorboard logs to the following Vertex AI environment + variable: + + AIP_TENSORBOARD_LOG_DIR + + `service_account` is required with provided `tensorboard`. + For more information on configuring your service account please visit: + https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -3956,6 +4101,7 @@ def run( test_fraction_split=test_fraction_split, predefined_split_column_name=predefined_split_column_name, bigquery_destination=bigquery_destination, + tensorboard=tensorboard, sync=sync, ) @@ -3983,6 +4129,7 @@ def _run( test_fraction_split: float = 0.1, predefined_split_column_name: Optional[str] = None, bigquery_destination: Optional[str] = None, + tensorboard: Optional[str] = None, sync=True, ) -> Optional[models.Model]: """Packages local script and launches training_job. @@ -4053,6 +4200,21 @@ def _run( ignored by the pipeline. Supported only for tabular and time series Datasets. + tensorboard (str): + Optional. The name of a Vertex AI + [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] + resource to which this CustomJob will upload Tensorboard + logs. Format: + ``projects/{project}/locations/{location}/tensorboards/{tensorboard}`` + + The training script should write Tensorboard logs to the following Vertex AI environment + variable: + + AIP_TENSORBOARD_LOG_DIR + + `service_account` is required with provided `tensorboard`. 
+ For more information on configuring your service account please visit: + https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -4086,6 +4248,7 @@ def _run( base_output_dir=base_output_dir, service_account=service_account, network=network, + tensorboard=tensorboard, ) model = self._run_job( diff --git a/google/cloud/aiplatform/utils/console_utils.py b/google/cloud/aiplatform/utils/console_utils.py new file mode 100644 index 0000000000..ff9baba4cf --- /dev/null +++ b/google/cloud/aiplatform/utils/console_utils.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- + +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from google.cloud.aiplatform import utils + + +def custom_job_console_uri(custom_job_resource_name: str) -> str: + """Helper method to create console uri from custom job resource name.""" + fields = utils.extract_fields_from_resource_name(custom_job_resource_name) + return f"https://console.cloud.google.com/ai/platform/locations/{fields.location}/training/{fields.id}?project={fields.project}" + + +def custom_job_tensorboard_console_uri( + tensorboard_resource_name: str, custom_job_resource_name: str +) -> str: + """Helper method to create console uri to tensorboard from custom job resource.""" + # projects+40556267596+locations+us-central1+tensorboards+740208820004847616+experiments+2214368039829241856 + fields = utils.extract_fields_from_resource_name(tensorboard_resource_name) + experiment_resource_name = f"{tensorboard_resource_name}/experiments/{custom_job_resource_name.split('/')[-1]}" + uri_experiment_resource_name = experiment_resource_name.replace("/", "+") + return f"https://{fields.location}.tensorboard.googleusercontent.com/experiment/{uri_experiment_resource_name}" diff --git a/tests/unit/aiplatform/test_end_to_end.py b/tests/unit/aiplatform/test_end_to_end.py index 4aede65f08..35006a3e95 100644 --- a/tests/unit/aiplatform/test_end_to_end.py +++ b/tests/unit/aiplatform/test_end_to_end.py @@ -47,6 +47,7 @@ from test_models import deploy_model_mock # noqa: F401 import test_training_jobs +from test_training_jobs import make_training_pipeline from test_training_jobs import mock_model_service_get # noqa: F401 from test_training_jobs import mock_pipeline_service_create # noqa: F401 from test_training_jobs import mock_pipeline_service_get # noqa: F401 @@ -264,7 +265,9 @@ def test_dataset_create_to_model_predict( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) 
mock_model_service_get.assert_called_once_with( name=test_training_jobs._TEST_MODEL_NAME diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index 75478263e8..cd9a6a4033 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -110,6 +110,12 @@ _TEST_NAME = ( f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/trainingPipelines/{_TEST_ID}" ) +_TEST_TENSORBOARD_RESOURCE_NAME = ( + f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/tensorboards/{_TEST_ID}" +) +_TEST_CUSTOM_JOB_RESOURCE_NAME = ( + f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/customJobs/{_TEST_ID}" +) _TEST_ALT_PROJECT = "test-project-alt" _TEST_ALT_LOCATION = "europe-west4" _TEST_NETWORK = f"projects/{_TEST_PROJECT}/global/networks/{_TEST_ID}" @@ -437,16 +443,51 @@ def mock_pipeline_service_create(): yield mock_create_training_pipeline +def make_training_pipeline(state): + return gca_training_pipeline.TrainingPipeline( + name=_TEST_PIPELINE_RESOURCE_NAME, + state=state, + model_to_upload=gca_model.Model(name=_TEST_MODEL_NAME), + training_task_inputs={"tensorboard": _TEST_TENSORBOARD_RESOURCE_NAME}, + training_task_metadata={"backingCustomJob": _TEST_CUSTOM_JOB_RESOURCE_NAME}, + ) + + @pytest.fixture def mock_pipeline_service_get(): with mock.patch.object( pipeline_service_client.PipelineServiceClient, "get_training_pipeline" ) as mock_get_training_pipeline: - mock_get_training_pipeline.return_value = gca_training_pipeline.TrainingPipeline( - name=_TEST_PIPELINE_RESOURCE_NAME, - state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED, - model_to_upload=gca_model.Model(name=_TEST_MODEL_NAME), - ) + mock_get_training_pipeline.side_effect = [ + make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING + ), + make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ), + make_training_pipeline( + 
gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ), + make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ), + make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ), + make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ), + make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ), + make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ), + make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ), + ] + yield mock_get_training_pipeline @@ -614,6 +655,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT, test_fraction_split=_TEST_TEST_FRACTION_SPLIT, predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME, + tensorboard=_TEST_TENSORBOARD_RESOURCE_NAME, sync=sync, ) @@ -708,6 +750,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( }, "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, + "tensorboard": _TEST_TENSORBOARD_RESOURCE_NAME, }, struct_pb2.Value(), ), @@ -721,7 +764,9 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -733,6 +778,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + assert job._has_logged_custom_job + @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_bigquery_destination( self, @@ -880,7 +927,9 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( 
training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -1116,7 +1165,9 @@ def test_run_call_pipeline_service_create_with_no_dataset( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -1407,7 +1458,9 @@ def test_run_call_pipeline_service_create_distributed_training( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -1636,7 +1689,9 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -1784,6 +1839,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT, test_fraction_split=_TEST_TEST_FRACTION_SPLIT, predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME, + service_account=_TEST_SERVICE_ACCOUNT, + tensorboard=_TEST_TENSORBOARD_RESOURCE_NAME, sync=sync, ) @@ -1869,6 +1926,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( "base_output_directory": { "output_uri_prefix": _TEST_BASE_OUTPUT_DIR }, + "service_account": 
_TEST_SERVICE_ACCOUNT, + "tensorboard": _TEST_TENSORBOARD_RESOURCE_NAME, }, struct_pb2.Value(), ), @@ -1882,7 +1941,9 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -1894,6 +1955,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + assert job._has_logged_custom_job + @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_bigquery_destination( self, @@ -2031,7 +2094,9 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -2249,7 +2314,9 @@ def test_run_call_pipeline_service_create_with_no_dataset( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -2515,7 +2582,9 @@ def test_run_call_pipeline_service_create_distributed_training( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -2661,7 +2730,9 @@ def 
test_run_call_pipeline_service_create_with_nontabular_dataset( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -3110,7 +3181,9 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -3262,7 +3335,9 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_without_model_dis training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -3413,7 +3488,9 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -3636,7 +3713,9 @@ def test_run_call_pipeline_service_create_with_no_dataset( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -3910,7 +3989,9 @@ def 
test_run_call_pipeline_service_create_distributed_training( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -3964,6 +4045,8 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( accelerator_type=_TEST_ACCELERATOR_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, model_display_name=_TEST_MODEL_DISPLAY_NAME, + service_account=_TEST_SERVICE_ACCOUNT, + tensorboard=_TEST_TENSORBOARD_RESOURCE_NAME, sync=sync, ) @@ -4042,6 +4125,8 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( "base_output_directory": { "output_uri_prefix": _TEST_BASE_OUTPUT_DIR }, + "service_account": _TEST_SERVICE_ACCOUNT, + "tensorboard": _TEST_TENSORBOARD_RESOURCE_NAME, }, struct_pb2.Value(), ), @@ -4054,7 +4139,9 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( training_pipeline=true_training_pipeline, ) - assert job._gca_resource is mock_pipeline_service_get.return_value + assert job._gca_resource == make_training_pipeline( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) @@ -4066,6 +4153,8 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + assert job._has_logged_custom_job + def test_run_call_pipeline_service_create_with_nontabular_dataset_raises_if_annotation_schema_uri( self, mock_nontabular_dataset, ):