Skip to content

Commit

Permalink
feat: Add PipelineJob.submit to create PipelineJob without monitoring…
Browse files Browse the repository at this point in the history
… its completion. (#798)
  • Loading branch information
sasha-gitg committed Oct 27, 2021
1 parent 45401c0 commit 7ab05d5
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 7 deletions.
37 changes: 33 additions & 4 deletions README.rst
Expand Up @@ -358,14 +358,12 @@ To delete an endpoint:
Pipelines
---------

To create a Vertex Pipeline run:
To create a Vertex Pipeline run and monitor until completion:

.. code-block:: Python
# Instantiate PipelineJob object
pl = PipelineJob(
# Display name is required but seemingly not used
# see https://github.com/googleapis/python-aiplatform/blob/9dcf6fb0bc8144d819938a97edf4339fe6f2e1e6/google/cloud/aiplatform/pipeline_jobs.py#L260
display_name="My first pipeline",
# Whether or not to enable caching
Expand All @@ -384,7 +382,7 @@ To create a Vertex Pipeline run:
pipeline_root=pipeline_root,
)
# Execute pipeline in Vertex
# Execute pipeline in Vertex and monitor until completion
pl.run(
# Email address of service account to use for the pipeline run
# You must have iam.serviceAccounts.actAs permission on the service account to use it
Expand All @@ -395,6 +393,37 @@ To create a Vertex Pipeline run:
sync=True
)
To create a Vertex Pipeline run without monitoring it until completion, use `submit` instead of `run`:

.. code-block:: Python
# Instantiate PipelineJob object
pl = PipelineJob(
display_name="My first pipeline",
# Whether or not to enable caching
# True = always cache pipeline step result
# False = never cache pipeline step result
# None = defer to cache option for each pipeline component in the pipeline definition
enable_caching=False,
# Local or GCS path to a compiled pipeline definition
template_path="pipeline.json",
# Dictionary containing input parameters for your pipeline
parameter_values=parameter_values,
# GCS path to act as the pipeline root
pipeline_root=pipeline_root,
)
# Submit the Pipeline to Vertex
pl.submit(
# Email address of service account to use for the pipeline run
# You must have iam.serviceAccounts.actAs permission on the service account to use it
service_account=service_account,
)
Explainable AI: Get Metadata
----------------------------
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/aiplatform/base.py
Expand Up @@ -671,7 +671,7 @@ def wrapper(*args, **kwargs):
# if sync then wait for any Futures to complete and execute
if sync:
if self:
self.wait()
VertexAiResourceNounWithFutureManager.wait(self)
return method(*args, **kwargs)

# callbacks to call within the Future (in same Thread)
Expand Down
29 changes: 27 additions & 2 deletions google/cloud/aiplatform/pipeline_jobs.py
Expand Up @@ -232,7 +232,7 @@ def run(
network: Optional[str] = None,
sync: Optional[bool] = True,
) -> None:
"""Run this configured PipelineJob.
"""Run this configured PipelineJob and monitor the job until completion.
Args:
service_account (str):
Expand All @@ -247,6 +247,26 @@ def run(
sync (bool):
Optional. Whether to execute this method synchronously. If False, this method will unblock and it will be executed in a concurrent Future.
"""
self.submit(service_account=service_account, network=network)

self._block_until_complete()

def submit(
self, service_account: Optional[str] = None, network: Optional[str] = None,
) -> None:
"""Run this configured PipelineJob.
Args:
service_account (str):
Optional. Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
network (str):
Optional. The full name of the Compute Engine network to which the job
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
If left unspecified, the job is not peered with any network.
"""
if service_account:
self._gca_resource.service_account = service_account

Expand All @@ -267,7 +287,12 @@ def run(

_LOGGER.info("View Pipeline Job:\n%s" % self._dashboard_uri())

self._block_until_complete()
def wait(self) -> None:
    """Wait for this PipelineJob to complete.

    Handles both execution paths: if the job was created without a
    concurrent Future (e.g. via ``submit``), there is no Future to wait
    on, so poll the service until the job reaches a terminal state;
    otherwise defer to the base-class Future-based wait.
    """
    # NOTE(review): _latest_future is None when no background Future was
    # scheduled for this resource (the submit() path); in that case block
    # by polling the resource directly.
    if self._latest_future is None:
        self._block_until_complete()
    else:
        # A Future exists (asynchronous run() path) — wait on it via the
        # future-manager base class.
        super().wait()

@property
def pipeline_spec(self):
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/aiplatform/test_pipeline_jobs.py
Expand Up @@ -275,6 +275,65 @@ def test_run_call_pipeline_service_pipeline_job_create(
gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
)

@pytest.mark.usefixtures("mock_load_pipeline_job_json")
def test_submit_call_pipeline_service_pipeline_job_create(
    self, mock_pipeline_service_create, mock_pipeline_service_get
):
    """Verify PipelineJob.submit() creates the job without polling it.

    submit() must issue a single CreatePipelineJob request and return
    immediately (no GetPipelineJob calls); a subsequent wait() is then
    responsible for polling the job to completion.
    """
    aiplatform.init(
        project=_TEST_PROJECT,
        staging_bucket=_TEST_GCS_BUCKET_NAME,
        location=_TEST_LOCATION,
        credentials=_TEST_CREDENTIALS,
    )

    job = pipeline_jobs.PipelineJob(
        display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
        template_path=_TEST_TEMPLATE_PATH,
        job_id=_TEST_PIPELINE_JOB_ID,
        parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
        enable_caching=True,
    )

    # submit() should only create the job; monitoring is deferred to wait().
    job.submit(service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK)

    # Build the runtime config proto expected in the create request.
    expected_runtime_config_dict = {
        "gcs_output_directory": _TEST_GCS_BUCKET_NAME,
        "parameters": {"name_param": {"stringValue": "hello"}},
    }
    runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb
    json_format.ParseDict(expected_runtime_config_dict, runtime_config)

    # Construct expected request
    expected_gapic_pipeline_job = gca_pipeline_job_v1beta1.PipelineJob(
        display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
        pipeline_spec={
            "components": {},
            "pipelineInfo": _TEST_PIPELINE_JOB["pipelineSpec"]["pipelineInfo"],
            "root": _TEST_PIPELINE_JOB["pipelineSpec"]["root"],
        },
        runtime_config=runtime_config,
        service_account=_TEST_SERVICE_ACCOUNT,
        network=_TEST_NETWORK,
    )

    mock_pipeline_service_create.assert_called_once_with(
        parent=_TEST_PARENT,
        pipeline_job=expected_gapic_pipeline_job,
        pipeline_job_id=_TEST_PIPELINE_JOB_ID,
    )

    # Key property of submit(): no polling (GetPipelineJob) has happened yet.
    assert not mock_pipeline_service_get.called

    # wait() should now poll the service until the job completes.
    job.wait()

    mock_pipeline_service_get.assert_called_with(
        name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY
    )

    # The local resource should reflect the final (succeeded) server state.
    assert job._gca_resource == make_pipeline_job(
        gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
    )

@pytest.mark.usefixtures("mock_load_pipeline_spec_json")
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_service_pipeline_spec_create(
Expand Down

0 comments on commit 7ab05d5

Please sign in to comment.