From df68ec3441eeb7670531f50aaed00df6f7e2a1a3 Mon Sep 17 00:00:00 2001 From: Yaqi Ji Date: Mon, 26 Jul 2021 13:30:23 -0700 Subject: [PATCH] fix: create pipeline job with user-specified job id (#567) * fix: create pipeline job with user-specified job id * Add comments for pipelineJob name not used for service --- google/cloud/aiplatform/pipeline_jobs.py | 6 +++- tests/unit/aiplatform/test_pipeline_jobs.py | 31 +++++++++++---------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index 4d26dbf4c6..7d252725ef 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -249,8 +249,12 @@ def run( _LOGGER.log_create_with_lro(self.__class__) + # PipelineJob.name is not used by pipeline service + pipeline_job_id = self._gca_resource.name.split("/")[-1] self._gca_resource = self.api_client.create_pipeline_job( - parent=self._parent, pipeline_job=self._gca_resource + parent=self._parent, + pipeline_job=self._gca_resource, + pipeline_job_id=pipeline_job_id, ) _LOGGER.log_create_complete_with_getter( diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index adfa08f979..ae740b1bcc 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -40,6 +40,7 @@ _TEST_PROJECT = "test-project" _TEST_LOCATION = "us-central1" +_TEST_PIPELINE_JOB_DISPLAY_NAME = "sample-pipeline-job-display-name" _TEST_PIPELINE_JOB_ID = "sample-test-pipeline-202111111" _TEST_GCS_BUCKET_NAME = "my-bucket" _TEST_CREDENTIALS = auth_credentials.AnonymousCredentials() @@ -199,7 +200,7 @@ def test_run_call_pipeline_service_create( ) job = pipeline_jobs.PipelineJob( - display_name=_TEST_PIPELINE_JOB_ID, + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, template_path=_TEST_TEMPLATE_PATH, job_id=_TEST_PIPELINE_JOB_ID, parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, @@ -222,7 +223,7 @@ def test_run_call_pipeline_service_create( # Construct expected request expected_gapic_pipeline_job = gca_pipeline_job_v1beta1.PipelineJob( - display_name=_TEST_PIPELINE_JOB_ID, + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, name=_TEST_PIPELINE_JOB_NAME, pipeline_spec={ "components": {}, @@ -233,7 +234,9 @@ def test_run_call_pipeline_service_create( ) mock_pipeline_service_create.assert_called_once_with( - parent=_TEST_PARENT, pipeline_job=expected_gapic_pipeline_job, + parent=_TEST_PARENT, + pipeline_job=expected_gapic_pipeline_job, + pipeline_job_id=_TEST_PIPELINE_JOB_ID, ) mock_pipeline_service_get.assert_called_with(name=_TEST_PIPELINE_JOB_NAME) @@ -242,6 +245,14 @@ def test_run_call_pipeline_service_create( gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED ) + @pytest.mark.usefixtures("mock_pipeline_service_get") + def test_get_pipeline_job(self, mock_pipeline_service_get): + aiplatform.init(project=_TEST_PROJECT) + job = pipeline_jobs.PipelineJob.get(resource_name=_TEST_PIPELINE_JOB_ID) + + mock_pipeline_service_get.assert_called_once_with(name=_TEST_PIPELINE_JOB_NAME) + assert isinstance(job, pipeline_jobs.PipelineJob) + @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", "mock_load_json", ) @@ -255,7 +266,7 @@ def test_cancel_pipeline_job( ) job = pipeline_jobs.PipelineJob( - display_name=_TEST_PIPELINE_JOB_ID, + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, template_path=_TEST_TEMPLATE_PATH, job_id=_TEST_PIPELINE_JOB_ID, ) @@ -267,14 +278,6 @@ def test_cancel_pipeline_job( name=_TEST_PIPELINE_JOB_NAME ) - @pytest.mark.usefixtures("mock_pipeline_service_get") - def test_get_training_job(self, mock_pipeline_service_get): - aiplatform.init(project=_TEST_PROJECT) - job = pipeline_jobs.PipelineJob.get(resource_name=_TEST_PIPELINE_JOB_ID) - - mock_pipeline_service_get.assert_called_once_with(name=_TEST_PIPELINE_JOB_NAME) - assert isinstance(job, pipeline_jobs.PipelineJob) - @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", "mock_load_json", ) @@ -288,7 +291,7 @@ def test_cancel_pipeline_job_without_running( ) job = pipeline_jobs.PipelineJob( - display_name=_TEST_PIPELINE_JOB_ID, + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, template_path=_TEST_TEMPLATE_PATH, job_id=_TEST_PIPELINE_JOB_ID, ) @@ -313,7 +316,7 @@ def test_pipeline_failure_raises(self, sync): ) job = pipeline_jobs.PipelineJob( - display_name=_TEST_PIPELINE_JOB_ID, + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, template_path=_TEST_TEMPLATE_PATH, job_id=_TEST_PIPELINE_JOB_ID, parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,