feat: Expose additional attributes into Vertex SDK to close gap with GAPIC #477

Merged · 11 commits · Jun 17, 2021
59 changes: 58 additions & 1 deletion google/cloud/aiplatform/base.py
@@ -42,7 +42,8 @@
from google.auth import credentials as auth_credentials
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import utils

from google.cloud.aiplatform.compat.types import encryption_spec as gca_encryption_spec
from google.rpc import status_pb2

logging.basicConfig(level=logging.INFO, stream=sys.stdout)

@@ -563,6 +564,62 @@ def update_time(self) -> datetime.datetime:
self._sync_gca_resource()
return self._gca_resource.update_time

@property
def start_time(self) -> Optional[datetime.datetime]:
"""Time when the Pipline or Job entered the `JOB_STATE_RUNNING` or
`PIPELINE_STATE_RUNNING` for the first time. Only for Job or Pipeline resources."""
self._sync_gca_resource()
return getattr(self._gca_resource, "start_time")

Member:
Is the getattr access pattern necessary? It seems like if the field doesn't exist in the proto, it will default to return None.

Contributor Author:
I thought about this: if we are certain the field will always be present, we should return it directly. But I went with getattr for fields that are Optional and that we know won't always be populated. However, they do appear to default to None. Any preference on your end?
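
For context, a minimal sketch contrasting the two access patterns under discussion (the `job` object is hypothetical; per the thread, unset proto fields marshal to `None`):

```python
# Plain attribute access and bare getattr behave identically when the field
# is declared on the proto: both return None for an unset optional field,
# and both raise AttributeError if the attribute is missing entirely.
start = job._gca_resource.start_time              # None while the job is pending
start = getattr(job._gca_resource, "start_time")  # equivalent

# getattr only changes behavior when given a default, which guards against
# the attribute being absent from the message altogether:
start = getattr(job._gca_resource, "start_time", None)
```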


@property
def end_time(self) -> Optional[datetime.datetime]:
"""Time when the Job resource entered the `JOB_STATE_SUCCEEDED`,
`JOB_STATE_FAILED`, `JOB_STATE_CANCELLED` state, or when the Pipeline resource
entered the `PIPELINE_STATE_SUCCEEDED`, `PIPELINE_STATE_FAILED`,
`PIPELINE_STATE_CANCELLED` state. Only for Job or Pipeline resources."""
self._sync_gca_resource()
return getattr(self._gca_resource, "end_time")

@property
def error(self) -> Optional[status_pb2.Status]:
"""Detailed error info for Job or Pipeline resources.
Only populated when the Pipeline's state is `PIPELINE_STATE_FAILED` or
`PIPELINE_STATE_CANCELLED` or when the Job's state is `JOB_STATE_FAILED`
or `JOB_STATE_CANCELLED`."""
self._sync_gca_resource()
return getattr(self._gca_resource, "error")

@property
def network(self) -> Optional[str]:
"""The full name of the Google Compute Engine
[network](https://cloud.google.com/vpc/docs/vpc#networks) to which this
Vertex AI resource should be peered.

Takes the format `projects/{project}/global/networks/{network}`. Where
{project} is a project number, as in `12345`, and {network} is a network name.

Private services access must already be configured for the network. If left
unspecified, the resource is not peered with any network.
"""
return getattr(self._gca_resource, "network")

@property
def encryption_spec(self) -> Optional[gca_encryption_spec.EncryptionSpec]:
"""Customer-managed encryption key options for this Vertex AI resource.

If this is set, then all resources created by this Vertex AI resource will
be encrypted with the provided encryption key.
"""
return getattr(self._gca_resource, "encryption_spec")

@property
def labels(self) -> Dict[str, str]:
"""User-defined labels containing metadata about this resource.

Read more about labels at https://goo.gl/xmQnxf
"""
return self._gca_resource.labels
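
Taken together, these properties let callers read resource metadata directly off the SDK object. A minimal usage sketch (project and job ID are placeholders):

```python
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

# Any Job or Pipeline resource exposes the new properties; a batch
# prediction job is used here purely as an example.
job = aiplatform.BatchPredictionJob("1234567890")

print(job.start_time, job.end_time)  # None until the corresponding state is reached
if job.error:
    print(job.error.code, job.error.message)  # set only on FAILED/CANCELLED
print(job.labels)
```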

@property
def gca_resource(self) -> proto.Message:
"""The underlying resource proto represenation."""
24 changes: 24 additions & 0 deletions google/cloud/aiplatform/jobs.py
@@ -23,6 +23,8 @@
import time
import logging

from google.rpc import status_pb2

from google.cloud import storage
from google.cloud import bigquery

@@ -45,6 +47,7 @@
batch_prediction_job as gca_bp_job_compat,
batch_prediction_job_v1 as gca_bp_job_v1,
batch_prediction_job_v1beta1 as gca_bp_job_v1beta1,
completion_stats as gca_completion_stats,
custom_job as gca_custom_job_compat,
custom_job_v1beta1 as gca_custom_job_v1beta1,
explanation_v1beta1 as gca_explanation_v1beta1,
@@ -302,6 +305,27 @@ def __init__(
credentials=credentials,
)

@property
def output_info(self):
"""Information describing the output of this job, including output location
into which prediction output is written.

This is only available for batch prediction jobs that have run successfully.
"""
return self._gca_resource.output_info

@property
def partial_failures(self) -> Optional[Sequence[status_pb2.Status]]:
"""Partial failures encountered. For example, single files that can't be read.
This field never exceeds 20 entries. Status details fields contain standard
GCP error details."""
return getattr(self._gca_resource, "partial_failures")

@property
def completion_stats(self) -> Optional[gca_completion_stats.CompletionStats]:
"""Statistics on completed and failed prediction instances."""
return getattr(self._gca_resource, "completion_stats")
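
A short sketch of how these properties might be read once a job finishes (the resource name is a placeholder):

```python
job = aiplatform.BatchPredictionJob(
    "projects/123/locations/us-central1/batchPredictionJobs/456"
)

if job.output_info:
    # One of the output fields is populated, depending on the sink used.
    print(job.output_info.gcs_output_directory or job.output_info.bigquery_output_dataset)

if job.completion_stats:
    print(job.completion_stats.successful_count, job.completion_stats.failed_count)

for failure in job.partial_failures or []:
    print(failure.code, failure.message)
```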

@classmethod
def create(
cls,
108 changes: 104 additions & 4 deletions google/cloud/aiplatform/models.py
@@ -18,8 +18,10 @@
from typing import Dict, List, NamedTuple, Optional, Sequence, Tuple, Union

from google.api_core import operation
from google.api_core import exceptions as api_exceptions
from google.auth import credentials as auth_credentials

from google.cloud import aiplatform
from google.cloud.aiplatform import base
from google.cloud.aiplatform import compat
from google.cloud.aiplatform import explain
@@ -119,6 +121,19 @@ def __init__(
credentials=credentials,
)

@property
def traffic_split(self) -> dict:
"""A map from a DeployedModel's ID to the percentage of this Endpoint's
traffic that should be forwarded to that DeployedModel.

If a DeployedModel's ID is not listed in this map, then it receives no traffic.

The traffic percentage values must add up to 100, or the map must be empty
if the Endpoint is not to accept any traffic at the moment.
"""
self._sync_gca_resource()
return dict(self._gca_resource.traffic_split)
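
For example (endpoint resource name and deployed model IDs are made up):

```python
endpoint = aiplatform.Endpoint("projects/123/locations/us-central1/endpoints/789")
print(endpoint.traffic_split)  # e.g. {"4321098765": 80, "8765432109": 20}
```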

@classmethod
def create(
cls,
@@ -1211,12 +1226,13 @@ class Model(base.VertexAiResourceNounWithFutureManager):
_delete_method = "delete_model"

@property
- def uri(self):
- """Uri of the model."""
- return self._gca_resource.artifact_uri
+ def uri(self) -> Optional[str]:
+ """Path to the directory containing the Model artifact and any of its
+ supporting files. Not present for AutoML Models."""
+ return self._gca_resource.artifact_uri or None

@property
- def description(self):
+ def description(self) -> str:
"""Description of the model."""
return self._gca_resource.description

@@ -1240,6 +1256,90 @@ def supported_export_formats(
for export_format in self._gca_resource.supported_export_formats
}

@property
def supported_deployment_resources_types(
self,
) -> List[aiplatform.gapic.Model.DeploymentResourcesType]:
"""List of deployment resource types accepted by this Model."""
return list(self._gca_resource.supported_deployment_resources_types)
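
One way this might be used, assuming a `model` object and that the Model supports dedicated resources:

```python
from google.cloud import aiplatform

if (
    aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
    in model.supported_deployment_resources_types
):
    endpoint = model.deploy(machine_type="n1-standard-4")
```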

@property
def supported_input_storage_formats(self) -> List[str]:
"""The formats this Model supports in the `input_config` field of a
`BatchPredictionJob`. If `Model.predict_schemata.instance_schema_uri`
exists, the instances should be given as per that schema.

[Read the docs for more on batch prediction formats](https://cloud.google.com/vertex-ai/docs/predictions/batch-predictions#batch_request_input)

If this Model doesn't support any of these formats it means it cannot be
used with a `BatchPredictionJob`. However, if it has
`supported_deployment_resources_types`, it could serve online predictions
by using `Endpoint.predict()` or `Endpoint.explain()`.
"""
return list(self._gca_resource.supported_input_storage_formats)

@property
def supported_output_storage_formats(self) -> List[str]:
"""The formats this Model supports in the `output_config` field of a
`BatchPredictionJob`.

If both `Model.predict_schemata.instance_schema_uri` and
`Model.predict_schemata.prediction_schema_uri` exist, the predictions
are returned together with their instances. In other words, the
prediction has the original instance data first, followed by the actual
prediction content (as per the schema).

[Read the docs for more on batch prediction formats](https://cloud.google.com/vertex-ai/docs/predictions/batch-predictions)

If this Model doesn't support any of these formats it means it cannot be
used with a `BatchPredictionJob`. However, if it has
`supported_deployment_resources_types`, it could serve online predictions
by using `Endpoint.predict()` or `Endpoint.explain()`.
"""
return list(self._gca_resource.supported_output_storage_formats)
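
A hedged sketch tying the two format properties to a batch prediction request (bucket paths are placeholders):

```python
instances_format = model.supported_input_storage_formats[0]    # e.g. "jsonl"
predictions_format = model.supported_output_storage_formats[0]

batch_job = model.batch_predict(
    job_display_name="example-batch-job",
    gcs_source="gs://my-bucket/instances.jsonl",
    gcs_destination_prefix="gs://my-bucket/output",
    instances_format=instances_format,
    predictions_format=predictions_format,
)
```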

@property
def predict_schemata(self) -> Optional[aiplatform.gapic.PredictSchemata]:
"""The schemata that describe formats of the Model's predictions and
explanations, if available."""
return getattr(self._gca_resource, "predict_schemata")

@property
def training_job(self) -> Optional["aiplatform.training_jobs._TrainingJob"]:
"""The TrainingJob that uploaded this Model, if any.

Raises:
api_core.exceptions.NotFound: If the Model's training job resource
cannot be found on the Vertex service.
"""
job_name = getattr(self._gca_resource, "training_pipeline")

if not job_name:
return None

try:
return aiplatform.training_jobs._TrainingJob._get_and_return_subclass(
resource_name=job_name,
project=self.project,
location=self.location,
credentials=self.credentials,
)
except api_exceptions.NotFound:
raise api_exceptions.NotFound(
f"The training job used to create this model could not be found: {job_name}"
)
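
For example, to recover the pipeline that produced a model (model ID is a placeholder):

```python
model = aiplatform.Model("projects/123/locations/us-central1/models/456")

if model.training_job:
    print(type(model.training_job).__name__)  # e.g. "AutoMLTabularTrainingJob"
```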

@property
def container_spec(self) -> Optional[aiplatform.gapic.ModelContainerSpec]:
"""The specification of the container that is to be used when deploying
this Model. Not present for AutoML Models."""
return getattr(self._gca_resource, "container_spec")

@property
def artifact_uri(self) -> Optional[str]:
"""The path to the directory containing the Model artifact and any of its
supporting files. Not present for AutoML Models."""
return getattr(self._gca_resource, "artifact_uri")

def __init__(
self,
model_name: str,
64 changes: 60 additions & 4 deletions google/cloud/aiplatform/training_jobs.py
@@ -136,7 +136,6 @@ def __init__(
@abc.abstractmethod
def _supported_training_schemas(cls) -> Tuple[str]:
"""List of supported schemas for this training job."""

pass

@classmethod
@@ -153,10 +152,10 @@ def get(
resource_name (str):
Required. A fully-qualified resource name or ID.
project (str):
- Optional project to retrieve dataset from. If not set, project
+ Optional project to retrieve training job from. If not set, project
set in aiplatform.init will be used.
location (str):
- Optional location to retrieve dataset from. If not set, location
+ Optional location to retrieve training job from. If not set, location
set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Custom credentials to use to upload this model. Overrides
@@ -194,11 +193,68 @@ def get(

return self

@classmethod
def _get_and_return_subclass(
cls,
resource_name: str,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
) -> "_TrainingJob":
"""Retrieve Training Job subclass for the given resource_name without
knowing the training_task_definition.

Example usage:
```
aiplatform.training_jobs._TrainingJob._get_and_return_subclass(
'projects/.../locations/.../trainingPipelines/12345'
)
# Returns: <google.cloud.aiplatform.training_jobs.AutoMLImageTrainingJob>
```

Args:
resource_name (str):
Required. A fully-qualified resource name or ID.
project (str):
Optional project to retrieve training job from. If not set, project
set in aiplatform.init will be used.
location (str):
Optional location to retrieve training job from. If not set, location
set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Custom credentials to use to retrieve this training job. Overrides
credentials set in aiplatform.init.

Returns:
The Vertex AI Training Job subclass that matches the resource's
training_task_definition.
"""

# Retrieve training pipeline resource before class construction
client = cls._instantiate_client(location=location, credentials=credentials)

gca_training_pipeline = getattr(client, cls._getter_method)(name=resource_name)

schema_uri = gca_training_pipeline.training_task_definition

# Collect all AutoML training job classes and CustomTrainingJob
class_list = [
c for c in cls.__subclasses__() if c.__name__.startswith("AutoML")
] + [CustomTrainingJob]

# Identify correct training job subclass, construct and return object
for c in class_list:
if schema_uri in c._supported_training_schemas:
return c._empty_constructor(
project=project,
location=location,
credentials=credentials,
resource_name=resource_name,
)

@property
@abc.abstractmethod
def _model_upload_fail_string(self) -> str:
"""Helper property for model upload failure."""

pass

@abc.abstractmethod