From 57fe05da79d53799bed8fea14feeed15a8676a08 Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Sat, 12 Jun 2021 16:57:08 -0500 Subject: [PATCH 01/10] Add most missing fields --- google/cloud/aiplatform/base.py | 60 +++++++++++++++- google/cloud/aiplatform/datasets/dataset.py | 6 ++ google/cloud/aiplatform/models.py | 65 +++++++++++++++-- google/cloud/aiplatform/training_jobs.py | 77 +++++++++++++++++++-- 4 files changed, 199 insertions(+), 9 deletions(-) diff --git a/google/cloud/aiplatform/base.py b/google/cloud/aiplatform/base.py index 07e4c2fe4a..1aee67013f 100644 --- a/google/cloud/aiplatform/base.py +++ b/google/cloud/aiplatform/base.py @@ -42,7 +42,8 @@ from google.auth import credentials as auth_credentials from google.cloud.aiplatform import initializer from google.cloud.aiplatform import utils - +from google.cloud.aiplatform.compat.types import encryption_spec as gca_encryption_spec +from google.rpc import status_pb2 logging.basicConfig(level=logging.INFO, stream=sys.stdout) @@ -563,6 +564,63 @@ def update_time(self) -> datetime.datetime: self._sync_gca_resource() return self._gca_resource.update_time + @property + def start_time(self) -> Optional[datetime.datetime]: + """Time when the Pipline or Job entered the `JOB_STATE_RUNNING` or + `PIPELINE_STATE_RUNNING` for the first time. Only for Job or Pipeline resources.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "start_time") + + @property + def end_time(self) -> Optional[datetime.datetime]: + """Time when the Job resource entered the `JOB_STATE_SUCCEEDED`, + `JOB_STATE_FAILED`, `JOB_STATE_CANCELLED` state, or when the Pipeline resource + entered the `PIPELINE_STATE_SUCCEEDED`, `PIPELINE_STATE_FAILED`, + `PIPELINE_STATE_CANCELLED` state. Only for Job or Pipeline resources.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "end_time") + + @property + def error(self) -> Optional[status_pb2.Status]: + """Detailed error info for Job or Pipeline resources. + Only populated when the Pipeline's state is `PIPELINE_STATE_FAILED` or + `PIPELINE_STATE_CANCELLED` or when the Job's state is `JOB_STATE_FAILED` + or `JOB_STATE_CANCELLED`.""" + + self._sync_gca_resource() + return getattr(self._gca_resource, "error") + + @property + def network(self) -> Optional[str]: + """The full name of the Google Compute Engine + [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this + Vertex AI resource should be peered. + + Takes the format `projects/{project}/global/networks/{network}`. Where + {project} is a project number, as in `12345`, and {network} is a network name. + + Private services access must already be configured for the network. If left + unspecified, the resource is not peered with any network. + """ + return getattr(self._gca_resource, "network") + + @property + def encryption_spec(self) -> Optional[gca_encryption_spec.EncryptionSpec]: + """Customer-managed encryption key options for this Vertex AI resource. + + If this is set, then all resources created by this Vertex AI resource will + be encrypted with the provided encryption key. + """ + return getattr(self._gca_resource, "encryption_spec") + + @property + def labels(self) -> Dict[str, str]: + """User-defined labels containing metadata about this resource. 
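+
+        For example (illustrative), a resource created with
+        ``labels={"env": "prod", "team": "vision"}`` surfaces that same
+        dictionary here.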
+ + Read more about labels at https://goo.gl/xmQnxf + """ + return self._gca_resource.labels + @property def gca_resource(self) -> proto.Message: """The underlying resource proto represenation.""" diff --git a/google/cloud/aiplatform/datasets/dataset.py b/google/cloud/aiplatform/datasets/dataset.py index 1eb1663b2b..09d3980b9d 100644 --- a/google/cloud/aiplatform/datasets/dataset.py +++ b/google/cloud/aiplatform/datasets/dataset.py @@ -81,6 +81,12 @@ def __init__( self._gca_resource = self._get_gca_resource(resource_name=dataset_name) self._validate_metadata_schema_uri() + # TODO(b/190008971): Use `data_item_count` once available + def __len__(self): + return list( + self.api_client.list_data_items(parent=self.resource_name) + ).__len__() + @property def metadata_schema_uri(self) -> str: """The metadata schema uri of this dataset resource.""" diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index b93f569eaa..b0669e2cf4 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -20,6 +20,7 @@ from google.api_core import operation from google.auth import credentials as auth_credentials +from google.cloud import aiplatform from google.cloud.aiplatform import base from google.cloud.aiplatform import compat from google.cloud.aiplatform import explain @@ -119,6 +120,11 @@ def __init__( credentials=credentials, ) + @property + def traffic_split(self): + self._sync_gca_resource() + return dict(self._gca_resource.traffic_split) + @classmethod def create( cls, @@ -1211,12 +1217,13 @@ class Model(base.VertexAiResourceNounWithFutureManager): _delete_method = "delete_model" @property - def uri(self): - """Uri of the model.""" - return self._gca_resource.artifact_uri + def uri(self) -> Optional[str]: + """Path to the directory containing the Model artifact and any of its + supporting files. Not present for AutoML Models.""" + return self._gca_resource.artifact_uri or None @property - def description(self): + def description(self) -> str: """Description of the model.""" return self._gca_resource.description @@ -1240,6 +1247,56 @@ def supported_export_formats( for export_format in self._gca_resource.supported_export_formats } + @property + def supported_deployment_resources_types( + self, + ) -> List[aiplatform.gapic.Model.DeploymentResourcesType]: + return list(self._gca_resource.supported_deployment_resources_types) + + @property + def supported_input_storage_formats(self) -> List[str]: + """The formats this Model supports in the `input_config` field of a + `BatchPredictionJob`. If `Model.predict_schemata.instance_schema_uri` + exists, the instances should be given as per that schema. + + [Read the docs for more on batch prediction formats](https://cloud.google.com/vertex-ai/docs/predictions/batch-predictions#batch_request_input) + + If this Model doesn't support any of these formats it means it cannot be + used with a `BatchPredictionJob`. 
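+
+        Example usage, as a non-authoritative sketch (the model resource name
+        and bucket paths are illustrative):
+        ```
+        model = aiplatform.Model('projects/.../locations/.../models/123')
+
+        if "jsonl" in model.supported_input_storage_formats:
+            model.batch_predict(
+                job_display_name="my-batch-job",
+                gcs_source="gs://my-bucket/instances.jsonl",
+                gcs_destination_prefix="gs://my-bucket/output",
+            )
+        ```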
+ """ + return list(self._gca_resource.supported_input_storage_formats) + + @property + def supported_output_storage_formats(self) -> List[str]: + return list(self._gca_resource.supported_output_storage_formats) + + @property + def predict_schemata(self) -> Optional[aiplatform.gapic.PredictSchemata]: + """The schemata that describe formats of the Model's predictions and + explanations, if available.""" + return getattr(self._gca_resource, "predict_schemata") + + @property + def training_job(self) -> Optional["aiplatform.training_jobs._TrainingJob"]: + """The TrainingJob that uploaded this Model, if any.""" + job_name = self._gca_resource.training_pipeline + + if not job_name: + return None + + return aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + resource_name=job_name, + project=self.project, + location=self.location, + credentials=self.credentials, + ) + + @property + def container_spec(self) -> Optional[aiplatform.gapic.ModelContainerSpec]: + """The specification of the container that is to be used when deploying + this Model. Not present for AutoML Models.""" + return getattr(self._gca_resource, "container_spec") + def __init__( self, model_name: str, diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 91e061f4ba..ef516e3027 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -21,6 +21,7 @@ import abc from google.auth import credentials as auth_credentials +from google.api_core import exceptions as api_exceptions from google.cloud.aiplatform import base from google.cloud.aiplatform import constants from google.cloud.aiplatform import datasets @@ -135,7 +136,6 @@ def __init__( @abc.abstractmethod def _supported_training_schemas(cls) -> Tuple[str]: """List of supported schemas for this training job.""" - pass @classmethod @@ -152,10 +152,10 @@ def get( resource_name (str): Required. A fully-qualified resource name or ID. project (str): - Optional project to retrieve dataset from. If not set, project + Optional project to retrieve training job from. If not set, project set in aiplatform.init will be used. location (str): - Optional location to retrieve dataset from. If not set, location + Optional location to retrieve training job from. If not set, location set in aiplatform.init will be used. credentials (auth_credentials.Credentials): Custom credentials to use to upload this model. Overrides @@ -193,11 +193,80 @@ def get( return self + @classmethod + def _get_and_return_subclass( + cls, + resource_name: str, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ) -> "_TrainingJob": + """Retrieve Training Job subclass for the given resource_name without + knowing the training_task_definition. + + Example usage: + ``` + aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + 'projects/.../locations/.../trainingPipelines/12345' + ) + # Returns: + ``` + + Args: + resource_name (str): + Required. A fully-qualified resource name or ID. + project (str): + Optional project to retrieve dataset from. If not set, project + set in aiplatform.init will be used. + location (str): + Optional location to retrieve dataset from. If not set, location + set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Custom credentials to use to upload this model. Overrides + credentials set in aiplatform.init. 
+ + Raises: + api_core.exceptions.NotFound: If the provided training job's resource + name cannot be found on the Vertex service. + + Returns: + An Vertex AI Training Job + """ + + # Retrieve training pipeline resource before class construction + client = cls._instantiate_client(location=location, credentials=credentials) + + try: + gca_training_pipeline = getattr(client, cls._getter_method)( + name=resource_name + ) + except api_exceptions.NotFound: + raise api_exceptions.NotFound( + "The training job used to create this model could not be found:" + f" {resource_name}" + ) + + schema_uri = gca_training_pipeline.training_task_definition + + # Collect all AutoML training job classes and CustomTrainingJob + class_list = [ + c for c in cls.__subclasses__() if c.__name__.startswith("AutoML") + ] + [CustomTrainingJob] + + # Identify correct training job subclass, construct and return object + for c in class_list: + if schema_uri in c._supported_training_schemas: + return c._empty_constructor( + project=project, + location=location, + credentials=credentials, + resource_name=resource_name, + ) + @property @abc.abstractmethod def _model_upload_fail_string(self) -> str: """Helper property for model upload failure.""" - pass @abc.abstractmethod From fadd8486340d2768e514dc561a10f1833b329ace Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Sat, 12 Jun 2021 16:57:56 -0500 Subject: [PATCH 02/10] Add tests for get trainingjob subclass --- tests/unit/aiplatform/test_training_jobs.py | 43 +++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index 75478263e8..e6eda5b351 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -29,6 +29,7 @@ from unittest.mock import patch from google.auth import credentials as auth_credentials +from google.api_core import exceptions as api_core_exceptions from google.cloud.aiplatform import utils from google.cloud.aiplatform.utils import source_utils @@ -206,6 +207,18 @@ def get_training_job_tabular_mock(): yield get_training_job_tabular_mock +@pytest.fixture +def get_training_job_non_existent_mock(): + with patch.object( + pipeline_service_client.PipelineServiceClient, "get_training_pipeline" + ) as get_training_job_non_existent_mock: + get_training_job_non_existent_mock.side_effect = api_core_exceptions.NotFound( + "404" + ) + + yield get_training_job_non_existent_mock + + @pytest.fixture def mock_client_bucket(): with patch.object(storage.Client, "bucket") as mock_client_bucket: @@ -1497,6 +1510,36 @@ def test_get_training_job_with_project_and_alt_location(self): location=_TEST_ALT_LOCATION, ) + @pytest.mark.usefixtures("get_training_job_tabular_mock") + def test_get_and_return_subclass_automl(self): + subcls = aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + resource_name=_TEST_PIPELINE_RESOURCE_NAME + ) + + assert isinstance(subcls, aiplatform.training_jobs.AutoMLTabularTrainingJob) + + @pytest.mark.usefixtures("get_training_job_custom_mock") + def test_get_and_return_subclass_custom(self): + subcls = aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + resource_name=_TEST_PIPELINE_RESOURCE_NAME + ) + + assert isinstance(subcls, aiplatform.training_jobs.CustomTrainingJob) + + @pytest.mark.usefixtures("get_training_job_non_existent_mock") + def test_get_and_return_subclass_not_found(self): + with pytest.raises(api_core_exceptions.NotFound) as e: + 
aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + resource_name=_TEST_PIPELINE_RESOURCE_NAME + ) + + assert e.match( + regexp=( + r"The training job used to create this model could not be found: " + fr"{_TEST_PIPELINE_RESOURCE_NAME}" + ) + ) + @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_nontabular_dataset( self, From 8ace6c1245e331709dc25481a692363a7c680fe7 Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Mon, 14 Jun 2021 18:57:39 -0500 Subject: [PATCH 03/10] Drop Dataset len, add more attrs, update docstrings --- google/cloud/aiplatform/base.py | 1 - google/cloud/aiplatform/datasets/dataset.py | 6 -- google/cloud/aiplatform/jobs.py | 24 +++++++ google/cloud/aiplatform/models.py | 69 +++++++++++++++++---- google/cloud/aiplatform/training_jobs.py | 14 +---- tests/unit/aiplatform/test_models.py | 47 ++++++++++++++ tests/unit/aiplatform/test_training_jobs.py | 27 -------- 7 files changed, 128 insertions(+), 60 deletions(-) diff --git a/google/cloud/aiplatform/base.py b/google/cloud/aiplatform/base.py index 1aee67013f..ac7cb00066 100644 --- a/google/cloud/aiplatform/base.py +++ b/google/cloud/aiplatform/base.py @@ -586,7 +586,6 @@ def error(self) -> Optional[status_pb2.Status]: Only populated when the Pipeline's state is `PIPELINE_STATE_FAILED` or `PIPELINE_STATE_CANCELLED` or when the Job's state is `JOB_STATE_FAILED` or `JOB_STATE_CANCELLED`.""" - self._sync_gca_resource() return getattr(self._gca_resource, "error") diff --git a/google/cloud/aiplatform/datasets/dataset.py b/google/cloud/aiplatform/datasets/dataset.py index fe09423c23..df402d0c99 100644 --- a/google/cloud/aiplatform/datasets/dataset.py +++ b/google/cloud/aiplatform/datasets/dataset.py @@ -81,12 +81,6 @@ def __init__( self._gca_resource = self._get_gca_resource(resource_name=dataset_name) self._validate_metadata_schema_uri() - # TODO(b/190008971): Use `data_item_count` once available - def __len__(self): - return list( - self.api_client.list_data_items(parent=self.resource_name) - ).__len__() - @property def metadata_schema_uri(self) -> str: """The metadata schema uri of this dataset resource.""" diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index 89ea52097d..283f104a91 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -23,6 +23,8 @@ import time import logging +from google.rpc import status_pb2 + from google.cloud import storage from google.cloud import bigquery @@ -45,6 +47,7 @@ batch_prediction_job as gca_bp_job_compat, batch_prediction_job_v1 as gca_bp_job_v1, batch_prediction_job_v1beta1 as gca_bp_job_v1beta1, + completion_stats as gca_completion_stats, custom_job as gca_custom_job_compat, custom_job_v1beta1 as gca_custom_job_v1beta1, explanation_v1beta1 as gca_explanation_v1beta1, @@ -302,6 +305,27 @@ def __init__( credentials=credentials, ) + @property + def output_info(self): + """Information describing the output of this job, including output location + into which prediction output is written. + + This is only available for batch predicition jobs that have run successfully. + """ + return self._gca_resource.output_info + + @property + def partial_failures(self) -> Optional[Sequence[status_pb2.Status]]: + """Partial failures encountered. For example, single files that can't be read. + This field never exceeds 20 entries. 
Status details fields contain standard + GCP error details.""" + return getattr(self._gca_resource, "partial_failures") + + @property + def completion_stats(self) -> Optional[gca_completion_stats.CompletionStats]: + """Statistics on completed and failed prediction instances.""" + return getattr(self._gca_resource, "completion_stats") + @classmethod def create( cls, diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index b0669e2cf4..83a7805d5e 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -18,6 +18,7 @@ from typing import Dict, List, NamedTuple, Optional, Sequence, Tuple, Union from google.api_core import operation +from google.api_core import exceptions as api_exceptions from google.auth import credentials as auth_credentials from google.cloud import aiplatform @@ -121,7 +122,15 @@ def __init__( ) @property - def traffic_split(self): + def traffic_split(self) -> dict: + """A map from a DeployedModel's ID to the percentage of this Endpoint's + traffic that should be forwarded to that DeployedModel. + + If a DeployedModel's ID is not listed in this map, then it receives no traffic. + + The traffic percentage values must add up to 100, or map must be empty if + the Endpoint is to not accept any traffic at a moment. + """ self._sync_gca_resource() return dict(self._gca_resource.traffic_split) @@ -1255,19 +1264,37 @@ def supported_deployment_resources_types( @property def supported_input_storage_formats(self) -> List[str]: - """The formats this Model supports in the `input_config` field of a + """The formats this Model supports in the `input_config` field of a `BatchPredictionJob`. If `Model.predict_schemata.instance_schema_uri` exists, the instances should be given as per that schema. - + [Read the docs for more on batch prediction formats](https://cloud.google.com/vertex-ai/docs/predictions/batch-predictions#batch_request_input) - + If this Model doesn't support any of these formats it means it cannot be - used with a `BatchPredictionJob`. + used with a `BatchPredictionJob`. However, if it has + `supported_deployment_resources_types`, it could serve online predictions + by using `Endpoint.predict()` or `Endpoint.explain()`. """ return list(self._gca_resource.supported_input_storage_formats) @property def supported_output_storage_formats(self) -> List[str]: + """The formats this Model supports in the `output_config` field of a + `BatchPredictionJob`. + + If both `Model.predict_schemata.instance_schema_uri` and + `Model.predict_schemata.prediction_schema_uri` exist, the predictions + are returned together with their instances. In other words, the + prediction has the original instance data first, followed by the actual + prediction content (as per the schema). + + [Read the docs for more on batch prediction formats](https://cloud.google.com/vertex-ai/docs/predictions/batch-predictions) + + If this Model doesn't support any of these formats it means it cannot be + used with a `BatchPredictionJob`. However, if it has + `supported_deployment_resources_types`, it could serve online predictions + by using `Endpoint.predict()` or `Endpoint.explain()`. 
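+
+        A similar sketch for the output side (the BigQuery paths and the
+        ``model`` variable are illustrative):
+        ```
+        if "bigquery" in model.supported_output_storage_formats:
+            model.batch_predict(
+                job_display_name="my-batch-job",
+                bigquery_source="bq://my-project.my_dataset.input_table",
+                bigquery_destination_prefix="bq://my-project",
+            )
+        ```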
+ """ return list(self._gca_resource.supported_output_storage_formats) @property @@ -1278,18 +1305,28 @@ def predict_schemata(self) -> Optional[aiplatform.gapic.PredictSchemata]: @property def training_job(self) -> Optional["aiplatform.training_jobs._TrainingJob"]: - """The TrainingJob that uploaded this Model, if any.""" - job_name = self._gca_resource.training_pipeline + """The TrainingJob that uploaded this Model, if any. + + Raises: + api_core.exceptions.NotFound: If the Model's training job resource + cannot be found on the Vertex service. + """ + job_name = getattr(self._gca_resource, "training_pipeline") if not job_name: return None - return aiplatform.training_jobs._TrainingJob._get_and_return_subclass( - resource_name=job_name, - project=self.project, - location=self.location, - credentials=self.credentials, - ) + try: + return aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + resource_name=job_name, + project=self.project, + location=self.location, + credentials=self.credentials, + ) + except api_exceptions.NotFound: + raise api_exceptions.NotFound( + f"The training job used to create this model could not be found: {job_name}" + ) @property def container_spec(self) -> Optional[aiplatform.gapic.ModelContainerSpec]: @@ -1297,6 +1334,12 @@ def container_spec(self) -> Optional[aiplatform.gapic.ModelContainerSpec]: this Model. Not present for AutoML Models.""" return getattr(self._gca_resource, "container_spec") + @property + def artifact_uri(self) -> Optional[str]: + """The path to the directory containing the Model artifact and any of its + supporting files. Not present for AutoML Models.""" + return getattr(self._gca_resource, "artifact_uri") + def __init__( self, model_name: str, diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 778b45d70e..00c9d0ca2d 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -226,10 +226,6 @@ def _get_and_return_subclass( Custom credentials to use to upload this model. Overrides credentials set in aiplatform.init. - Raises: - api_core.exceptions.NotFound: If the provided training job's resource - name cannot be found on the Vertex service. 
- Returns: An Vertex AI Training Job """ @@ -237,15 +233,7 @@ def _get_and_return_subclass( # Retrieve training pipeline resource before class construction client = cls._instantiate_client(location=location, credentials=credentials) - try: - gca_training_pipeline = getattr(client, cls._getter_method)( - name=resource_name - ) - except api_exceptions.NotFound: - raise api_exceptions.NotFound( - "The training job used to create this model could not be found:" - f" {resource_name}" - ) + gca_training_pipeline = getattr(client, cls._getter_method)(name=resource_name) schema_uri = gca_training_pipeline.training_task_definition diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py index ad84fde65b..be4f7f61bd 100644 --- a/tests/unit/aiplatform/test_models.py +++ b/tests/unit/aiplatform/test_models.py @@ -19,8 +19,10 @@ from concurrent import futures import pytest from unittest import mock +from unittest.mock import patch from google.api_core import operation as ga_operation +from google.api_core import exceptions as api_exceptions from google.auth import credentials as auth_credentials from google.cloud import aiplatform @@ -58,6 +60,7 @@ from google.cloud.aiplatform_v1.services.model_service import ( client as model_service_client, ) +from google.cloud.aiplatform.compat.services import pipeline_service_client from google.cloud.aiplatform_v1.types import ( batch_prediction_job as gca_batch_prediction_job, io as gca_io, @@ -100,6 +103,10 @@ _TEST_STARTING_REPLICA_COUNT = 2 _TEST_MAX_REPLICA_COUNT = 12 +_TEST_PIPELINE_RESOURCE_NAME = ( + "projects/my-project/locations/us-central1/trainingPipeline/12345" +) + _TEST_BATCH_PREDICTION_GCS_SOURCE = "gs://example-bucket/folder/instance.jsonl" _TEST_BATCH_PREDICTION_GCS_SOURCE_LIST = [ "gs://example-bucket/folder/instance1.jsonl", @@ -252,6 +259,19 @@ def get_model_with_custom_project_mock(): yield get_model_mock +@pytest.fixture +def get_model_with_training_job(): + with mock.patch.object( + model_service_client.ModelServiceClient, "get_model" + ) as get_model_mock: + get_model_mock.return_value = gca_model.Model( + display_name=_TEST_MODEL_NAME, + name=_TEST_MODEL_RESOURCE_NAME_CUSTOM_PROJECT, + training_pipeline=_TEST_PIPELINE_RESOURCE_NAME, + ) + yield get_model_mock + + @pytest.fixture def get_model_with_supported_export_formats_image(): with mock.patch.object( @@ -457,6 +477,16 @@ def create_batch_prediction_job_with_explanations_mock(): yield create_batch_prediction_job_mock +@pytest.fixture +def get_training_job_non_existent_mock(): + with patch.object( + pipeline_service_client.PipelineServiceClient, "get_training_pipeline" + ) as get_training_job_non_existent_mock: + get_training_job_non_existent_mock.side_effect = api_exceptions.NotFound("404") + + yield get_training_job_non_existent_mock + + @pytest.fixture def create_client_mock(): with mock.patch.object( @@ -1384,3 +1414,20 @@ def test_export_model_as_artifact_with_invalid_args(self, export_model_mock, syn assert e.match( regexp=r"This model can not be exported as a container image." 
) + + @pytest.mark.usefixtures( + "get_training_job_non_existent_mock", "get_model_with_training_job" + ) + def test_get_and_return_subclass_not_found(self): + test_model = models.Model(_TEST_ID) + + # Attempt to access Model's training job that no longer exists + with pytest.raises(api_exceptions.NotFound) as e: + test_model.training_job + + assert e.match( + regexp=( + r"The training job used to create this model could not be found: " + fr"{_TEST_PIPELINE_RESOURCE_NAME}" + ) + ) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index 0d261d10b1..9616c1ace1 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -29,7 +29,6 @@ from unittest.mock import patch from google.auth import credentials as auth_credentials -from google.api_core import exceptions as api_core_exceptions from google.cloud.aiplatform import utils from google.cloud.aiplatform.utils import source_utils @@ -213,18 +212,6 @@ def get_training_job_tabular_mock(): yield get_training_job_tabular_mock -@pytest.fixture -def get_training_job_non_existent_mock(): - with patch.object( - pipeline_service_client.PipelineServiceClient, "get_training_pipeline" - ) as get_training_job_non_existent_mock: - get_training_job_non_existent_mock.side_effect = api_core_exceptions.NotFound( - "404" - ) - - yield get_training_job_non_existent_mock - - @pytest.fixture def mock_client_bucket(): with patch.object(storage.Client, "bucket") as mock_client_bucket: @@ -1579,20 +1566,6 @@ def test_get_and_return_subclass_custom(self): assert isinstance(subcls, aiplatform.training_jobs.CustomTrainingJob) - @pytest.mark.usefixtures("get_training_job_non_existent_mock") - def test_get_and_return_subclass_not_found(self): - with pytest.raises(api_core_exceptions.NotFound) as e: - aiplatform.training_jobs._TrainingJob._get_and_return_subclass( - resource_name=_TEST_PIPELINE_RESOURCE_NAME - ) - - assert e.match( - regexp=( - r"The training job used to create this model could not be found: " - fr"{_TEST_PIPELINE_RESOURCE_NAME}" - ) - ) - @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_nontabular_dataset( self, From e63778d3f1f608aefccdb9946ff4272eb383a6a1 Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Mon, 14 Jun 2021 21:24:54 -0500 Subject: [PATCH 04/10] flake8 lint --- google/cloud/aiplatform/jobs.py | 2 +- google/cloud/aiplatform/models.py | 8 ++++---- google/cloud/aiplatform/training_jobs.py | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index 283f104a91..359dc44eea 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -309,7 +309,7 @@ def __init__( def output_info(self): """Information describing the output of this job, including output location into which prediction output is written. - + This is only available for batch predicition jobs that have run successfully. """ return self._gca_resource.output_info diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 83a7805d5e..5704dc9048 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -125,9 +125,9 @@ def __init__( def traffic_split(self) -> dict: """A map from a DeployedModel's ID to the percentage of this Endpoint's traffic that should be forwarded to that DeployedModel. - + If a DeployedModel's ID is not listed in this map, then it receives no traffic. 
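+
+        For example (the DeployedModel IDs are illustrative),
+        ``{"4796122664817721344": 80, "6084095686456164352": 20}`` splits
+        traffic 80/20 between two DeployedModels.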
- + The traffic percentage values must add up to 100, or map must be empty if the Endpoint is to not accept any traffic at a moment. """ @@ -1281,8 +1281,8 @@ def supported_input_storage_formats(self) -> List[str]: def supported_output_storage_formats(self) -> List[str]: """The formats this Model supports in the `output_config` field of a `BatchPredictionJob`. - - If both `Model.predict_schemata.instance_schema_uri` and + + If both `Model.predict_schemata.instance_schema_uri` and `Model.predict_schemata.prediction_schema_uri` exist, the predictions are returned together with their instances. In other words, the prediction has the original instance data first, followed by the actual diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 00c9d0ca2d..ec8cd7b022 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -21,7 +21,6 @@ import abc from google.auth import credentials as auth_credentials -from google.api_core import exceptions as api_exceptions from google.cloud.aiplatform import base from google.cloud.aiplatform import constants from google.cloud.aiplatform import datasets From fb798e5164a8688943cdbd271b3328a638534520 Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Tue, 15 Jun 2021 14:31:47 -0500 Subject: [PATCH 05/10] Address reviewer comments --- google/cloud/aiplatform/jobs.py | 2 +- google/cloud/aiplatform/models.py | 8 +------- google/cloud/aiplatform/training_jobs.py | 2 +- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index 359dc44eea..c1b19dffa3 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -306,7 +306,7 @@ def __init__( ) @property - def output_info(self): + def output_info(self,) -> Optional[aiplatform.gapic.BatchPredictionJob.OutputInfo]: """Information describing the output of this job, including output location into which prediction output is written. diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 5704dc9048..8fb515443d 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -122,7 +122,7 @@ def __init__( ) @property - def traffic_split(self) -> dict: + def traffic_split(self) -> Dict[str, int]: """A map from a DeployedModel's ID to the percentage of this Endpoint's traffic that should be forwarded to that DeployedModel. @@ -1334,12 +1334,6 @@ def container_spec(self) -> Optional[aiplatform.gapic.ModelContainerSpec]: this Model. Not present for AutoML Models.""" return getattr(self._gca_resource, "container_spec") - @property - def artifact_uri(self) -> Optional[str]: - """The path to the directory containing the Model artifact and any of its - supporting files. Not present for AutoML Models.""" - return getattr(self._gca_resource, "artifact_uri") - def __init__( self, model_name: str, diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index ec8cd7b022..a27bc1e424 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -222,7 +222,7 @@ def _get_and_return_subclass( Optional location to retrieve dataset from. If not set, location set in aiplatform.init will be used. credentials (auth_credentials.Credentials): - Custom credentials to use to upload this model. Overrides + Optional. Custom credentials to use to upload this model. Overrides credentials set in aiplatform.init. 
Returns: From 08bea1af969bf661ac602e9c5e5e73cb536c7c93 Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Tue, 15 Jun 2021 14:33:03 -0500 Subject: [PATCH 06/10] Switch 'an' to 'a' when referencing Vertex AI --- google/cloud/aiplatform/base.py | 2 +- google/cloud/aiplatform/initializer.py | 2 +- google/cloud/aiplatform/jobs.py | 8 ++-- google/cloud/aiplatform/training_jobs.py | 50 ++++++++++++------------ 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/google/cloud/aiplatform/base.py b/google/cloud/aiplatform/base.py index ac7cb00066..620851918b 100644 --- a/google/cloud/aiplatform/base.py +++ b/google/cloud/aiplatform/base.py @@ -870,7 +870,7 @@ def _construct_sdk_resource_from_gapic( Args: gapic_resource (proto.Message): - A GAPIC representation of an Vertex AI resource, usually + A GAPIC representation of a Vertex AI resource, usually retrieved by a get_* or in a list_* API call. project (str): Optional. Project to construct SDK object from. If not set, diff --git a/google/cloud/aiplatform/initializer.py b/google/cloud/aiplatform/initializer.py index 4f57115fe7..ea1a51c8a7 100644 --- a/google/cloud/aiplatform/initializer.py +++ b/google/cloud/aiplatform/initializer.py @@ -267,7 +267,7 @@ def create_client( Args: client_class (utils.VertexAiServiceClientWithOverride): - (Required) An Vertex AI Service Client with optional overrides. + (Required) A Vertex AI Service Client with optional overrides. credentials (auth_credentials.Credentials): Custom auth credentials. If not provided will use the current config. location_override (str): Optional location override. diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index c1b19dffa3..4510f8bc3a 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -866,7 +866,7 @@ def get( location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, ) -> "_RunnableJob": - """Get an Vertex AI Job for the given resource_name. + """Get a Vertex AI Job for the given resource_name. Args: resource_name (str): @@ -882,7 +882,7 @@ def get( credentials set in aiplatform.init. Returns: - An Vertex AI Job. + A Vertex AI Job. """ self = cls._empty_constructor( project=project, @@ -1181,7 +1181,7 @@ def run( distributed training jobs that are not resilient to workers leaving and joining a job. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -1497,7 +1497,7 @@ def run( distributed training jobs that are not resilient to workers leaving and joining a job. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index a27bc1e424..767533935c 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -166,7 +166,7 @@ def get( doesn't match the custom training task definition. Returns: - An Vertex AI Training Job + A Vertex AI Training Job """ # Create job with dummy parameters @@ -226,7 +226,7 @@ def _get_and_return_subclass( credentials set in aiplatform.init. 
Returns: - An Vertex AI Training Job + A Vertex AI Training Job """ # Retrieve training pipeline resource before class construction @@ -651,7 +651,7 @@ def _get_model(self) -> Optional[models.Model]: """Helper method to get and instantiate the Model to Upload. Returns: - model: Vertex AI Model if training succeeded and produced an Vertex AI + model: Vertex AI Model if training succeeded and produced a Vertex AI Model. None otherwise. Raises: @@ -1163,7 +1163,7 @@ def _prepare_training_task_inputs_and_output_dir( Private services access must already be configured for the network. If left unspecified, the job is not peered with any network. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -1631,7 +1631,7 @@ def run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -1652,7 +1652,7 @@ def run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ worker_pool_specs, managed_model = self._prepare_and_validate_run( model_display_name=model_display_name, @@ -1801,7 +1801,7 @@ def _run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -1822,7 +1822,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ package_gcs_uri = python_packager.package_and_copy_to_gcs( gcs_staging_dir=self._staging_bucket, @@ -2246,7 +2246,7 @@ def run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -2267,7 +2267,7 @@ def run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError: If Training job has already been run, staging_bucket has not @@ -2410,7 +2410,7 @@ def _run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -2431,7 +2431,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ for spec in worker_pool_specs: @@ -2697,7 +2697,7 @@ def run( be immediately returned and synced when the Future has completed. Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError: If Training job has already been run or is waiting to run. 
@@ -2815,7 +2815,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ training_task_definition = schema.training_job.definition.automl_tabular @@ -3099,7 +3099,7 @@ def run( be immediately returned and synced when the Future has completed. Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError if Training job has already been run or is waiting to run. @@ -3284,7 +3284,7 @@ def _run( be immediately returned and synced when the Future has completed. Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ training_task_definition = schema.training_job.definition.automl_forecasting @@ -3578,7 +3578,7 @@ def run( be immediately returned and synced when the Future has completed. Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError: If Training job has already been run or is waiting to run. @@ -3679,7 +3679,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ # Retrieve the objective-specific training task schema based on prediction_type @@ -4115,7 +4115,7 @@ def run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -4136,7 +4136,7 @@ def run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ worker_pool_specs, managed_model = self._prepare_and_validate_run( model_display_name=model_display_name, @@ -4261,7 +4261,7 @@ def _run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -4282,7 +4282,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ for spec in worker_pool_specs: spec["python_package_spec"] = { @@ -4489,7 +4489,7 @@ def run( be immediately returned and synced when the Future has completed. Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError: If Training job has already been run or is waiting to run. @@ -4553,7 +4553,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ # Retrieve the objective-specific training task schema based on prediction_type @@ -4831,7 +4831,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. 
""" if model_display_name is None: From 1653b9dfb96a6b3f38c7a5b812f4ac83f6fab6ab Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Wed, 16 Jun 2021 02:19:16 -0500 Subject: [PATCH 07/10] Address comments, move base attrs to subclasses --- google/cloud/aiplatform/base.py | 39 ------------------ google/cloud/aiplatform/jobs.py | 52 +++++++++++++++++++++++- google/cloud/aiplatform/models.py | 14 +++++++ google/cloud/aiplatform/training_jobs.py | 39 ++++++++++++++++++ 4 files changed, 104 insertions(+), 40 deletions(-) diff --git a/google/cloud/aiplatform/base.py b/google/cloud/aiplatform/base.py index 620851918b..237c92968a 100644 --- a/google/cloud/aiplatform/base.py +++ b/google/cloud/aiplatform/base.py @@ -564,45 +564,6 @@ def update_time(self) -> datetime.datetime: self._sync_gca_resource() return self._gca_resource.update_time - @property - def start_time(self) -> Optional[datetime.datetime]: - """Time when the Pipline or Job entered the `JOB_STATE_RUNNING` or - `PIPELINE_STATE_RUNNING` for the first time. Only for Job or Pipeline resources.""" - self._sync_gca_resource() - return getattr(self._gca_resource, "start_time") - - @property - def end_time(self) -> Optional[datetime.datetime]: - """Time when the Job resource entered the `JOB_STATE_SUCCEEDED`, - `JOB_STATE_FAILED`, `JOB_STATE_CANCELLED` state, or when the Pipeline resource - entered the `PIPELINE_STATE_SUCCEEDED`, `PIPELINE_STATE_FAILED`, - `PIPELINE_STATE_CANCELLED` state. Only for Job or Pipeline resources.""" - self._sync_gca_resource() - return getattr(self._gca_resource, "end_time") - - @property - def error(self) -> Optional[status_pb2.Status]: - """Detailed error info for Job or Pipeline resources. - Only populated when the Pipeline's state is `PIPELINE_STATE_FAILED` or - `PIPELINE_STATE_CANCELLED` or when the Job's state is `JOB_STATE_FAILED` - or `JOB_STATE_CANCELLED`.""" - self._sync_gca_resource() - return getattr(self._gca_resource, "error") - - @property - def network(self) -> Optional[str]: - """The full name of the Google Compute Engine - [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this - Vertex AI resource should be peered. - - Takes the format `projects/{project}/global/networks/{network}`. Where - {project} is a project number, as in `12345`, and {network} is a network name. - - Private services access must already be configured for the network. If left - unspecified, the resource is not peered with any network. - """ - return getattr(self._gca_resource, "network") - @property def encryption_spec(self) -> Optional[gca_encryption_spec.EncryptionSpec]: """Customer-managed encryption key options for this Vertex AI resource. 
diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index 4510f8bc3a..8bc1d30461 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -19,6 +19,7 @@ import abc import copy +import datetime import sys import time import logging @@ -142,6 +143,27 @@ def state(self) -> gca_job_state.JobState: return self._gca_resource.state + @property + def start_time(self) -> Optional[datetime.datetime]: + """Time when the Job resource entered the `JOB_STATE_RUNNING` for the + first time.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "start_time") + + @property + def end_time(self) -> Optional[datetime.datetime]: + """Time when the Job resource entered the `JOB_STATE_SUCCEEDED`, + `JOB_STATE_FAILED`, or `JOB_STATE_CANCELLED` state.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "end_time") + + @property + def error(self) -> Optional[status_pb2.Status]: + """Detailed error info for this Job resource. Only populated when the + Job's state is `JOB_STATE_FAILED` or `JOB_STATE_CANCELLED`.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "error") + @property @abc.abstractmethod def _job_type(cls) -> str: @@ -911,7 +933,7 @@ class CustomJob(_RunnableJob): _resource_noun = "customJobs" _getter_method = "get_custom_job" - _list_method = "list_custom_job" + _list_method = "list_custom_jobs" _cancel_method = "cancel_custom_job" _delete_method = "delete_custom_job" _job_type = "training" @@ -1011,6 +1033,20 @@ def __init__( ), ) + @property + def network(self) -> Optional[str]: + """The full name of the Google Compute Engine + [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this + CustomJob should be peered. + + Takes the format `projects/{project}/global/networks/{network}`. Where + {project} is a project number, as in `12345`, and {network} is a network name. + + Private services access must already be configured for the network. If left + unspecified, the CustomJob is not peered with any network. + """ + return getattr(self._gca_resource, "network") + @classmethod def from_local_script( cls, @@ -1468,6 +1504,20 @@ def __init__( ), ) + @property + def network(self) -> Optional[str]: + """The full name of the Google Compute Engine + [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this + HyperparameterTuningJob should be peered. + + Takes the format `projects/{project}/global/networks/{network}`. Where + {project} is a project number, as in `12345`, and {network} is a network name. + + Private services access must already be configured for the network. If left + unspecified, the HyperparameterTuningJob is not peered with any network. + """ + return getattr(self._gca_resource.trial_job_spec, "network") + @base.optional_sync() def run( self, diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 8fb515443d..02d2510026 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -134,6 +134,20 @@ def traffic_split(self) -> Dict[str, int]: self._sync_gca_resource() return dict(self._gca_resource.traffic_split) + @property + def network(self) -> Optional[str]: + """The full name of the Google Compute Engine + [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this + Endpoint should be peered. + + Takes the format `projects/{project}/global/networks/{network}`. Where + {project} is a project number, as in `12345`, and {network} is a network name. 
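+
+        For example (illustrative): ``projects/12345/global/networks/my-vpc``.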
+ + Private services access must already be configured for the network. If left + unspecified, the Endpoint is not peered with any network. + """ + return getattr(self._gca_resource, "network") + @classmethod def create( cls, diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 767533935c..99f4f088a5 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -15,6 +15,7 @@ # limitations under the License. # +import datetime import time from typing import Dict, List, Optional, Sequence, Tuple, Union @@ -46,6 +47,7 @@ ) from google.rpc import code_pb2 +from google.rpc import status_pb2 import proto @@ -138,6 +140,28 @@ def _supported_training_schemas(cls) -> Tuple[str]: """List of supported schemas for this training job.""" pass + @property + def start_time(self) -> Optional[datetime.datetime]: + """Time when the TrainingJob entered the `PIPELINE_STATE_RUNNING` for + the first time.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "start_time") + + @property + def end_time(self) -> Optional[datetime.datetime]: + """Time when the TrainingJob resource entered the `PIPELINE_STATE_SUCCEEDED`, + `PIPELINE_STATE_FAILED`, `PIPELINE_STATE_CANCELLED` state.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "end_time") + + @property + def error(self) -> Optional[status_pb2.Status]: + """Detailed error info for this TrainingJob resource. Only populated when + the TrainingJob's state is `PIPELINE_STATE_FAILED` or + `PIPELINE_STATE_CANCELLED`.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "error") + @classmethod def get( cls, @@ -1065,6 +1089,21 @@ def __init__( # this flags keeps that state so we don't log it multiple times self._has_logged_custom_job = False + @property + def network(self) -> Optional[str]: + """The full name of the Google Compute Engine + [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this + CustomTrainingJob should be peered. + + Takes the format `projects/{project}/global/networks/{network}`. Where + {project} is a project number, as in `12345`, and {network} is a network name. + + Private services access must already be configured for the network. If left + unspecified, the CustomTrainingJob is not peered with any network. 
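+
+        A sketch of reading the value back (the pipeline resource name is
+        illustrative):
+        ```
+        job = aiplatform.CustomTrainingJob.get(
+            'projects/.../locations/.../trainingPipelines/789'
+        )
+        job.network  # e.g. 'projects/12345/global/networks/my-vpc'
+        ```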
+ """ + # Return `network` value in training task inputs if set in Map + return self._gca_resource.training_task_inputs.get("network") + def _prepare_and_validate_run( self, model_display_name: Optional[str] = None, From 88c076a4334892f21529d77064e728cd6d74c0c3 Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Wed, 16 Jun 2021 11:17:30 -0500 Subject: [PATCH 08/10] Drop unused import --- google/cloud/aiplatform/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/aiplatform/base.py b/google/cloud/aiplatform/base.py index 237c92968a..732e6b9acf 100644 --- a/google/cloud/aiplatform/base.py +++ b/google/cloud/aiplatform/base.py @@ -43,7 +43,6 @@ from google.cloud.aiplatform import initializer from google.cloud.aiplatform import utils from google.cloud.aiplatform.compat.types import encryption_spec as gca_encryption_spec -from google.rpc import status_pb2 logging.basicConfig(level=logging.INFO, stream=sys.stdout) From c46c68b8895f5790d86969f1162f12e349896602 Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Wed, 16 Jun 2021 18:08:51 -0500 Subject: [PATCH 09/10] Add test to ensure supported training schemas are always unique --- tests/unit/aiplatform/test_training_jobs.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index 9616c1ace1..b160204d7d 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -1550,6 +1550,24 @@ def test_get_training_job_with_project_and_alt_location(self): location=_TEST_ALT_LOCATION, ) + def test_unique_supported_training_schemas(self): + """Ensure that the `_supported_training_schemas` across AutoML training + classes and CustomTrainingJob contain unique values.""" + + schemas = [ + schema + for c in aiplatform.training_jobs._TrainingJob.__subclasses__() + for schema in c._supported_training_schemas + if c.__name__.startswith("AutoML") + ] + + schemas.extend( + aiplatform.training_jobs.CustomTrainingJob._supported_training_schemas + ) + + # Ensure all schemas across classes are unique + assert len(set(schemas)) == len(schemas) + @pytest.mark.usefixtures("get_training_job_tabular_mock") def test_get_and_return_subclass_automl(self): subcls = aiplatform.training_jobs._TrainingJob._get_and_return_subclass( From 0ed7704d8b8840dacb09a58f86c347d82fdeb795 Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Wed, 16 Jun 2021 18:48:46 -0500 Subject: [PATCH 10/10] Address reviewer comments --- google/cloud/aiplatform/jobs.py | 3 +-- google/cloud/aiplatform/models.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index 8bc1d30461..376c6245ba 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -24,13 +24,12 @@ import time import logging -from google.rpc import status_pb2 - from google.cloud import storage from google.cloud import bigquery from google.auth import credentials as auth_credentials from google.protobuf import duration_pb2 # type: ignore +from google.rpc import status_pb2 from google.cloud import aiplatform from google.cloud.aiplatform import base diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 02d2510026..b287581431 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -1274,6 +1274,20 @@ def supported_export_formats( def supported_deployment_resources_types( self, ) -> 
List[aiplatform.gapic.Model.DeploymentResourcesType]: + """List of deployment resource types accepted for this Model. + + When this Model is deployed, its prediction resources are described by + the `prediction_resources` field of the objects returned by + `Endpoint.list_models()`. Because not all Models support all resource + configuration types, the configuration types this Model supports are + listed here. + + If no configuration types are listed, the Model cannot be + deployed to an `Endpoint` and does not support online predictions + (`Endpoint.predict()` or `Endpoint.explain()`). Such a Model can serve + predictions by using a `BatchPredictionJob`, if it has at least one entry + each in `Model.supported_input_storage_formats` and + `Model.supported_output_storage_formats`.""" return list(self._gca_resource.supported_deployment_resources_types) @property
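
Taken together, the introspection properties added in this series let callers
dispatch between online and batch prediction. A non-authoritative sketch (the
model resource name, instance payload, and bucket paths are illustrative):

```
model = aiplatform.Model('projects/.../locations/.../models/123')

if model.supported_deployment_resources_types:
    # Model can back an Endpoint and serve online predictions
    endpoint = model.deploy()
    endpoint.predict(instances=[{"feature": 1.0}])
elif model.supported_input_storage_formats:
    # Model can only serve batch predictions
    model.batch_predict(
        job_display_name="my-batch-job",
        gcs_source="gs://my-bucket/instances.jsonl",
        gcs_destination_prefix="gs://my-bucket/output",
    )
```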