diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 2ce48adc53..87af6b16bf 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -14,7 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import pathlib import proto +import re +import shutil +import tempfile from typing import Dict, List, NamedTuple, Optional, Sequence, Tuple, Union from google.api_core import operation @@ -28,6 +32,7 @@ from google.cloud.aiplatform import jobs from google.cloud.aiplatform import models from google.cloud.aiplatform import utils +from google.cloud.aiplatform.utils import gcs_utils from google.cloud.aiplatform.compat.services import endpoint_service_client @@ -49,6 +54,15 @@ _LOGGER = base.Logger(__name__) +_SUPPORTED_MODEL_FILE_NAMES = [ + "model.pkl", + "model.joblib", + "model.bst", + "saved_model.pb", + "saved_model.pbtxt", +] + + class Prediction(NamedTuple): """Prediction class envelopes returned Model predictions and the Model id. @@ -1492,6 +1506,7 @@ def upload( credentials: Optional[auth_credentials.Credentials] = None, labels: Optional[Dict[str, str]] = None, encryption_spec_key_name: Optional[str] = None, + staging_bucket: Optional[str] = None, sync=True, ) -> "Model": """Uploads a model and returns a Model representing the uploaded Model @@ -1635,11 +1650,15 @@ def upload( If set, this Model and all sub-resources of this Model will be secured by this key. Overrides encryption_spec_key_name set in aiplatform.init. + staging_bucket (str): + Optional. Bucket to stage local model artifacts. Overrides + staging_bucket set in aiplatform.init. Returns: model: Instantiated representation of the uploaded model resource. Raises: ValueError: If only `explanation_metadata` or `explanation_parameters` is specified. + Also if model directory does not contain a supported model file. """ utils.validate_display_name(display_name) if labels: @@ -1697,6 +1716,36 @@ def upload( encryption_spec=encryption_spec, ) + if artifact_uri and not artifact_uri.startswith("gs://"): + model_dir = pathlib.Path(artifact_uri) + # Validating the model directory + if not model_dir.exists(): + raise ValueError(f"artifact_uri path does not exist: '{artifact_uri}'") + PREBUILT_IMAGE_RE = "(us|europe|asia)-docker.pkg.dev/vertex-ai/prediction/" + if re.match(PREBUILT_IMAGE_RE, serving_container_image_uri): + if not model_dir.is_dir(): + raise ValueError( + f"artifact_uri path must be a directory: '{artifact_uri}' when using prebuilt image '{serving_container_image_uri}'" + ) + if not any( + (model_dir / file_name).exists() + for file_name in _SUPPORTED_MODEL_FILE_NAMES + ): + raise ValueError( + "artifact_uri directory does not contain any supported model files. 
" + f"When using a prebuilt serving image, the upload method only supports the following model files: '{_SUPPORTED_MODEL_FILE_NAMES}'" + ) + + # Uploading the model + staged_data_uri = gcs_utils.stage_local_data_in_gcs( + data_path=str(model_dir), + staging_gcs_dir=staging_bucket, + project=project, + location=location, + credentials=credentials, + ) + artifact_uri = staged_data_uri + if artifact_uri: managed_model.artifact_uri = artifact_uri @@ -2389,3 +2438,558 @@ def export_model( _LOGGER.log_action_completed_against_resource("model", "exported", self) return json_format.MessageToDict(operation_future.metadata.output_info._pb) + + @classmethod + @base.optional_sync() + def upload_xgboost_model_file( + cls, + model_file_path: str, + xgboost_version: str = "1.4", + display_name: str = "XGBoost model", + description: Optional[str] = None, + instance_schema_uri: Optional[str] = None, + parameters_schema_uri: Optional[str] = None, + prediction_schema_uri: Optional[str] = None, + explanation_metadata: Optional[explain.ExplanationMetadata] = None, + explanation_parameters: Optional[explain.ExplanationParameters] = None, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + labels: Optional[Dict[str, str]] = None, + encryption_spec_key_name: Optional[str] = None, + staging_bucket: Optional[str] = None, + sync=True, + ) -> "Model": + """Uploads a model and returns a Model representing the uploaded Model + resource. + + Note: This function is *experimental* and can be changed in the future. + + Example usage:: + + my_model = Model.upload_xgboost_model_file( + model_file_path="iris.xgboost_model.bst" + ) + + Args: + model_file_path (str): Required. Local file path of the model. + xgboost_version (str): Optional. The version of the XGBoost serving container. + Supported versions: ["0.82", "0.90", "1.1", "1.2", "1.3", "1.4"]. + If the version is not specified, the latest version is used. + display_name (str): + Optional. The display name of the Model. The name can be up to 128 + characters long and can be consist of any UTF-8 characters. + description (str): + The description of the model. + instance_schema_uri (str): + Optional. Points to a YAML file stored on Google Cloud + Storage describing the format of a single instance, which + are used in + ``PredictRequest.instances``, + ``ExplainRequest.instances`` + and + ``BatchPredictionJob.input_config``. + The schema is defined as an OpenAPI 3.0.2 `Schema + Object `__. + AutoML Models always have this field populated by AI + Platform. Note: The URI given on output will be immutable + and probably different, including the URI scheme, than the + one given on input. The output URI will point to a location + where the user only has a read access. + parameters_schema_uri (str): + Optional. Points to a YAML file stored on Google Cloud + Storage describing the parameters of prediction and + explanation via + ``PredictRequest.parameters``, + ``ExplainRequest.parameters`` + and + ``BatchPredictionJob.model_parameters``. + The schema is defined as an OpenAPI 3.0.2 `Schema + Object `__. + AutoML Models always have this field populated by AI + Platform, if no parameters are supported it is set to an + empty string. Note: The URI given on output will be + immutable and probably different, including the URI scheme, + than the one given on input. The output URI will point to a + location where the user only has a read access. + prediction_schema_uri (str): + Optional. 
Points to a YAML file stored on Google Cloud + Storage describing the format of a single prediction + produced by this Model, which are returned via + ``PredictResponse.predictions``, + ``ExplainResponse.explanations``, + and + ``BatchPredictionJob.output_config``. + The schema is defined as an OpenAPI 3.0.2 `Schema + Object `__. + AutoML Models always have this field populated by AI + Platform. Note: The URI given on output will be immutable + and probably different, including the URI scheme, than the + one given on input. The output URI will point to a location + where the user only has a read access. + explanation_metadata (explain.ExplanationMetadata): + Optional. Metadata describing the Model's input and output for explanation. + Both `explanation_metadata` and `explanation_parameters` must be + passed together when used. For more details, see + `Ref docs ` + explanation_parameters (explain.ExplanationParameters): + Optional. Parameters to configure explaining for Model's predictions. + For more details, see `Ref docs ` + project: Optional[str]=None, + Project to upload this model to. Overrides project set in + aiplatform.init. + location: Optional[str]=None, + Location to upload this model to. Overrides location set in + aiplatform.init. + credentials: Optional[auth_credentials.Credentials]=None, + Custom credentials to use to upload this model. Overrides credentials + set in aiplatform.init. + labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize your Models. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + and examples of labels. + encryption_spec_key_name (Optional[str]): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the model. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute + resource is created. + + If set, this Model and all sub-resources of this Model will be secured by this key. + + Overrides encryption_spec_key_name set in aiplatform.init. + staging_bucket (str): + Optional. Bucket to stage local model artifacts. Overrides + staging_bucket set in aiplatform.init. + Returns: + model: Instantiated representation of the uploaded model resource. + Raises: + ValueError: If only `explanation_metadata` or `explanation_parameters` + is specified. + Also if model directory does not contain a supported model file. + """ + XGBOOST_SUPPORTED_MODEL_FILE_EXTENSIONS = [ + ".pkl", + ".joblib", + ".bst", + ] + + container_image_uri = aiplatform.helpers.get_prebuilt_prediction_container_uri( + region=location, + framework="xgboost", + framework_version=xgboost_version, + accelerator="cpu", + ) + + model_file_path_obj = pathlib.Path(model_file_path) + if not model_file_path_obj.is_file(): + raise ValueError( + f"model_file_path path must point to a file: '{model_file_path}'" + ) + + model_file_extension = model_file_path_obj.suffix + if model_file_extension not in XGBOOST_SUPPORTED_MODEL_FILE_EXTENSIONS: + _LOGGER.warning( + f"Only the following XGBoost model file extensions are currently supported: '{XGBOOST_SUPPORTED_MODEL_FILE_EXTENSIONS}'" + ) + _LOGGER.warning( + "Treating the model file as a binary serialized XGBoost Booster." 
+            )
+            model_file_extension = ".bst"
+
+        # Preparing model directory
+        # We cannot clean up the directory immediately after calling Model.upload since
+        # that call may be asynchronous and return before the model file has been read.
+        # To work around this, we make this method asynchronous (decorate with @base.optional_sync)
+        # but call Model.upload with sync=True.
+        with tempfile.TemporaryDirectory() as prepared_model_dir:
+            prepared_model_file_path = pathlib.Path(prepared_model_dir) / (
+                "model" + model_file_extension
+            )
+            shutil.copy(model_file_path_obj, prepared_model_file_path)
+
+            return cls.upload(
+                serving_container_image_uri=container_image_uri,
+                artifact_uri=prepared_model_dir,
+                display_name=display_name,
+                description=description,
+                instance_schema_uri=instance_schema_uri,
+                parameters_schema_uri=parameters_schema_uri,
+                prediction_schema_uri=prediction_schema_uri,
+                explanation_metadata=explanation_metadata,
+                explanation_parameters=explanation_parameters,
+                project=project,
+                location=location,
+                credentials=credentials,
+                labels=labels,
+                encryption_spec_key_name=encryption_spec_key_name,
+                staging_bucket=staging_bucket,
+                sync=True,
+            )
+
+    @classmethod
+    @base.optional_sync()
+    def upload_scikit_learn_model_file(
+        cls,
+        model_file_path: str,
+        sklearn_version: str = "1.0",
+        display_name: str = "Scikit-learn model",
+        description: Optional[str] = None,
+        instance_schema_uri: Optional[str] = None,
+        parameters_schema_uri: Optional[str] = None,
+        prediction_schema_uri: Optional[str] = None,
+        explanation_metadata: Optional[explain.ExplanationMetadata] = None,
+        explanation_parameters: Optional[explain.ExplanationParameters] = None,
+        project: Optional[str] = None,
+        location: Optional[str] = None,
+        credentials: Optional[auth_credentials.Credentials] = None,
+        labels: Optional[Dict[str, str]] = None,
+        encryption_spec_key_name: Optional[str] = None,
+        staging_bucket: Optional[str] = None,
+        sync=True,
+    ) -> "Model":
+        """Uploads a model and returns a Model representing the uploaded Model
+        resource.
+
+        Note: This function is *experimental* and can be changed in the future.
+
+        Example usage::
+
+            my_model = Model.upload_scikit_learn_model_file(
+                model_file_path="iris.sklearn_model.joblib"
+            )
+
+        Args:
+            model_file_path (str): Required. Local file path of the model.
+            sklearn_version (str):
+                Optional. The version of the Scikit-learn serving container.
+                Supported versions: ["0.20", "0.22", "0.23", "0.24", "1.0"].
+                If the version is not specified, the latest version is used.
+            display_name (str):
+                Optional. The display name of the Model. The name can be up to 128
+                characters long and can consist of any UTF-8 characters.
+            description (str):
+                Optional. The description of the model.
+            instance_schema_uri (str):
+                Optional. Points to a YAML file stored on Google Cloud
+                Storage describing the format of a single instance, which
+                are used in
+                ``PredictRequest.instances``,
+                ``ExplainRequest.instances``
+                and
+                ``BatchPredictionJob.input_config``.
+                The schema is defined as an OpenAPI 3.0.2 `Schema
+                Object `__.
+                AutoML Models always have this field populated by AI
+                Platform. Note: The URI given on output will be immutable
+                and probably different, including the URI scheme, than the
+                one given on input. The output URI will point to a location
+                where the user only has a read access.
+            parameters_schema_uri (str):
+                Optional.
Points to a YAML file stored on Google Cloud + Storage describing the parameters of prediction and + explanation via + ``PredictRequest.parameters``, + ``ExplainRequest.parameters`` + and + ``BatchPredictionJob.model_parameters``. + The schema is defined as an OpenAPI 3.0.2 `Schema + Object `__. + AutoML Models always have this field populated by AI + Platform, if no parameters are supported it is set to an + empty string. Note: The URI given on output will be + immutable and probably different, including the URI scheme, + than the one given on input. The output URI will point to a + location where the user only has a read access. + prediction_schema_uri (str): + Optional. Points to a YAML file stored on Google Cloud + Storage describing the format of a single prediction + produced by this Model, which are returned via + ``PredictResponse.predictions``, + ``ExplainResponse.explanations``, + and + ``BatchPredictionJob.output_config``. + The schema is defined as an OpenAPI 3.0.2 `Schema + Object `__. + AutoML Models always have this field populated by AI + Platform. Note: The URI given on output will be immutable + and probably different, including the URI scheme, than the + one given on input. The output URI will point to a location + where the user only has a read access. + explanation_metadata (explain.ExplanationMetadata): + Optional. Metadata describing the Model's input and output for explanation. + Both `explanation_metadata` and `explanation_parameters` must be + passed together when used. For more details, see + `Ref docs ` + explanation_parameters (explain.ExplanationParameters): + Optional. Parameters to configure explaining for Model's predictions. + For more details, see `Ref docs ` + project: Optional[str]=None, + Project to upload this model to. Overrides project set in + aiplatform.init. + location: Optional[str]=None, + Location to upload this model to. Overrides location set in + aiplatform.init. + credentials: Optional[auth_credentials.Credentials]=None, + Custom credentials to use to upload this model. Overrides credentials + set in aiplatform.init. + labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize your Models. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + and examples of labels. + encryption_spec_key_name (Optional[str]): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the model. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute + resource is created. + + If set, this Model and all sub-resources of this Model will be secured by this key. + + Overrides encryption_spec_key_name set in aiplatform.init. + staging_bucket (str): + Optional. Bucket to stage local model artifacts. Overrides + staging_bucket set in aiplatform.init. + Returns: + model: Instantiated representation of the uploaded model resource. + Raises: + ValueError: If only `explanation_metadata` or `explanation_parameters` + is specified. + Also if model directory does not contain a supported model file. 
+        """
+        SKLEARN_SUPPORTED_MODEL_FILE_EXTENSIONS = [
+            ".pkl",
+            ".joblib",
+        ]
+
+        container_image_uri = aiplatform.helpers.get_prebuilt_prediction_container_uri(
+            region=location,
+            framework="sklearn",
+            framework_version=sklearn_version,
+            accelerator="cpu",
+        )
+
+        model_file_path_obj = pathlib.Path(model_file_path)
+        if not model_file_path_obj.is_file():
+            raise ValueError(
+                f"model_file_path path must point to a file: '{model_file_path}'"
+            )
+
+        model_file_extension = model_file_path_obj.suffix
+        if model_file_extension not in SKLEARN_SUPPORTED_MODEL_FILE_EXTENSIONS:
+            _LOGGER.warning(
+                f"Only the following Scikit-learn model file extensions are currently supported: '{SKLEARN_SUPPORTED_MODEL_FILE_EXTENSIONS}'"
+            )
+            _LOGGER.warning(
+                "Treating the model file as a pickle serialized Scikit-learn model."
+            )
+            model_file_extension = ".pkl"
+
+        # Preparing model directory
+        # We cannot clean up the directory immediately after calling Model.upload since
+        # that call may be asynchronous and return before the model file has been read.
+        # To work around this, we make this method asynchronous (decorate with @base.optional_sync)
+        # but call Model.upload with sync=True.
+        with tempfile.TemporaryDirectory() as prepared_model_dir:
+            prepared_model_file_path = pathlib.Path(prepared_model_dir) / (
+                "model" + model_file_extension
+            )
+            shutil.copy(model_file_path_obj, prepared_model_file_path)
+
+            return cls.upload(
+                serving_container_image_uri=container_image_uri,
+                artifact_uri=prepared_model_dir,
+                display_name=display_name,
+                description=description,
+                instance_schema_uri=instance_schema_uri,
+                parameters_schema_uri=parameters_schema_uri,
+                prediction_schema_uri=prediction_schema_uri,
+                explanation_metadata=explanation_metadata,
+                explanation_parameters=explanation_parameters,
+                project=project,
+                location=location,
+                credentials=credentials,
+                labels=labels,
+                encryption_spec_key_name=encryption_spec_key_name,
+                staging_bucket=staging_bucket,
+                sync=True,
+            )
+
+    @classmethod
+    def upload_tensorflow_saved_model(
+        cls,
+        saved_model_dir: str,
+        tensorflow_version: str = "2.7",
+        use_gpu: bool = False,
+        display_name: str = "Tensorflow model",
+        description: Optional[str] = None,
+        instance_schema_uri: Optional[str] = None,
+        parameters_schema_uri: Optional[str] = None,
+        prediction_schema_uri: Optional[str] = None,
+        explanation_metadata: Optional[explain.ExplanationMetadata] = None,
+        explanation_parameters: Optional[explain.ExplanationParameters] = None,
+        project: Optional[str] = None,
+        location: Optional[str] = None,
+        credentials: Optional[auth_credentials.Credentials] = None,
+        labels: Optional[Dict[str, str]] = None,
+        encryption_spec_key_name: Optional[str] = None,
+        staging_bucket: Optional[str] = None,
+        sync=True,
+    ) -> "Model":
+        """Uploads a model and returns a Model representing the uploaded Model
+        resource.
+
+        Note: This function is *experimental* and can be changed in the future.
+
+        Example usage::
+
+            my_model = Model.upload_tensorflow_saved_model(
+                saved_model_dir="iris.tensorflow_model.SavedModel"
+            )
+
+        Args:
+            saved_model_dir (str): Required.
+                Local directory of the Tensorflow SavedModel.
+            tensorflow_version (str):
+                Optional. The version of the Tensorflow serving container.
+                Supported versions: ["0.15", "2.1", "2.2", "2.3", "2.4", "2.5", "2.6", "2.7"].
+                If the version is not specified, the latest version is used.
+            use_gpu (bool): Whether to use GPU for model serving.
+            display_name (str):
+                Optional. The display name of the Model. The name can be up to 128
+                characters long and can consist of any UTF-8 characters.
+            description (str):
+                Optional. The description of the model.
+            instance_schema_uri (str):
+                Optional. Points to a YAML file stored on Google Cloud
+                Storage describing the format of a single instance, which
+                are used in
+                ``PredictRequest.instances``,
+                ``ExplainRequest.instances``
+                and
+                ``BatchPredictionJob.input_config``.
+                The schema is defined as an OpenAPI 3.0.2 `Schema
+                Object `__.
+                AutoML Models always have this field populated by AI
+                Platform. Note: The URI given on output will be immutable
+                and probably different, including the URI scheme, than the
+                one given on input. The output URI will point to a location
+                where the user only has a read access.
+            parameters_schema_uri (str):
+                Optional. Points to a YAML file stored on Google Cloud
+                Storage describing the parameters of prediction and
+                explanation via
+                ``PredictRequest.parameters``,
+                ``ExplainRequest.parameters``
+                and
+                ``BatchPredictionJob.model_parameters``.
+                The schema is defined as an OpenAPI 3.0.2 `Schema
+                Object `__.
+                AutoML Models always have this field populated by AI
+                Platform, if no parameters are supported it is set to an
+                empty string. Note: The URI given on output will be
+                immutable and probably different, including the URI scheme,
+                than the one given on input. The output URI will point to a
+                location where the user only has a read access.
+            prediction_schema_uri (str):
+                Optional. Points to a YAML file stored on Google Cloud
+                Storage describing the format of a single prediction
+                produced by this Model, which are returned via
+                ``PredictResponse.predictions``,
+                ``ExplainResponse.explanations``,
+                and
+                ``BatchPredictionJob.output_config``.
+                The schema is defined as an OpenAPI 3.0.2 `Schema
+                Object `__.
+                AutoML Models always have this field populated by AI
+                Platform. Note: The URI given on output will be immutable
+                and probably different, including the URI scheme, than the
+                one given on input. The output URI will point to a location
+                where the user only has a read access.
+            explanation_metadata (explain.ExplanationMetadata):
+                Optional. Metadata describing the Model's input and output for explanation.
+                Both `explanation_metadata` and `explanation_parameters` must be
+                passed together when used. For more details, see
+                `Ref docs `
+            explanation_parameters (explain.ExplanationParameters):
+                Optional. Parameters to configure explaining for Model's predictions.
+                For more details, see `Ref docs `
+            project: Optional[str]=None,
+                Project to upload this model to. Overrides project set in
+                aiplatform.init.
+            location: Optional[str]=None,
+                Location to upload this model to. Overrides location set in
+                aiplatform.init.
+            credentials: Optional[auth_credentials.Credentials]=None,
+                Custom credentials to use to upload this model. Overrides credentials
+                set in aiplatform.init.
+            labels (Dict[str, str]):
+                Optional. The labels with user-defined metadata to
+                organize your Models.
+                Label keys and values can be no longer than 64
+                characters (Unicode codepoints), can only
+                contain lowercase letters, numeric characters,
+                underscores and dashes. International characters
+                are allowed.
+                See https://goo.gl/xmQnxf for more information
+                and examples of labels.
+            encryption_spec_key_name (Optional[str]):
+                Optional. The Cloud KMS resource identifier of the customer
+                managed encryption key used to protect the model. Has the
+                form:
+                ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
+ The key needs to be in the same region as where the compute + resource is created. + + If set, this Model and all sub-resources of this Model will be secured by this key. + + Overrides encryption_spec_key_name set in aiplatform.init. + staging_bucket (str): + Optional. Bucket to stage local model artifacts. Overrides + staging_bucket set in aiplatform.init. + Returns: + model: Instantiated representation of the uploaded model resource. + Raises: + ValueError: If only `explanation_metadata` or `explanation_parameters` + is specified. + Also if model directory does not contain a supported model file. + """ + container_image_uri = aiplatform.helpers.get_prebuilt_prediction_container_uri( + region=location, + framework="tensorflow", + framework_version=tensorflow_version, + accelerator="gpu" if use_gpu else "cpu", + ) + + return cls.upload( + serving_container_image_uri=container_image_uri, + artifact_uri=saved_model_dir, + display_name=display_name, + description=description, + instance_schema_uri=instance_schema_uri, + parameters_schema_uri=parameters_schema_uri, + prediction_schema_uri=prediction_schema_uri, + explanation_metadata=explanation_metadata, + explanation_parameters=explanation_parameters, + project=project, + location=location, + credentials=credentials, + labels=labels, + encryption_spec_key_name=encryption_spec_key_name, + staging_bucket=staging_bucket, + sync=sync, + ) diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py new file mode 100644 index 0000000000..b7fc2d9291 --- /dev/null +++ b/google/cloud/aiplatform/utils/gcs_utils.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- + +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import datetime +import glob +import logging +import pathlib +from typing import Optional + +from google.auth import credentials as auth_credentials +from google.cloud import storage + +from google.cloud.aiplatform import initializer + + +_logger = logging.getLogger(__name__) + + +def upload_to_gcs( + source_path: str, + destination_uri: str, + project: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, +): + """Uploads local files to GCS. + + After upload the `destination_uri` will contain the same data as the `source_path`. + + Args: + source_path: Required. Path of the local data to copy to GCS. + destination_uri: Required. GCS URI where the data should be uploaded. + project: Optional. Google Cloud Project that contains the staging bucket. + credentials: The custom credentials to use when making API calls. + If not provided, default credentials will be used. + + Raises: + RuntimeError: When source_path does not exist. + GoogleCloudError: When the upload process fails. 
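+
+    Example (illustrative; both paths below are placeholders)::
+
+        upload_to_gcs(
+            source_path="/tmp/my_model",
+            destination_uri="gs://my-bucket/staging/my_model",
+        )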
+    """
+    source_path_obj = pathlib.Path(source_path)
+    if not source_path_obj.exists():
+        raise RuntimeError(f"Source path does not exist: {source_path}")
+
+    project = project or initializer.global_config.project
+    credentials = credentials or initializer.global_config.credentials
+
+    storage_client = storage.Client(project=project, credentials=credentials)
+    if source_path_obj.is_dir():
+        source_file_paths = glob.glob(
+            pathname=str(source_path_obj / "**"), recursive=True
+        )
+        for source_file_path in source_file_paths:
+            source_file_path_obj = pathlib.Path(source_file_path)
+            if source_file_path_obj.is_dir():
+                continue
+            source_file_relative_path_obj = source_file_path_obj.relative_to(
+                source_path_obj
+            )
+            source_file_relative_posix_path = source_file_relative_path_obj.as_posix()
+            destination_file_uri = (
+                destination_uri.rstrip("/") + "/" + source_file_relative_posix_path
+            )
+            _logger.debug(f'Uploading "{source_file_path}" to "{destination_file_uri}"')
+            destination_blob = storage.Blob.from_string(
+                destination_file_uri, client=storage_client
+            )
+            destination_blob.upload_from_filename(filename=source_file_path)
+    else:
+        source_file_path = source_path
+        destination_file_uri = destination_uri
+        _logger.debug(f'Uploading "{source_file_path}" to "{destination_file_uri}"')
+        destination_blob = storage.Blob.from_string(
+            destination_file_uri, client=storage_client
+        )
+        destination_blob.upload_from_filename(filename=source_file_path)
+
+
+def stage_local_data_in_gcs(
+    data_path: str,
+    staging_gcs_dir: Optional[str] = None,
+    project: Optional[str] = None,
+    location: Optional[str] = None,
+    credentials: Optional[auth_credentials.Credentials] = None,
+) -> str:
+    """Stages local data in GCS.
+
+    The data is copied into a timestamped "vertex_ai_auto_staging" directory
+    inside the staging bucket; a single file keeps its original name under
+    that directory.
+
+    Args:
+        data_path: Required. Path of the local data to copy to GCS.
+        staging_gcs_dir:
+            Optional. Google Cloud Storage bucket to be used for data staging.
+        project: Optional. Google Cloud Project that contains the staging bucket.
+        location: Optional. Google Cloud location to use for the staging bucket.
+        credentials: The custom credentials to use when making API calls.
+            If not provided, default credentials will be used.
+
+    Returns:
+        Google Cloud Storage URI of the staged data.
+
+    Raises:
+        RuntimeError: When source_path does not exist.
+        GoogleCloudError: When the upload process fails.
+    """
+    data_path_obj = pathlib.Path(data_path)
+
+    if not data_path_obj.exists():
+        raise RuntimeError(f"Local data does not exist: data_path='{data_path}'")
+
+    staging_gcs_dir = staging_gcs_dir or initializer.global_config.staging_bucket
+    if not staging_gcs_dir:
+        project = project or initializer.global_config.project
+        location = location or initializer.global_config.location
+        credentials = credentials or initializer.global_config.credentials
+        # Creating the bucket if it does not exist.
+        # Currently we only do this when staging_gcs_dir is not specified.
+        # The buckets that we create are regional.
+        # This prevents errors when a service requires a regional bucket.
+        # E.g. "FailedPrecondition: 400 The Cloud Storage bucket of `gs://...` is in location `us`. It must be in the same regional location as the service location `us-central1`."
+        # We are making the bucket name region-specific since the bucket is regional.
+ staging_bucket_name = project + "-vertex-staging-" + location + client = storage.Client(project=project, credentials=credentials) + staging_bucket = storage.Bucket(client=client, name=staging_bucket_name) + if not staging_bucket.exists(): + _logger.info(f'Creating staging GCS bucket "{staging_bucket_name}"') + staging_bucket = client.create_bucket( + bucket_or_name=staging_bucket, project=project, location=location, + ) + staging_gcs_dir = "gs://" + staging_bucket_name + + timestamp = datetime.datetime.now().isoformat(sep="-", timespec="milliseconds") + staging_gcs_subdir = ( + staging_gcs_dir.rstrip("/") + "/vertex_ai_auto_staging/" + timestamp + ) + + staged_data_uri = staging_gcs_subdir + if data_path_obj.is_file(): + staged_data_uri = staging_gcs_subdir + "/" + data_path_obj.name + + _logger.info(f'Uploading "{data_path}" to "{staged_data_uri}"') + upload_to_gcs( + source_path=data_path, + destination_uri=staged_data_uri, + project=project, + credentials=credentials, + ) + + return staged_data_uri diff --git a/tests/system/aiplatform/test_model_upload.py b/tests/system/aiplatform/test_model_upload.py new file mode 100644 index 0000000000..90816b3cb6 --- /dev/null +++ b/tests/system/aiplatform/test_model_upload.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- + +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import tempfile + +import pytest + +from google import auth as google_auth +from google.cloud import aiplatform +from google.cloud import storage + +from tests.system.aiplatform import e2e_base + +# TODO(vinnys): Replace with env var `BUILD_SPECIFIC_GCP_PROJECT` once supported +_, _TEST_PROJECT = google_auth.default() +_TEST_LOCATION = "us-central1" + +_XGBOOST_MODEL_URI = "gs://ucaip-test-us-central1/models/iris_xgboost/model.bst" + + +@pytest.mark.usefixtures("delete_staging_bucket", "teardown") +class TestModel(e2e_base.TestEndToEnd): + _temp_prefix = f"{_TEST_PROJECT}-vertex-staging-{_TEST_LOCATION}" + + def test_upload_and_deploy_xgboost_model(self, shared_state): + """Upload XGBoost model from local file and deploy it for prediction.""" + + aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION) + + storage_client = storage.Client(project=_TEST_PROJECT) + model_blob = storage.Blob.from_string( + uri=_XGBOOST_MODEL_URI, client=storage_client + ) + model_path = tempfile.mktemp() + ".my_model.xgb" + model_blob.download_to_filename(filename=model_path) + + model = aiplatform.Model.upload_xgboost_model_file(model_file_path=model_path,) + shared_state["resources"] = [model] + + staging_bucket = storage.Blob.from_string( + uri=model.uri, client=storage_client + ).bucket + # Checking that the bucket is auto-generated + assert "-vertex-staging-" in staging_bucket.name + + shared_state["bucket"] = staging_bucket + + # Currently we need to explicitly specify machine type. 
+ # See https://github.com/googleapis/python-aiplatform/issues/773 + endpoint = model.deploy(machine_type="n1-standard-2") + shared_state["resources"].append(endpoint) + predict_response = endpoint.predict(instances=[[0, 0, 0]]) + assert len(predict_response.predictions) == 1 diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py index 0c9a649ad8..d7cd694223 100644 --- a/tests/unit/aiplatform/test_models.py +++ b/tests/unit/aiplatform/test_models.py @@ -17,6 +17,7 @@ import importlib from concurrent import futures +import pathlib import pytest from unittest import mock from unittest.mock import patch @@ -423,6 +424,16 @@ def create_client_mock(): yield create_client_mock +@pytest.fixture +def mock_storage_blob_upload_from_filename(): + with patch( + "google.cloud.storage.Blob.upload_from_filename" + ) as mock_blob_upload_from_filename, patch( + "google.cloud.storage.Bucket.exists", return_value=True + ): + yield mock_blob_upload_from_filename + + class TestModel: def setup_method(self): importlib.reload(initializer) @@ -1442,3 +1453,213 @@ def test_get_and_return_subclass_not_found(self): fr"{_TEST_PIPELINE_RESOURCE_NAME}" ) ) + + @pytest.mark.parametrize("sync", [True, False]) + @pytest.mark.parametrize( + "model_file_name", + ["my_model.xgb", "my_model.pkl", "my_model.joblib", "my_model.bst"], + ) + def test_upload_xgboost_model_file_uploads_and_gets_model( + self, + tmp_path: pathlib.Path, + model_file_name: str, + mock_storage_blob_upload_from_filename, + upload_model_mock, + get_model_mock, + sync: bool, + ): + model_file_path = tmp_path / model_file_name + model_file_path.touch() + + my_model = models.Model.upload_xgboost_model_file( + model_file_path=str(model_file_path), + xgboost_version="1.4", + display_name=_TEST_MODEL_NAME, + project=_TEST_PROJECT, + location=_TEST_LOCATION, + sync=sync, + ) + + if not sync: + my_model.wait() + + upload_model_mock.assert_called_once() + upload_model_call_kwargs = upload_model_mock.call_args.kwargs + upload_model_model = upload_model_call_kwargs["model"] + + # Verifying the container image selection + assert ( + upload_model_model.container_spec.image_uri + == "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-4:latest" + ) + + # Verifying the staging bucket name generation + assert upload_model_model.artifact_uri.startswith( + f"gs://{_TEST_PROJECT}-vertex-staging-{_TEST_LOCATION}" + ) + assert "/vertex_ai_auto_staging/" in upload_model_model.artifact_uri + + # Verifying that the model was renamed to a file name that is acceptable for Model.upload + staged_model_file_path = mock_storage_blob_upload_from_filename.call_args.kwargs[ + "filename" + ] + staged_model_file_name = staged_model_file_path.split("/")[-1] + assert staged_model_file_name in ["model.bst", "model.pkl", "model.joblib"] + + @pytest.mark.parametrize("sync", [True, False]) + @pytest.mark.parametrize( + "model_file_name", + [ + "model.bst", + "model.pkl", + "model.joblib", + "saved_model.pb", + "saved_model.pbtxt", + ], + ) + def test_upload_stages_data_uploads_and_gets_model( + self, + tmp_path: pathlib.Path, + model_file_name: str, + mock_storage_blob_upload_from_filename, + upload_model_mock, + get_model_mock, + sync: bool, + ): + model_file_path = tmp_path / model_file_name + model_file_path.touch() + + my_model = models.Model.upload( + artifact_uri=str(tmp_path), + serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-4:latest", + display_name=_TEST_MODEL_NAME, + project=_TEST_PROJECT, + location=_TEST_LOCATION, 
+ sync=sync, + ) + + if not sync: + my_model.wait() + + upload_model_mock.assert_called_once() + upload_model_call_kwargs = upload_model_mock.call_args.kwargs + upload_model_model = upload_model_call_kwargs["model"] + + # Verifying the staging bucket name generation + assert upload_model_model.artifact_uri.startswith( + f"gs://{_TEST_PROJECT}-vertex-staging-{_TEST_LOCATION}" + ) + assert "/vertex_ai_auto_staging/" in upload_model_model.artifact_uri + + # Verifying that the model was renamed to a file name that is acceptable for Model.upload + staged_model_file_path = mock_storage_blob_upload_from_filename.call_args.kwargs[ + "filename" + ] + staged_model_file_name = staged_model_file_path.split("/")[-1] + assert staged_model_file_name in [ + "model.bst", + "model.pkl", + "model.joblib", + "saved_model.pb", + "saved_model.pbtxt", + ] + + @pytest.mark.parametrize("sync", [True, False]) + @pytest.mark.parametrize( + "model_file_name", ["my_model.pkl", "my_model.joblib"], + ) + def test_upload_scikit_learn_model_file_uploads_and_gets_model( + self, + tmp_path: pathlib.Path, + model_file_name: str, + mock_storage_blob_upload_from_filename, + upload_model_mock, + get_model_mock, + sync: bool, + ): + model_file_path = tmp_path / model_file_name + model_file_path.touch() + + my_model = models.Model.upload_scikit_learn_model_file( + model_file_path=str(model_file_path), + sklearn_version="0.24", + display_name=_TEST_MODEL_NAME, + project=_TEST_PROJECT, + location=_TEST_LOCATION, + sync=sync, + ) + + if not sync: + my_model.wait() + + upload_model_mock.assert_called_once() + upload_model_call_kwargs = upload_model_mock.call_args.kwargs + upload_model_model = upload_model_call_kwargs["model"] + + # Verifying the container image selection + assert ( + upload_model_model.container_spec.image_uri + == "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest" + ) + + # Verifying the staging bucket name generation + assert upload_model_model.artifact_uri.startswith( + f"gs://{_TEST_PROJECT}-vertex-staging-{_TEST_LOCATION}" + ) + assert "/vertex_ai_auto_staging/" in upload_model_model.artifact_uri + + # Verifying that the model was renamed to a file name that is acceptable for Model.upload + staged_model_file_path = mock_storage_blob_upload_from_filename.call_args.kwargs[ + "filename" + ] + staged_model_file_name = staged_model_file_path.split("/")[-1] + assert staged_model_file_name in ["model.pkl", "model.joblib"] + + @pytest.mark.parametrize("sync", [True, False]) + def test_upload_tensorflow_saved_model_uploads_and_gets_model( + self, + tmp_path: pathlib.Path, + mock_storage_blob_upload_from_filename, + upload_model_mock, + get_model_mock, + sync: bool, + ): + saved_model_dir = tmp_path / "saved_model" + saved_model_dir.mkdir() + (saved_model_dir / "saved_model.pb").touch() + + my_model = models.Model.upload_tensorflow_saved_model( + saved_model_dir=str(saved_model_dir), + tensorflow_version="2.6", + use_gpu=True, + display_name=_TEST_MODEL_NAME, + project=_TEST_PROJECT, + location=_TEST_LOCATION, + sync=sync, + ) + + if not sync: + my_model.wait() + + upload_model_mock.assert_called_once() + upload_model_call_kwargs = upload_model_mock.call_args.kwargs + upload_model_model = upload_model_call_kwargs["model"] + + # Verifying the container image selection + assert ( + upload_model_model.container_spec.image_uri + == "us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest" + ) + + # Verifying the staging bucket name generation + assert upload_model_model.artifact_uri.startswith( + 
f"gs://{_TEST_PROJECT}-vertex-staging-{_TEST_LOCATION}" + ) + assert "/vertex_ai_auto_staging/" in upload_model_model.artifact_uri + + # Verifying that the model files were uploaded + staged_model_file_path = mock_storage_blob_upload_from_filename.call_args.kwargs[ + "filename" + ] + staged_model_file_name = staged_model_file_path.split("/")[-1] + assert staged_model_file_name in ["saved_model.pb", "saved_model.pbtxt"]