From 7ab05d5e127636d96365b7ea408974ccd6c2f0fe Mon Sep 17 00:00:00 2001
From: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com>
Date: Wed, 27 Oct 2021 16:38:25 -0400
Subject: [PATCH] feat: Add PipelineJob.submit to create PipelineJob without monitoring its completion. (#798)

---
 README.rst                                  | 37 +++++++++++--
 google/cloud/aiplatform/base.py             |  2 +-
 google/cloud/aiplatform/pipeline_jobs.py    | 29 +++++++++-
 tests/unit/aiplatform/test_pipeline_jobs.py | 59 +++++++++++++++++++++
 4 files changed, 120 insertions(+), 7 deletions(-)

diff --git a/README.rst b/README.rst
index c18aa28ccc..8673fe0a81 100644
--- a/README.rst
+++ b/README.rst
@@ -358,14 +358,12 @@ To delete an endpoint:
 Pipelines
 ---------
 
-To create a Vertex Pipeline run:
+To create a Vertex Pipeline run and monitor it until completion:
 
 .. code-block:: Python
 
   # Instantiate PipelineJob object
   pl = PipelineJob(
-      # Display name is required but seemingly not used
-      # see https://github.com/googleapis/python-aiplatform/blob/9dcf6fb0bc8144d819938a97edf4339fe6f2e1e6/google/cloud/aiplatform/pipeline_jobs.py#L260
       display_name="My first pipeline",
 
       # Whether or not to enable caching
@@ -384,7 +382,7 @@ To create a Vertex Pipeline run:
       pipeline_root=pipeline_root,
   )
 
-  # Execute pipeline in Vertex
+  # Execute pipeline in Vertex and monitor until completion
   pl.run(
     # Email address of service account to use for the pipeline run
     # You must have iam.serviceAccounts.actAs permission on the service account to use it
     service_account=service_account,
@@ -395,6 +393,37 @@ To create a Vertex Pipeline run:
     sync=True
   )
 
+To create a Vertex Pipeline run without monitoring it until completion, use `submit` instead of `run`:
+
+.. code-block:: Python
+
+  # Instantiate PipelineJob object
+  pl = PipelineJob(
+      display_name="My first pipeline",
+
+      # Whether or not to enable caching
+      # True = always cache pipeline step result
+      # False = never cache pipeline step result
+      # None = defer to cache option for each pipeline component in the pipeline definition
+      enable_caching=False,
+
+      # Local or GCS path to a compiled pipeline definition
+      template_path="pipeline.json",
+
+      # Dictionary containing input parameters for your pipeline
+      parameter_values=parameter_values,
+
+      # GCS path to act as the pipeline root
+      pipeline_root=pipeline_root,
+  )
+
+  # Submit the Pipeline to Vertex
+  pl.submit(
+    # Email address of service account to use for the pipeline run
+    # You must have iam.serviceAccounts.actAs permission on the service account to use it
+    service_account=service_account,
+  )
+
 Explainable AI: Get Metadata
 ----------------------------
 
diff --git a/google/cloud/aiplatform/base.py b/google/cloud/aiplatform/base.py
index a1ee4d348d..c4eb2e4853 100644
--- a/google/cloud/aiplatform/base.py
+++ b/google/cloud/aiplatform/base.py
@@ -671,7 +671,7 @@ def wrapper(*args, **kwargs):
             # if sync then wait for any Futures to complete and execute
             if sync:
                 if self:
-                    self.wait()
+                    VertexAiResourceNounWithFutureManager.wait(self)
                 return method(*args, **kwargs)
 
             # callbacks to call within the Future (in same Thread)
diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py
index e719c0d5fd..1a49ea7fea 100644
--- a/google/cloud/aiplatform/pipeline_jobs.py
+++ b/google/cloud/aiplatform/pipeline_jobs.py
@@ -232,7 +232,7 @@ def run(
         network: Optional[str] = None,
         sync: Optional[bool] = True,
     ) -> None:
-        """Run this configured PipelineJob.
+        """Run this configured PipelineJob and monitor the job until completion.
 
         Args:
             service_account (str):
@@ -247,6 +247,26 @@ def run(
             sync (bool):
                 Optional. Whether to execute this method synchronously. If False, this method
                 will unblock and it will be executed in a concurrent Future.
         """
+        self.submit(service_account=service_account, network=network)
+
+        self._block_until_complete()
+
+    def submit(
+        self, service_account: Optional[str] = None, network: Optional[str] = None,
+    ) -> None:
+        """Submit this configured PipelineJob.
+
+        Args:
+            service_account (str):
+                Optional. Specifies the service account for workload run-as account.
+                Users submitting jobs must have act-as permission on this run-as account.
+            network (str):
+                Optional. The full name of the Compute Engine network to which the job
+                should be peered. For example, projects/12345/global/networks/myVPC.
+
+                Private services access must already be configured for the network.
+                If left unspecified, the job is not peered with any network.
+        """
         if service_account:
             self._gca_resource.service_account = service_account
@@ -267,7 +287,12 @@ def run(
 
         _LOGGER.info("View Pipeline Job:\n%s" % self._dashboard_uri())
 
-        self._block_until_complete()
+    def wait(self):
+        """Wait for this PipelineJob to complete."""
+        if self._latest_future is None:
+            self._block_until_complete()
+        else:
+            super().wait()
 
     @property
     def pipeline_spec(self):
diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py
index f8a62a19c3..098aef2570 100644
--- a/tests/unit/aiplatform/test_pipeline_jobs.py
+++ b/tests/unit/aiplatform/test_pipeline_jobs.py
@@ -275,6 +275,65 @@ def test_run_call_pipeline_service_pipeline_job_create(
             gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
         )
 
+    @pytest.mark.usefixtures("mock_load_pipeline_job_json")
+    def test_submit_call_pipeline_service_pipeline_job_create(
+        self, mock_pipeline_service_create, mock_pipeline_service_get
+    ):
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            staging_bucket=_TEST_GCS_BUCKET_NAME,
+            location=_TEST_LOCATION,
+            credentials=_TEST_CREDENTIALS,
+        )
+
+        job = pipeline_jobs.PipelineJob(
+            display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
+            template_path=_TEST_TEMPLATE_PATH,
+            job_id=_TEST_PIPELINE_JOB_ID,
+            parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
+            enable_caching=True,
+        )
+
+        job.submit(service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK)
+
+        expected_runtime_config_dict = {
+            "gcs_output_directory": _TEST_GCS_BUCKET_NAME,
+            "parameters": {"name_param": {"stringValue": "hello"}},
+        }
+        runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb
+        json_format.ParseDict(expected_runtime_config_dict, runtime_config)
+
+        # Construct expected request
+        expected_gapic_pipeline_job = gca_pipeline_job_v1beta1.PipelineJob(
+            display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
+            pipeline_spec={
+                "components": {},
+                "pipelineInfo": _TEST_PIPELINE_JOB["pipelineSpec"]["pipelineInfo"],
+                "root": _TEST_PIPELINE_JOB["pipelineSpec"]["root"],
+            },
+            runtime_config=runtime_config,
+            service_account=_TEST_SERVICE_ACCOUNT,
+            network=_TEST_NETWORK,
+        )
+
+        mock_pipeline_service_create.assert_called_once_with(
+            parent=_TEST_PARENT,
+            pipeline_job=expected_gapic_pipeline_job,
+            pipeline_job_id=_TEST_PIPELINE_JOB_ID,
+        )
+
+        assert not mock_pipeline_service_get.called
+
+        job.wait()
+
+        mock_pipeline_service_get.assert_called_with(
+            name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY
+        )
+
+        assert job._gca_resource == make_pipeline_job(
+            gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
+        )
+
     @pytest.mark.usefixtures("mock_load_pipeline_spec_json")
     @pytest.mark.parametrize("sync", [True, False])
     def test_run_call_pipeline_service_pipeline_spec_create(
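
Usage sketch of the workflow this patch enables, assuming the same `PipelineJob` arguments as the README example in the diff; the project, location, bucket, template path, parameter values, and service account below are illustrative placeholders, not values taken from this change. `submit` creates the PipelineJob without blocking, and the new `wait` override can be called later to block until the run finishes.

.. code-block:: Python

    from google.cloud import aiplatform

    # Illustrative placeholder values (assumptions, not from the patch)
    parameter_values = {"name_param": "hello"}
    pipeline_root = "gs://my-bucket/pipeline-root"
    service_account = "pipeline-runner@my-project.iam.gserviceaccount.com"

    aiplatform.init(project="my-project", location="us-central1")

    pl = aiplatform.PipelineJob(
        display_name="My first pipeline",
        template_path="pipeline.json",
        parameter_values=parameter_values,
        pipeline_root=pipeline_root,
    )

    # Non-blocking: creates the PipelineJob resource in Vertex and returns immediately
    pl.submit(service_account=service_account)

    # ... do other work while the pipeline runs ...

    # Block until the previously submitted run reaches completion
    pl.wait()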
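A note on the `base.py` change: `PipelineJob` now overrides `wait()` to poll the service when no future is pending, so the `optional_sync` wrapper calls `VertexAiResourceNounWithFutureManager.wait(self)` to get the base-class behavior (waiting on pending futures) rather than the subclass override. This reasoning is inferred from the diff; the minimal sketch below uses hypothetical class names, not the library's.

.. code-block:: Python

    class FutureManager:
        def wait(self):
            # Base behavior: block only on pending concurrent futures
            print("draining pending futures")

    class PipelineJobLike(FutureManager):
        def wait(self):
            # Override with different semantics: poll the remote job until it completes
            print("polling the service until the run finishes")

    job = PipelineJobLike()
    job.wait()               # uses the override: polls the service
    FutureManager.wait(job)  # bypasses the override: only drains futures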