From aab9e589426331bfe7ac3f6efa97109e0bd0db0d Mon Sep 17 00:00:00 2001 From: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com> Date: Tue, 18 May 2021 11:01:22 -0400 Subject: [PATCH] feat: add custom and hp tuning (#388) --- google/cloud/aiplatform/__init__.py | 12 +- google/cloud/aiplatform/base.py | 28 + google/cloud/aiplatform/compat/__init__.py | 2 + .../cloud/aiplatform/compat/types/__init__.py | 2 + .../cloud/aiplatform/hyperparameter_tuning.py | 215 ++++++ google/cloud/aiplatform/jobs.py | 713 +++++++++++++++++- google/cloud/aiplatform/training_jobs.py | 515 +------------ google/cloud/aiplatform/training_utils.py | 105 --- .../{utils.py => utils/__init__.py} | 67 ++ google/cloud/aiplatform/utils/source_utils.py | 233 ++++++ .../aiplatform/utils/worker_spec_utils.py | 199 +++++ tests/unit/aiplatform/test_custom_job.py | 323 ++++++++ tests/unit/aiplatform/test_end_to_end.py | 46 +- .../test_hyperparameter_tuning_job.py | 368 +++++++++ tests/unit/aiplatform/test_training_jobs.py | 575 +++++++------- tests/unit/aiplatform/test_training_utils.py | 144 ---- 16 files changed, 2501 insertions(+), 1046 deletions(-) create mode 100644 google/cloud/aiplatform/hyperparameter_tuning.py delete mode 100644 google/cloud/aiplatform/training_utils.py rename google/cloud/aiplatform/{utils.py => utils/__init__.py} (87%) create mode 100644 google/cloud/aiplatform/utils/source_utils.py create mode 100644 google/cloud/aiplatform/utils/worker_spec_utils.py create mode 100644 tests/unit/aiplatform/test_custom_job.py create mode 100644 tests/unit/aiplatform/test_hyperparameter_tuning_job.py delete mode 100644 tests/unit/aiplatform/test_training_utils.py diff --git a/google/cloud/aiplatform/__init__.py b/google/cloud/aiplatform/__init__.py index e56e57a2ad..6aa8f64161 100644 --- a/google/cloud/aiplatform/__init__.py +++ b/google/cloud/aiplatform/__init__.py @@ -26,9 +26,15 @@ TimeSeriesDataset, VideoDataset, ) +from google.cloud.aiplatform import hyperparameter_tuning +from 
google.cloud.aiplatform.metadata import metadata from google.cloud.aiplatform.models import Endpoint from google.cloud.aiplatform.models import Model -from google.cloud.aiplatform.jobs import BatchPredictionJob +from google.cloud.aiplatform.jobs import ( + BatchPredictionJob, + CustomJob, + HyperparameterTuningJob, +) from google.cloud.aiplatform.training_jobs import ( CustomTrainingJob, CustomContainerTrainingJob, @@ -39,7 +45,6 @@ AutoMLTextTrainingJob, AutoMLVideoTrainingJob, ) -from google.cloud.aiplatform.metadata import metadata """ Usage: @@ -60,6 +65,7 @@ "explain", "gapic", "init", + "hyperparameter_tuning", "log_params", "log_metrics", "get_experiment_df", @@ -71,11 +77,13 @@ "AutoMLTextTrainingJob", "AutoMLVideoTrainingJob", "BatchPredictionJob", + "CustomJob", "CustomTrainingJob", "CustomContainerTrainingJob", "CustomPythonPackageTrainingJob", "Endpoint", "ImageDataset", + "HyperparameterTuningJob", "Model", "TabularDataset", "TextDataset", diff --git a/google/cloud/aiplatform/base.py b/google/cloud/aiplatform/base.py index f46db9c47e..a9fcef24bd 100644 --- a/google/cloud/aiplatform/base.py +++ b/google/cloud/aiplatform/base.py @@ -101,6 +101,29 @@ def log_create_complete( f"{variable_name} = aiplatform.{cls.__name__}('{resource.name}')" ) + def log_create_complete_with_getter( + self, + cls: Type["AiPlatformResourceNoun"], + resource: proto.Message, + variable_name: str, + ): + """Logs create event is complete. + + Will also include code snippet to instantiate resource in SDK. + + Args: + cls (AiPlatformResourceNoun): + AI Platform Resource Noun class that is being created. + resource (proto.Message): + AI Platform Resourc proto.Message + variable_name (str): Name of variable to use for code snippet + """ + self._logger.info(f"{cls.__name__} created. 
Resource name: {resource.name}") + self._logger.info(f"To use this {cls.__name__} in another session:") + self._logger.info( + f"{variable_name} = aiplatform.{cls.__name__}.get('{resource.name}')" + ) + def log_action_start_against_resource( self, action: str, noun: str, resource_noun_obj: "AiPlatformResourceNoun" ): @@ -543,6 +566,11 @@ def update_time(self) -> datetime.datetime: self._sync_gca_resource() return self._gca_resource.update_time + @property + def gca_resource(self) -> proto.Message: + """The underlying resource proto represenation.""" + return self._gca_resource + def __repr__(self) -> str: return f"{object.__repr__(self)} \nresource name: {self.resource_name}" diff --git a/google/cloud/aiplatform/compat/__init__.py b/google/cloud/aiplatform/compat/__init__.py index 980c554fe1..55a72fea16 100644 --- a/google/cloud/aiplatform/compat/__init__.py +++ b/google/cloud/aiplatform/compat/__init__.py @@ -70,6 +70,7 @@ types.prediction_service = types.prediction_service_v1beta1 types.specialist_pool = types.specialist_pool_v1beta1 types.specialist_pool_service = types.specialist_pool_service_v1beta1 + types.study = types.study_v1beta1 types.training_pipeline = types.training_pipeline_v1beta1 types.metadata_service = types.metadata_service_v1beta1 types.tensorboard_service = types.tensorboard_service_v1beta1 @@ -120,6 +121,7 @@ types.prediction_service = types.prediction_service_v1 types.specialist_pool = types.specialist_pool_v1 types.specialist_pool_service = types.specialist_pool_service_v1 + types.study = types.study_v1 types.training_pipeline = types.training_pipeline_v1 __all__ = ( diff --git a/google/cloud/aiplatform/compat/types/__init__.py b/google/cloud/aiplatform/compat/types/__init__.py index f45bb2e11e..7bd512e7e8 100644 --- a/google/cloud/aiplatform/compat/types/__init__.py +++ b/google/cloud/aiplatform/compat/types/__init__.py @@ -49,6 +49,7 @@ prediction_service as prediction_service_v1beta1, specialist_pool as specialist_pool_v1beta1, 
specialist_pool_service as specialist_pool_service_v1beta1, + study as study_v1beta1, training_pipeline as training_pipeline_v1beta1, metadata_service as metadata_service_v1beta1, tensorboard_service as tensorboard_service_v1beta1, @@ -90,6 +91,7 @@ prediction_service as prediction_service_v1, specialist_pool as specialist_pool_v1, specialist_pool_service as specialist_pool_service_v1, + study as study_v1, training_pipeline as training_pipeline_v1, ) diff --git a/google/cloud/aiplatform/hyperparameter_tuning.py b/google/cloud/aiplatform/hyperparameter_tuning.py new file mode 100644 index 0000000000..a7a0e641cd --- /dev/null +++ b/google/cloud/aiplatform/hyperparameter_tuning.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- + +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import abc +from typing import Dict, List, Optional, Sequence, Tuple, Union + +import proto + +from google.cloud.aiplatform.compat.types import study as gca_study_compat + +_SCALE_TYPE_MAP = { + "linear": gca_study_compat.StudySpec.ParameterSpec.ScaleType.UNIT_LINEAR_SCALE, + "log": gca_study_compat.StudySpec.ParameterSpec.ScaleType.UNIT_LOG_SCALE, + "reverse_log": gca_study_compat.StudySpec.ParameterSpec.ScaleType.UNIT_REVERSE_LOG_SCALE, + "unspecified": gca_study_compat.StudySpec.ParameterSpec.ScaleType.SCALE_TYPE_UNSPECIFIED, +} + + +class _ParameterSpec(metaclass=abc.ABCMeta): + """Base class represents a single parameter to optimize.""" + + def __init__( + self, + conditional_parameter_spec: Optional[Dict[str, "_ParameterSpec"]] = None, + parent_values: Optional[List[Union[float, int, str]]] = None, + ): + + self.conditional_parameter_spec = conditional_parameter_spec + self.parent_values = parent_values + + @property + @classmethod + @abc.abstractmethod + def _proto_parameter_value_class(self) -> proto.Message: + """The proto representation of this parameter.""" + pass + + @property + @classmethod + @abc.abstractmethod + def _parameter_value_map(self) -> Tuple[Tuple[str, str]]: + """A Tuple map of parameter key to underlying proto key.""" + pass + + @property + @classmethod + @abc.abstractmethod + def _parameter_spec_value_key(self) -> Tuple[Tuple[str, str]]: + """The ParameterSpec key this parameter should be assigned.""" + pass + + @property + def _proto_parameter_value_spec(self) -> proto.Message: + """Converts this parameter to it's parameter value representation.""" + proto_parameter_value_spec = self._proto_parameter_value_class() + for self_attr_key, proto_attr_key in self._parameter_value_map: + setattr( + proto_parameter_value_spec, proto_attr_key, getattr(self, self_attr_key) + ) + return proto_parameter_value_spec + + def _to_parameter_spec( + self, parameter_id: str + ) -> gca_study_compat.StudySpec.ParameterSpec: + """Converts this 
parameter to ParameterSpec.""" + # TODO: Conditional parameters + parameter_spec = gca_study_compat.StudySpec.ParameterSpec( + parameter_id=parameter_id, + scale_type=_SCALE_TYPE_MAP.get(getattr(self, "scale", "unspecified")), + ) + + setattr( + parameter_spec, + self._parameter_spec_value_key, + self._proto_parameter_value_spec, + ) + + return parameter_spec + + +class DoubleParameterSpec(_ParameterSpec): + + _proto_parameter_value_class = ( + gca_study_compat.StudySpec.ParameterSpec.DoubleValueSpec + ) + _parameter_value_map = (("min", "min_value"), ("max", "max_value")) + _parameter_spec_value_key = "double_value_spec" + + def __init__( + self, min: float, max: float, scale: str, + ): + """ + Value specification for a parameter in ``DOUBLE`` type. + + Args: + min (float): + Required. Inclusive minimum value of the + parameter. + max (float): + Required. Inclusive maximum value of the + parameter. + scale (str): + Required. The type of scaling that should be applied to this parameter. + + Accepts: 'linear', 'log', 'reverse_log' + """ + + super().__init__() + + self.min = min + self.max = max + self.scale = scale + + +class IntegerParameterSpec(_ParameterSpec): + + _proto_parameter_value_class = ( + gca_study_compat.StudySpec.ParameterSpec.IntegerValueSpec + ) + _parameter_value_map = (("min", "min_value"), ("max", "max_value")) + _parameter_spec_value_key = "integer_value_spec" + + def __init__( + self, min: int, max: int, scale: str, + ): + """ + Value specification for a parameter in ``INTEGER`` type. + + Args: + min (float): + Required. Inclusive minimum value of the + parameter. + max (float): + Required. Inclusive maximum value of the + parameter. + scale (str): + Required. The type of scaling that should be applied to this parameter. 
+ + Accepts: 'linear', 'log', 'reverse_log' + """ + + super().__init__() + + self.min = min + self.max = max + self.scale = scale + + +class CategoricalParameterSpec(_ParameterSpec): + + _proto_parameter_value_class = ( + gca_study_compat.StudySpec.ParameterSpec.CategoricalValueSpec + ) + _parameter_value_map = (("values", "values"),) + _parameter_spec_value_key = "categorical_value_spec" + + def __init__( + self, values: Sequence[str], + ): + """Value specification for a parameter in ``CATEGORICAL`` type. + + Args: + values (Sequence[str]): + Required. The list of possible categories. + """ + + super().__init__() + + self.values = values + + +class DiscreteParameterSpec(_ParameterSpec): + + _proto_parameter_value_class = ( + gca_study_compat.StudySpec.ParameterSpec.DiscreteValueSpec + ) + _parameter_value_map = (("values", "values"),) + _parameter_spec_value_key = "discrete_value_spec" + + def __init__( + self, values: Sequence[float], scale: str, + ): + """Value specification for a parameter in ``DISCRETE`` type. + + values (Sequence[float]): + Required. A list of possible values. + The list should be in increasing order and at + least 1e-10 apart. For instance, this parameter + might have possible settings of 1.5, 2.5, and + 4.0. This list should not contain more than + 1,000 values. + scale (str): + Required. The type of scaling that should be applied to this parameter. 
+ + Accepts: 'linear', 'log', 'reverse_log' + """ + + super().__init__() + + self.values = values + self.scale = scale diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index ee6d46dde9..7b1f5cccc5 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -18,6 +18,7 @@ from typing import Iterable, Optional, Union, Sequence, Dict, List import abc +import copy import sys import time import logging @@ -26,25 +27,32 @@ from google.cloud import bigquery from google.auth import credentials as auth_credentials +from google.protobuf import duration_pb2 # type: ignore from google.cloud import aiplatform from google.cloud.aiplatform import base -from google.cloud.aiplatform import initializer from google.cloud.aiplatform import compat from google.cloud.aiplatform import constants +from google.cloud.aiplatform import initializer +from google.cloud.aiplatform import hyperparameter_tuning from google.cloud.aiplatform import utils +from google.cloud.aiplatform.utils import source_utils +from google.cloud.aiplatform.utils import worker_spec_utils from google.cloud.aiplatform.compat.services import job_service_client from google.cloud.aiplatform.compat.types import ( - io as gca_io_compat, - io_v1beta1 as gca_io_v1beta1, - job_state as gca_job_state, batch_prediction_job as gca_bp_job_compat, batch_prediction_job_v1 as gca_bp_job_v1, batch_prediction_job_v1beta1 as gca_bp_job_v1beta1, + custom_job as gca_custom_job_compat, + explanation_v1beta1 as gca_explanation_v1beta1, + io as gca_io_compat, + io_v1beta1 as gca_io_v1beta1, + job_state as gca_job_state, + hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat, machine_resources as gca_machine_resources_compat, machine_resources_v1beta1 as gca_machine_resources_v1beta1, - explanation_v1beta1 as gca_explanation_v1beta1, + study as gca_study_compat, ) logging.basicConfig(level=logging.INFO, stream=sys.stdout) @@ -173,15 +181,23 @@ def _block_until_complete(self): 
) ) log_wait = min(log_wait * multiplier, max_wait) - previous_time = current_time + previous_time = current_time time.sleep(wait) - _LOGGER.log_action_completed_against_resource("", "run", self) - + _LOGGER.info( + "%s %s current state:\n%s" + % ( + self.__class__.__name__, + self._gca_resource.name, + self._gca_resource.state, + ) + ) # Error is only populated when the job state is # JOB_STATE_FAILED or JOB_STATE_CANCELLED. - if self.state in _JOB_ERROR_STATES: + if self._gca_resource.state in _JOB_ERROR_STATES: raise RuntimeError("Job failed with:\n%s" % self._gca_resource.error) + else: + _LOGGER.log_action_completed_against_resource("run", "completed", self) @classmethod def list( @@ -768,14 +784,89 @@ def iter_outputs( ) -class CustomJob(_Job): - _resource_noun = "customJobs" - _getter_method = "get_custom_job" - _list_method = "list_custom_job" - _cancel_method = "cancel_custom_job" - _delete_method = "delete_custom_job" - _job_type = "training" - pass +class _RunnableJob(_Job): + """ABC to interface job as a runnable training class.""" + + def __init__( + self, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ): + """Initializes job with project, location, and api_client. + + Args: + project(str): Project of the resource noun. + location(str): The location of the resource noun. + credentials(google.auth.crendentials.Crendentials): Optional custom + credentials to use when accessing interacting with resource noun. 
+ """ + + base.AiPlatformResourceNounWithFutureManager.__init__( + self, project=project, location=location, credentials=credentials + ) + + self._parent = aiplatform.initializer.global_config.common_location_path( + project=project, location=location + ) + + @abc.abstractmethod + def run(self) -> None: + pass + + @property + def _has_run(self) -> bool: + """Property returns true if this class has a resource name.""" + return bool(self._gca_resource.name) + + @property + def state(self) -> gca_job_state.JobState: + """Current state of job. + + Raises: + RuntimeError if job run has not been called. + """ + if not self._has_run: + raise RuntimeError("Job has not run. No state available.") + + return super().state + + @classmethod + def get( + cls, + resource_name: str, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ) -> "_RunnableJob": + """Get an AI Platform Job for the given resource_name. + + Args: + resource_name (str): + Required. A fully-qualified resource name or ID. + project (str): + Optional project to retrieve dataset from. If not set, project + set in aiplatform.init will be used. + location (str): + Optional location to retrieve dataset from. If not set, location + set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Custom credentials to use to upload this model. Overrides + credentials set in aiplatform.init. + + Returns: + An AI Platform Job. 
+ """ + self = cls._empty_constructor( + project=project, + location=location, + credentials=credentials, + resource_name=resource_name, + ) + + self._gca_resource = self._get_gca_resource(resource_name=resource_name) + + return self class DataLabelingJob(_Job): @@ -788,10 +879,594 @@ class DataLabelingJob(_Job): pass -class HyperparameterTuningJob(_Job): +class CustomJob(_RunnableJob): + """AI Platform (Unified) Custom Job.""" + + _resource_noun = "customJobs" + _getter_method = "get_custom_job" + _list_method = "list_custom_job" + _cancel_method = "cancel_custom_job" + _delete_method = "delete_custom_job" + _job_type = "training" + + def __init__( + self, + display_name: str, + worker_pool_specs: Union[List[Dict], List[aiplatform.gapic.WorkerPoolSpec]], + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + encryption_spec_key_name: Optional[str] = None, + staging_bucket: Optional[str] = None, + ): + """Cosntruct a Custom Job with Worker Pool Specs. + + ``` + Example usage: + worker_pool_specs = [ + { + "machine_spec": { + "machine_type": "n1-standard-4", + "accelerator_type": "NVIDIA_TESLA_K80", + "accelerator_count": 1, + }, + "replica_count": 1, + "container_spec": { + "image_uri": container_image_uri, + "command": [], + "args": [], + }, + } + ] + + my_job = aiplatform.CustomJob( + display_name='my_job', + worker_pool_specs=worker_pool_specs + ) + + my_job.run() + ``` + + + For more information on configuring worker pool specs please visit: + https://cloud.google.com/ai-platform-unified/docs/training/create-custom-job + + + Args: + display_name (str): + Required. The user-defined name of the HyperparameterTuningJob. + The name can be up to 128 characters long and can be consist + of any UTF-8 characters. + worker_pool_specs (Union[List[Dict], List[aiplatform.gapic.WorkerPoolSpec]]): + Required. The spec of the worker pools including machine type and Docker image. 
+ Can provided as a list of dictionaries or list of WorkerPoolSpec proto messages. + project (str): + Optional.Project to run the custom job in. Overrides project set in aiplatform.init. + location (str): + Optional.Location to run the custom job in. Overrides location set in aiplatform.init. + credentials (auth_credentials.Credentials): + Optional.Custom credentials to use to run call custom job service. Overrides + credentials set in aiplatform.init. + encryption_spec_key_name (str): + Optional.Customer-managed encryption key name for a + CustomJob. If this is set, then all resources + created by the CustomJob will be encrypted with + the provided encryption key. + staging_bucket (str): + Optional. Bucket for produced custom job artifacts. Overrides + staging_bucket set in aiplatform.init. + + Raises: + RuntimeError is not staging bucket was set using aiplatfrom.init and a staging + bucket was not passed in. + """ + + super().__init__(project=project, location=location, credentials=credentials) + + staging_bucket = staging_bucket or initializer.global_config.staging_bucket + + if not staging_bucket: + raise RuntimeError( + "staging_bucket should be passed to CustomJob constructor or " + "should be set using aiplatform.init(staging_bucket='gs://my-bucket')" + ) + + self._gca_resource = gca_custom_job_compat.CustomJob( + display_name=display_name, + job_spec=gca_custom_job_compat.CustomJobSpec( + worker_pool_specs=worker_pool_specs, + base_output_directory=gca_io_compat.GcsDestination( + output_uri_prefix=staging_bucket + ), + ), + encryption_spec=initializer.global_config.get_encryption_spec( + encryption_spec_key_name=encryption_spec_key_name + ), + ) + + @classmethod + def from_local_script( + cls, + display_name: str, + script_path: str, + container_uri: str, + args: Optional[List[Union[str, float, int]]] = None, + requirements: Optional[Sequence[str]] = None, + environment_variables: Optional[Dict[str, str]] = None, + replica_count: int = 1, + machine_type: 
str = "n1-standard-4", + accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED", + accelerator_count: int = 0, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + encryption_spec_key_name: Optional[str] = None, + staging_bucket: Optional[str] = None, + ) -> "CustomJob": + """Configures a custom job from a local script. + + Example usage: + ``` + job = aiplatform.CustomJob.from_local_script( + display_name="my-custom-job", + script_path="training_script.py", + container_uri="gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest", + requirements=["gcsfs==0.7.1"], + replica_count=1, + args=['--dataset', 'gs://my-bucket/my-dataset', + '--model_output_uri', 'gs://my-bucket/model'] + ) + + job.run() + ``` + + Args: + display_name (str): + Required. The user-defined name of this CustomJob. + script_path (str): + Required. Local path to training script. + container_uri (str): + Required: Uri of the training container image to use for custom job. + args (Optional[List[Union[str, float, int]]]): + Optional. Command line arguments to be passed to the Python task. + requirements (Sequence[str]): + Optional. List of python packages dependencies of script. + environment_variables (Dict[str, str]): + Optional. Environment variables to be passed to the container. + Should be a dictionary where keys are environment variable names + and values are environment variable values for those names. + At most 10 environment variables can be specified. + The Name of the environment variable must be unique. + + environment_variables = { + 'MY_KEY': 'MY_VALUE' + } + replica_count (int): + Optional. The number of worker replicas. If replica count = 1 then one chief + replica will be provisioned. If replica_count > 1 the remainder will be + provisioned as a worker replica pool. + machine_type (str): + Optional. The type of machine to use for training. + accelerator_type (str): + Optional. Hardware accelerator type. 
One of ACCELERATOR_TYPE_UNSPECIFIED, + NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, + NVIDIA_TESLA_T4 + accelerator_count (int): + Optional. The number of accelerators to attach to a worker replica. + project (str): + Optional. Project to run the custom job in. Overrides project set in aiplatform.init. + location (str): + Optional. Location to run the custom job in. Overrides location set in aiplatform.init. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to run call custom job service. Overrides + credentials set in aiplatform.init. + encryption_spec_key_name (str): + Optional. Customer-managed encryption key name for a + CustomJob. If this is set, then all resources + created by the CustomJob will be encrypted with + the provided encryption key. + staging_bucket (str): + Optional. Bucket for produced custom job artifacts. Overrides + staging_bucket set in aiplatform.init. + + Raises: + RuntimeError is not staging bucket was set using aiplatfrom.init and a staging + bucket was not passed in. 
+ """ + + project = project or initializer.global_config.project + location = location or initializer.global_config.location + staging_bucket = staging_bucket or initializer.global_config.staging_bucket + + if not staging_bucket: + raise RuntimeError( + "staging_bucket should be passed to CustomJob.from_local_script or " + "should be set using aiplatform.init(staging_bucket='gs://my-bucket')" + ) + + worker_pool_specs = worker_spec_utils._DistributedTrainingSpec.chief_worker_pool( + replica_count=replica_count, + machine_type=machine_type, + accelerator_count=accelerator_count, + accelerator_type=accelerator_type, + ).pool_specs + + python_packager = source_utils._TrainingScriptPythonPackager( + script_path=script_path, requirements=requirements + ) + + package_gcs_uri = python_packager.package_and_copy_to_gcs( + gcs_staging_dir=staging_bucket, project=project, credentials=credentials, + ) + + for spec in worker_pool_specs: + spec["python_package_spec"] = { + "executor_image_uri": container_uri, + "python_module": python_packager.module_name, + "package_uris": [package_gcs_uri], + } + + if args: + spec["python_package_spec"]["args"] = args + + if environment_variables: + spec["python_package_spec"]["env"] = [ + {"name": key, "value": value} + for key, value in environment_variables.items() + ] + + return cls( + display_name=display_name, + worker_pool_specs=worker_pool_specs, + project=project, + location=location, + credentials=credentials, + encryption_spec_key_name=encryption_spec_key_name, + staging_bucket=staging_bucket, + ) + + @base.optional_sync() + def run( + self, + service_account: Optional[str] = None, + network: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, + sync: bool = True, + ) -> None: + """Run this configured CustomJob. + + Args: + service_account (str): + Optional. Specifies the service account for workload run-as account. 
+ Users submitting jobs must have act-as permission on this run-as account. + network (str): + Optional. The full name of the Compute Engine network to which the job + should be peered. For example, projects/12345/global/networks/myVPC. + Private services access must already be configured for the network. + If left unspecified, the job is not peered with any network. + timeout (int): + The maximum job running time in seconds. The default is 7 days. + restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. + sync (bool): + Whether to execute this method synchronously. If False, this method + will unblock and it will be executed in a concurrent Future. + """ + + if service_account: + self._gca_resource.job_spec.service_account = service_account + + if network: + self._gca_resource.job_spec.network = network + + if timeout or restart_job_on_worker_restart: + timeout = duration_pb2.Duration(seconds=timeout) if timeout else None + self._gca_resource.job_spec.scheduling = gca_custom_job_compat.Scheduling( + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, + ) + + _LOGGER.log_create_with_lro(self.__class__) + + self._gca_resource = self.api_client.create_custom_job( + parent=self._parent, custom_job=self._gca_resource + ) + + _LOGGER.log_create_complete_with_getter( + self.__class__, self._gca_resource, "custom_job" + ) + + _LOGGER.info("View Custom Job:\n%s" % self._dashboard_uri()) + + self._block_until_complete() + + @property + def job_spec(self): + return self._gca_resource.job_spec + + +_SEARCH_ALGORITHM_TO_PROTO_VALUE = { + "random": gca_study_compat.StudySpec.Algorithm.RANDOM_SEARCH, + "grid": gca_study_compat.StudySpec.Algorithm.GRID_SEARCH, + None: gca_study_compat.StudySpec.Algorithm.ALGORITHM_UNSPECIFIED, +} + +_MEASUREMENT_SELECTION_TO_PROTO_VALUE = { + "best": 
gca_study_compat.StudySpec.MeasurementSelectionType.BEST_MEASUREMENT, + "last": gca_study_compat.StudySpec.MeasurementSelectionType.LAST_MEASUREMENT, +} + + +class HyperparameterTuningJob(_RunnableJob): + """AI Platform (Unified) Hyperparameter Tuning Job.""" + _resource_noun = "hyperparameterTuningJobs" _getter_method = "get_hyperparameter_tuning_job" _list_method = "list_hyperparameter_tuning_jobs" _cancel_method = "cancel_hyperparameter_tuning_job" _delete_method = "delete_hyperparameter_tuning_job" - pass + _job_type = "training" + + def __init__( + self, + display_name: str, + custom_job: CustomJob, + metric_spec: Dict[str, str], + parameter_spec: Dict[str, hyperparameter_tuning._ParameterSpec], + max_trial_count: int, + parallel_trial_count: int, + max_failed_trial_count: int = 0, + search_algorithm: Optional[str] = None, + measurement_selection: Optional[str] = "best", + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + encryption_spec_key_name: Optional[str] = None, + ): + """ + Configures a HyperparameterTuning Job. 
+ + Example usage: + + ``` + from google.cloud.aiplatform import hyperparameter_tuning as hpt + + worker_pool_specs = [ + { + "machine_spec": { + "machine_type": "n1-standard-4", + "accelerator_type": "NVIDIA_TESLA_K80", + "accelerator_count": 1, + }, + "replica_count": 1, + "container_spec": { + "image_uri": container_image_uri, + "command": [], + "args": [], + }, + } + ] + + custom_job = aiplatform.CustomJob( + display_name='my_job', + worker_pool_specs=worker_pool_specs + ) + + + hp_job = aiplatform.HyperparameterTuningJob( + display_name='hp-test', + custom_job=job, + metric_spec={ + 'loss': 'minimize', + }, + parameter_spec={ + 'lr': hpt.DoubleParameterSpec(min=0.001, max=0.1, scale='log'), + 'units': hpt.IntegerParameterSpec(min=4, max=128, scale='linear'), + 'activation': hpt.CategoricalParameterSpec(values=['relu', 'selu']), + 'batch_size': hpt.DiscreteParameterSpec(values=[128, 256], scale='linear') + }, + max_trial_count=128, + parallel_trial_count=8, + ) + + hp_job.run() + + print(hp_job.trials) + ``` + + + For more information on using hyperparameter tuning please visit: + https://cloud.google.com/ai-platform-unified/docs/training/using-hyperparameter-tuning + + Args: + display_name (str): + Required. The user-defined name of the HyperparameterTuningJob. + The name can be up to 128 characters long and can be consist + of any UTF-8 characters. + custom_job (aiplatform.CustomJob): + Required. Configured CustomJob. The worker pool spec from this custom job + applies to the CustomJobs created in all the trials. + metric_spec: Dict[str, str] + Required. Dicionary representing metrics to optimize. The dictionary key is the metric_id, + which is reported by your training job, and the dictionary value is the + optimization goal of the metric('minimize' or 'maximize'). example: + + metric_spec = {'loss': 'minimize', 'accuracy': 'maximize'} + + parameter_spec (Dict[str, hyperparameter_tuning._ParameterSpec]): + Required. 
Dictionary representing parameters to optimize. The dictionary key is the metric_id, + which is passed into your training job as a command line key word arguemnt, and the + dictionary value is the parameter specification of the metric. + + + from google.cloud.aiplatform import hyperparameter_tuning as hpt + + parameter_spec={ + 'decay': hpt.DoubleParameterSpec(min=1e-7, max=1, scale='linear'), + 'learning_rate': hpt.DoubleParameterSpec(min=1e-7, max=1, scale='linear') + 'batch_size': hpt.DiscreteParamterSpec(values=[4, 8, 16, 32, 64, 128], scale='linear') + } + + Supported parameter specifications can be found until aiplatform.hyperparameter_tuning. + These parameter specification are currently supported: + DoubleParameterSpec, IntegerParameterSpec, CategoricalParameterSpace, DiscreteParameterSpec + + max_trial_count (int): + Reuired. The desired total number of Trials. + parallel_trial_count (int): + Required. The desired number of Trials to run in parallel. + max_failed_trial_count (int): + Optional. The number of failed Trials that need to be + seen before failing the HyperparameterTuningJob. + If set to 0, AI Platform decides how many Trials + must fail before the whole job fails. + search_algorithm (str): + The search algorithm specified for the Study. + Accepts one of the following: + `None` - If you do not specify an algorithm, your job uses + the default AI Platform algorithm. The default algorithm + applies Bayesian optimization to arrive at the optimal + solution with a more effective search over the parameter space. + + 'grid' - A simple grid search within the feasible space. This + option is particularly useful if you want to specify a quantity + of trials that is greater than the number of points in the + feasible space. In such cases, if you do not specify a grid + search, the AI Platform default algorithm may generate duplicate + suggestions. 
To use grid search, all parameter specs must be
+                    of type `IntegerParameterSpec`, `CategoricalParameterSpec`,
+                    or `DiscreteParameterSpec`.
+
+                    'random' - A simple random search within the feasible space.
+            measurement_selection (str):
+                This indicates which measurement to use if/when the service
+                automatically selects the final measurement from previously reported
+                intermediate measurements.
+
+                Accepts: 'best', 'last'
+
+                Choose this based on two considerations:
+                A) Do you expect your measurements to monotonically improve? If so,
+                choose 'last'. On the other hand, if you're in a situation
+                where your system can "over-train" and you expect the performance to
+                get better for a while but then start declining, choose
+                'best'. B) Are your measurements significantly noisy
+                and/or irreproducible? If so, 'best' will tend to be
+                over-optimistic, and it may be better to choose 'last'. If
+                both or neither of (A) and (B) apply, it doesn't matter which
+                selection type is chosen.
+            project (str):
+                Optional. Project to run the HyperparameterTuningJob in. Overrides project set in aiplatform.init.
+            location (str):
+                Optional. Location to run the HyperparameterTuning in. Overrides location set in aiplatform.init.
+            credentials (auth_credentials.Credentials):
+                Optional. Custom credentials to use to call HyperparameterTuning service. Overrides
+                credentials set in aiplatform.init.
+            encryption_spec_key_name (str):
+                Optional. Customer-managed encryption key options for a
+                HyperparameterTuningJob. If this is set, then
+                all resources created by the
+                HyperparameterTuningJob will be encrypted with
+                the provided encryption key. 
+ """ + super().__init__(project=project, location=location, credentials=credentials) + + metrics = [ + gca_study_compat.StudySpec.MetricSpec( + metric_id=metric_id, goal=goal.upper() + ) + for metric_id, goal in metric_spec.items() + ] + + parameters = [ + parameter._to_parameter_spec(parameter_id=parameter_id) + for parameter_id, parameter in parameter_spec.items() + ] + + study_spec = gca_study_compat.StudySpec( + metrics=metrics, + parameters=parameters, + algorithm=_SEARCH_ALGORITHM_TO_PROTO_VALUE[search_algorithm], + measurement_selection_type=_MEASUREMENT_SELECTION_TO_PROTO_VALUE[ + measurement_selection + ], + ) + + self._gca_resource = gca_hyperparameter_tuning_job_compat.HyperparameterTuningJob( + display_name=display_name, + study_spec=study_spec, + max_trial_count=max_trial_count, + parallel_trial_count=parallel_trial_count, + max_failed_trial_count=max_failed_trial_count, + trial_job_spec=copy.deepcopy(custom_job.job_spec), + encryption_spec=initializer.global_config.get_encryption_spec( + encryption_spec_key_name=encryption_spec_key_name + ), + ) + + @base.optional_sync() + def run( + self, + service_account: Optional[str] = None, + network: Optional[str] = None, + timeout: Optional[int] = None, # seconds + restart_job_on_worker_restart: bool = False, + sync: bool = True, + ) -> None: + """Run this configured CustomJob. + + Args: + service_account (str): + Optional. Specifies the service account for workload run-as account. + Users submitting jobs must have act-as permission on this run-as account. + network (str): + Optional. The full name of the Compute Engine network to which the job + should be peered. For example, projects/12345/global/networks/myVPC. + Private services access must already be configured for the network. + If left unspecified, the job is not peered with any network. + timeout (int): + Optional. The maximum job running time in seconds. The default is 7 days. 
+ restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. + sync (bool): + Whether to execute this method synchronously. If False, this method + will unblock and it will be executed in a concurrent Future. + """ + + if service_account: + self._gca_resource.trial_job_spec.service_account = service_account + + if network: + self._gca_resource.trial_job_spec.network = network + + if timeout or restart_job_on_worker_restart: + duration = duration_pb2.Duration(seconds=timeout) if timeout else None + self._gca_resource.trial_job_spec.scheduling = gca_custom_job_compat.Scheduling( + timeout=duration, + restart_job_on_worker_restart=restart_job_on_worker_restart, + ) + + _LOGGER.log_create_with_lro(self.__class__) + + self._gca_resource = self.api_client.create_hyperparameter_tuning_job( + parent=self._parent, hyperparameter_tuning_job=self._gca_resource + ) + + _LOGGER.log_create_complete_with_getter( + self.__class__, self._gca_resource, "hpt_job" + ) + + _LOGGER.info("View HyperparameterTuningJob:\n%s" % self._dashboard_uri()) + + self._block_until_complete() + + @property + def trials(self) -> List[gca_study_compat.Trial]: + return list(self._gca_resource.trials) diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 2912806a12..470e30bf56 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -15,16 +15,8 @@ # limitations under the License. 
# -import datetime -import functools -import logging -import pathlib -import shutil -import subprocess -import sys -import tempfile import time -from typing import Callable, Dict, List, Optional, NamedTuple, Sequence, Tuple, Union +from typing import Dict, List, Optional, Sequence, Tuple, Union import abc @@ -38,25 +30,25 @@ from google.cloud.aiplatform import utils from google.cloud.aiplatform.compat.types import ( - accelerator_type as gca_accelerator_type, env_var as gca_env_var, io as gca_io, model as gca_model, pipeline_state as gca_pipeline_state, training_pipeline as gca_training_pipeline, ) +from google.cloud.aiplatform.utils import _timestamped_gcs_dir +from google.cloud.aiplatform.utils import source_utils +from google.cloud.aiplatform.utils import worker_spec_utils from google.cloud.aiplatform.v1.schema.trainingjob import ( definition_v1 as training_job_inputs, ) -from google.cloud import storage from google.rpc import code_pb2 import proto -logging.basicConfig(level=logging.INFO, stream=sys.stdout) _LOGGER = base.Logger(__name__) _PIPELINE_COMPLETE_STATES = set( @@ -780,449 +772,6 @@ def cancel(self) -> None: self.api_client.cancel_training_pipeline(name=self.resource_name) -def _timestamped_gcs_dir(root_gcs_path: str, dir_name_prefix: str) -> str: - """Composes a timestamped GCS directory. - - Args: - root_gcs_path: GCS path to put the timestamped directory. - dir_name_prefix: Prefix to add the timestamped directory. - Returns: - Timestamped gcs directory path in root_gcs_path. 
- """ - timestamp = datetime.datetime.now().isoformat(sep="-", timespec="milliseconds") - dir_name = "-".join([dir_name_prefix, timestamp]) - if root_gcs_path.endswith("/"): - root_gcs_path = root_gcs_path[:-1] - gcs_path = "/".join([root_gcs_path, dir_name]) - if not gcs_path.startswith("gs://"): - return "gs://" + gcs_path - return gcs_path - - -def _timestamped_copy_to_gcs( - local_file_path: str, - gcs_dir: str, - project: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, -) -> str: - """Copies a local file to a GCS path. - - The file copied to GCS is the name of the local file prepended with an - "aiplatform-{timestamp}-" string. - - Args: - local_file_path (str): Required. Local file to copy to GCS. - gcs_dir (str): - Required. The GCS directory to copy to. - project (str): - Project that contains the staging bucket. Default will be used if not - provided. Model Builder callers should pass this in. - credentials (auth_credentials.Credentials): - Custom credentials to use with bucket. Model Builder callers should pass - this in. - Returns: - gcs_path (str): The path of the copied file in gcs. - """ - - gcs_bucket, gcs_blob_prefix = utils.extract_bucket_and_prefix_from_gcs_path(gcs_dir) - - local_file_name = pathlib.Path(local_file_path).name - timestamp = datetime.datetime.now().isoformat(sep="-", timespec="milliseconds") - blob_path = "-".join(["aiplatform", timestamp, local_file_name]) - - if gcs_blob_prefix: - blob_path = "/".join([gcs_blob_prefix, blob_path]) - - # TODO(b/171202993) add user agent - client = storage.Client(project=project, credentials=credentials) - bucket = client.bucket(gcs_bucket) - blob = bucket.blob(blob_path) - blob.upload_from_filename(local_file_path) - - gcs_path = "".join(["gs://", "/".join([blob.bucket.name, blob.name])]) - return gcs_path - - -def _get_python_executable() -> str: - """Returns Python executable. - - Returns: - Python executable to use for setuptools packaging. 
- Raises: - EnvironmentError: If Python executable is not found. - """ - - python_executable = sys.executable - - if not python_executable: - raise EnvironmentError("Cannot find Python executable for packaging.") - return python_executable - - -class _TrainingScriptPythonPackager: - """Converts a Python script into Python package suitable for aiplatform - training. - - Copies the script to specified location. - - Class Attributes: - _TRAINER_FOLDER: Constant folder name to build package. - _ROOT_MODULE: Constant root name of module. - _TEST_MODULE_NAME: Constant name of module that will store script. - _SETUP_PY_VERSION: Constant version of this created python package. - _SETUP_PY_TEMPLATE: Constant template used to generate setup.py file. - _SETUP_PY_SOURCE_DISTRIBUTION_CMD: - Constant command to generate the source distribution package. - - Attributes: - script_path: local path of script to package - requirements: list of Python dependencies to add to package - - Usage: - - packager = TrainingScriptPythonPackager('my_script.py', ['pandas', 'pytorch']) - gcs_path = packager.package_and_copy_to_gcs( - gcs_staging_dir='my-bucket', - project='my-prject') - module_name = packager.module_name - - The package after installed can be executed as: - python -m aiplatform_custom_trainer_script.task - """ - - _TRAINER_FOLDER = "trainer" - _ROOT_MODULE = "aiplatform_custom_trainer_script" - _TASK_MODULE_NAME = "task" - _SETUP_PY_VERSION = "0.1" - - _SETUP_PY_TEMPLATE = """from setuptools import find_packages -from setuptools import setup - -setup( - name='{name}', - version='{version}', - packages=find_packages(), - install_requires=({requirements}), - include_package_data=True, - description='My training application.' -)""" - - _SETUP_PY_SOURCE_DISTRIBUTION_CMD = "setup.py sdist --formats=gztar" - - # Module name that can be executed during training. ie. 
python -m - module_name = f"{_ROOT_MODULE}.{_TASK_MODULE_NAME}" - - def __init__(self, script_path: str, requirements: Optional[Sequence[str]] = None): - """Initializes packager. - - Args: - script_path (str): Required. Local path to script. - requirements (Sequence[str]): - List of python packages dependencies of script. - """ - - self.script_path = script_path - self.requirements = requirements or [] - - def make_package(self, package_directory: str) -> str: - """Converts script into a Python package suitable for python module - execution. - - Args: - package_directory (str): Directory to build package in. - Returns: - source_distribution_path (str): Path to built package. - Raises: - RunTimeError: If package creation fails. - """ - # The root folder to builder the package in - package_path = pathlib.Path(package_directory) - - # Root directory of the package - trainer_root_path = package_path / self._TRAINER_FOLDER - - # The root module of the python package - trainer_path = trainer_root_path / self._ROOT_MODULE - - # __init__.py path in root module - init_path = trainer_path / "__init__.py" - - # The module that will contain the script - script_out_path = trainer_path / f"{self._TASK_MODULE_NAME}.py" - - # The path to setup.py in the package. - setup_py_path = trainer_root_path / "setup.py" - - # The path to the generated source distribution. - source_distribution_path = ( - trainer_root_path - / "dist" - / f"{self._ROOT_MODULE}-{self._SETUP_PY_VERSION}.tar.gz" - ) - - trainer_root_path.mkdir() - trainer_path.mkdir() - - # Make empty __init__.py - with init_path.open("w"): - pass - - # Format the setup.py file. - setup_py_output = self._SETUP_PY_TEMPLATE.format( - name=self._ROOT_MODULE, - requirements=",".join(f'"{r}"' for r in self.requirements), - version=self._SETUP_PY_VERSION, - ) - - # Write setup.py - with setup_py_path.open("w") as fp: - fp.write(setup_py_output) - - # Copy script as module of python package. 
- shutil.copy(self.script_path, script_out_path) - - # Run setup.py to create the source distribution. - setup_cmd = [ - _get_python_executable() - ] + self._SETUP_PY_SOURCE_DISTRIBUTION_CMD.split() - - p = subprocess.Popen( - args=setup_cmd, - cwd=trainer_root_path, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - output, error = p.communicate() - - # Raise informative error if packaging fails. - if p.returncode != 0: - raise RuntimeError( - "Packaging of training script failed with code %d\n%s \n%s" - % (p.returncode, output.decode(), error.decode()) - ) - - return str(source_distribution_path) - - def package_and_copy(self, copy_method: Callable[[str], str]) -> str: - """Packages the script and executes copy with given copy_method. - - Args: - copy_method Callable[[str], str] - Takes a string path, copies to a desired location, and returns the - output path location. - Returns: - output_path str: Location of copied package. - """ - - with tempfile.TemporaryDirectory() as tmpdirname: - source_distribution_path = self.make_package(tmpdirname) - output_location = copy_method(source_distribution_path) - _LOGGER.info("Training script copied to:\n%s." % output_location) - return output_location - - def package_and_copy_to_gcs( - self, - gcs_staging_dir: str, - project: str = None, - credentials: Optional[auth_credentials.Credentials] = None, - ) -> str: - """Packages script in Python package and copies package to GCS bucket. - - Args - gcs_staging_dir (str): Required. GCS Staging directory. - project (str): Required. Project where GCS Staging bucket is located. - credentials (auth_credentials.Credentials): - Optional credentials used with GCS client. - Returns: - GCS location of Python package. 
- """ - - copy_method = functools.partial( - _timestamped_copy_to_gcs, - gcs_dir=gcs_staging_dir, - project=project, - credentials=credentials, - ) - return self.package_and_copy(copy_method=copy_method) - - -class _MachineSpec(NamedTuple): - """Specification container for Machine specs used for distributed training. - - Usage: - - spec = _MachineSpec( - replica_count=10, - machine_type='n1-standard-4', - accelerator_count=2, - accelerator_type='NVIDIA_TESLA_K80') - - Note that container and python package specs are not stored with this spec. - """ - - replica_count: int = 0 - machine_type: str = "n1-standard-4" - accelerator_count: int = 0 - accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED" - - def _get_accelerator_type(self) -> Optional[str]: - """Validates accelerator_type and returns the name of the accelerator. - - Returns: - None if no accelerator or valid accelerator name. - - Raise: - ValueError if accelerator type is invalid. - """ - - # Raises ValueError if invalid accelerator_type - utils.validate_accelerator_type(self.accelerator_type) - - accelerator_enum = getattr( - gca_accelerator_type.AcceleratorType, self.accelerator_type - ) - - if ( - accelerator_enum - != gca_accelerator_type.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED - ): - return self.accelerator_type - - @property - def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]: - """Return specification as a Dict.""" - spec = { - "machineSpec": {"machineType": self.machine_type}, - "replicaCount": self.replica_count, - } - accelerator_type = self._get_accelerator_type() - if accelerator_type and self.accelerator_count: - spec["machineSpec"]["acceleratorType"] = accelerator_type - spec["machineSpec"]["acceleratorCount"] = self.accelerator_count - - return spec - - @property - def is_empty(self) -> bool: - """Returns True is replica_count > 0 False otherwise.""" - return self.replica_count <= 0 - - -class _DistributedTrainingSpec(NamedTuple): - """Configuration for 
distributed training worker pool specs. - - AI Platform Training expects configuration in this order: - [ - chief spec, # can only have one replica - worker spec, - parameter server spec, - evaluator spec - ] - - Usage: - - dist_training_spec = _DistributedTrainingSpec( - chief_spec = _MachineSpec( - replica_count=1, - machine_type='n1-standard-4', - accelerator_count=2, - accelerator_type='NVIDIA_TESLA_K80' - ), - worker_spec = _MachineSpec( - replica_count=10, - machine_type='n1-standard-4', - accelerator_count=2, - accelerator_type='NVIDIA_TESLA_K80' - ) - ) - """ - - chief_spec: _MachineSpec = _MachineSpec() - worker_spec: _MachineSpec = _MachineSpec() - parameter_server_spec: _MachineSpec = _MachineSpec() - evaluator_spec: _MachineSpec = _MachineSpec() - - @property - def pool_specs( - self, - ) -> List[Dict[str, Union[int, str, Dict[str, Union[int, str]]]]]: - """Return each pools spec in correct order for AI Platform as a list of - dicts. - - Also removes specs if they are empty but leaves specs in if there unusual - specifications to not break the ordering in AI Platform Training. - ie. 0 chief replica, 10 worker replica, 3 ps replica - - Returns: - Order list of worker pool specs suitable for AI Platform Training. - """ - if self.chief_spec.replica_count > 1: - raise ValueError("Chief spec replica count cannot be greater than 1.") - - spec_order = [ - self.chief_spec, - self.worker_spec, - self.parameter_server_spec, - self.evaluator_spec, - ] - specs = [s.spec_dict for s in spec_order] - for i in reversed(range(len(spec_order))): - if spec_order[i].is_empty: - specs.pop() - else: - break - return specs - - @classmethod - def chief_worker_pool( - cls, - replica_count: int = 0, - machine_type: str = "n1-standard-4", - accelerator_count: int = 0, - accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED", - ) -> "_DistributedTrainingSpec": - """Parameterizes Config to support only chief with worker replicas. 
- - For replica is assigned to chief and the remainder to workers. All spec have the - same machine type, accelerator count, and accelerator type. - - Args: - replica_count (int): - The number of worker replicas. Assigns 1 chief replica and - replica_count - 1 worker replicas. - machine_type (str): - The type of machine to use for training. - accelerator_type (str): - Hardware accelerator type. One of ACCELERATOR_TYPE_UNSPECIFIED, - NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, - NVIDIA_TESLA_T4 - accelerator_count (int): - The number of accelerators to attach to a worker replica. - - Returns: - _DistributedTrainingSpec representing one chief and n workers all of same - type. If replica_count <= 0 then an empty spec is returned. - """ - if replica_count <= 0: - return cls() - - chief_spec = _MachineSpec( - replica_count=1, - machine_type=machine_type, - accelerator_count=accelerator_count, - accelerator_type=accelerator_type, - ) - - worker_spec = _MachineSpec( - replica_count=replica_count - 1, - machine_type=machine_type, - accelerator_count=accelerator_count, - accelerator_type=accelerator_type, - ) - - return cls(chief_spec=chief_spec, worker_spec=worker_spec) - - class _CustomTrainingJob(_TrainingJob): """ABC for Custom Training Pipelines..""" @@ -1455,7 +1004,7 @@ def _prepare_and_validate_run( machine_type: str = "n1-standard-4", accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED", accelerator_count: int = 0, - ) -> Tuple[_DistributedTrainingSpec, Optional[gca_model.Model]]: + ) -> Tuple[worker_spec_utils._DistributedTrainingSpec, Optional[gca_model.Model]]: """Create worker pool specs and managed model as well validating the run. 
@@ -1505,7 +1054,7 @@ def _prepare_and_validate_run( model_display_name = model_display_name or self._display_name + "-model" # validates args and will raise - worker_pool_specs = _DistributedTrainingSpec.chief_worker_pool( + worker_pool_specs = worker_spec_utils._DistributedTrainingSpec.chief_worker_pool( replica_count=replica_count, machine_type=machine_type, accelerator_count=accelerator_count, @@ -1523,7 +1072,7 @@ def _prepare_and_validate_run( def _prepare_training_task_inputs_and_output_dir( self, - worker_pool_specs: _DistributedTrainingSpec, + worker_pool_specs: worker_spec_utils._DistributedTrainingSpec, base_output_dir: Optional[str] = None, service_account: Optional[str] = None, network: Optional[str] = None, @@ -1531,7 +1080,7 @@ def _prepare_training_task_inputs_and_output_dir( """Prepares training task inputs and output directory for custom job. Args: - worker_pools_spec (_DistributedTrainingSpec): + worker_pools_spec (worker_spec_utils._DistributedTrainingSpec): Worker pools pecs required to run job. base_output_dir (str): GCS output directory of job. 
If not provided a @@ -1556,12 +1105,12 @@ def _prepare_training_task_inputs_and_output_dir( _LOGGER.info("Training Output directory:\n%s " % base_output_dir) training_task_inputs = { - "workerPoolSpecs": worker_pool_specs, - "baseOutputDirectory": {"output_uri_prefix": base_output_dir}, + "worker_pool_specs": worker_pool_specs, + "base_output_directory": {"output_uri_prefix": base_output_dir}, } if service_account: - training_task_inputs["serviceAccount"] = service_account + training_task_inputs["service_account"] = service_account if network: training_task_inputs["network"] = network @@ -1981,7 +1530,7 @@ def run( ) # make and copy package - python_packager = _TrainingScriptPythonPackager( + python_packager = source_utils._TrainingScriptPythonPackager( script_path=self._script_path, requirements=self._requirements ) @@ -2007,7 +1556,7 @@ def run( @base.optional_sync(construct_object_on_arg="managed_model") def _run( self, - python_packager: _TrainingScriptPythonPackager, + python_packager: source_utils._TrainingScriptPythonPackager, dataset: Optional[ Union[ datasets.ImageDataset, @@ -2017,7 +1566,7 @@ def _run( ] ], annotation_schema_uri: Optional[str], - worker_pool_specs: _DistributedTrainingSpec, + worker_pool_specs: worker_spec_utils._DistributedTrainingSpec, managed_model: Optional[gca_model.Model] = None, args: Optional[List[Union[str, float, int]]] = None, environment_variables: Optional[Dict[str, str]] = None, @@ -2034,7 +1583,7 @@ def _run( """Packages local script and launches training_job. Args: - python_packager (_TrainingScriptPythonPackager): + python_packager (source_utils._TrainingScriptPythonPackager): Required. Python Packager pointing to training script locally. dataset ( Union[ @@ -2048,7 +1597,7 @@ def _run( annotation_schema_uri (str): Google Cloud Storage URI points to a YAML file describing annotation schema. 
- worker_pools_spec (_DistributedTrainingSpec): + worker_pools_spec (worker_spec_utils._DistributedTrainingSpec): Worker pools pecs required to run job. managed_model (gca_model.Model): Model proto if this script produces a Managed Model. @@ -2132,17 +1681,17 @@ def _run( ) for spec in worker_pool_specs: - spec["pythonPackageSpec"] = { - "executorImageUri": self._container_uri, - "pythonModule": python_packager.module_name, - "packageUris": [package_gcs_uri], + spec["python_package_spec"] = { + "executor_image_uri": self._container_uri, + "python_module": python_packager.module_name, + "package_uris": [package_gcs_uri], } if args: - spec["pythonPackageSpec"]["args"] = args + spec["python_package_spec"]["args"] = args if environment_variables: - spec["pythonPackageSpec"]["env"] = [ + spec["python_package_spec"]["env"] = [ {"name": key, "value": value} for key, value in environment_variables.items() ] @@ -2596,7 +2145,7 @@ def _run( ] ], annotation_schema_uri: Optional[str], - worker_pool_specs: _DistributedTrainingSpec, + worker_pool_specs: worker_spec_utils._DistributedTrainingSpec, managed_model: Optional[gca_model.Model] = None, args: Optional[List[Union[str, float, int]]] = None, environment_variables: Optional[Dict[str, str]] = None, @@ -2624,7 +2173,7 @@ def _run( annotation_schema_uri (str): Google Cloud Storage URI points to a YAML file describing annotation schema. - worker_pools_spec (_DistributedTrainingSpec): + worker_pools_spec (worker_spec_utils._DistributedTrainingSpec): Worker pools pecs required to run job. managed_model (gca_model.Model): Model proto if this script produces a Managed Model. 
@@ -4393,7 +3942,7 @@ def _run( ] ], annotation_schema_uri: Optional[str], - worker_pool_specs: _DistributedTrainingSpec, + worker_pool_specs: worker_spec_utils._DistributedTrainingSpec, managed_model: Optional[gca_model.Model] = None, args: Optional[List[Union[str, float, int]]] = None, environment_variables: Optional[Dict[str, str]] = None, @@ -4422,7 +3971,7 @@ def _run( annotation_schema_uri (str): Google Cloud Storage URI points to a YAML file describing annotation schema. - worker_pools_spec (_DistributedTrainingSpec): + worker_pools_spec (worker_spec_utils._DistributedTrainingSpec): Worker pools pecs required to run job. managed_model (gca_model.Model): Model proto if this script produces a Managed Model. @@ -4485,17 +4034,17 @@ def _run( produce an AI Platform Model. """ for spec in worker_pool_specs: - spec["pythonPackageSpec"] = { - "executorImageUri": self._container_uri, - "pythonModule": self._python_module, - "packageUris": [self._package_gcs_uri], + spec["python_package_spec"] = { + "executor_image_uri": self._container_uri, + "python_module": self._python_module, + "package_uris": [self._package_gcs_uri], } if args: - spec["pythonPackageSpec"]["args"] = args + spec["python_package_spec"]["args"] = args if environment_variables: - spec["pythonPackageSpec"]["env"] = [ + spec["python_package_spec"]["env"] = [ {"name": key, "value": value} for key, value in environment_variables.items() ] diff --git a/google/cloud/aiplatform/training_utils.py b/google/cloud/aiplatform/training_utils.py deleted file mode 100644 index fea60c5005..0000000000 --- a/google/cloud/aiplatform/training_utils.py +++ /dev/null @@ -1,105 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import json -import os - -from typing import Dict, Optional - - -class EnvironmentVariables: - """Passes on OS' environment variables.""" - - @property - def training_data_uri(self) -> Optional[str]: - """ - Returns: - Cloud Storage URI of a directory intended for training data. None if - environment variable not set. - """ - return os.environ.get("AIP_TRAINING_DATA_URI") - - @property - def validation_data_uri(self) -> Optional[str]: - """ - Returns: - Cloud Storage URI of a directory intended for validation data. None - if environment variable not set. - """ - return os.environ.get("AIP_VALIDATION_DATA_URI") - - @property - def test_data_uri(self) -> Optional[str]: - """ - Returns: - Cloud Storage URI of a directory intended for test data. None if - environment variable not set. - """ - return os.environ.get("AIP_TEST_DATA_URI") - - @property - def model_dir(self) -> Optional[str]: - """ - Returns: - Cloud Storage URI of a directory intended for saving model artefacts. - None if environment variable not set. - """ - return os.environ.get("AIP_MODEL_DIR") - - @property - def checkpoint_dir(self) -> Optional[str]: - """ - Returns: - Cloud Storage URI of a directory intended for saving checkpoints. - None if environment variable not set. - """ - return os.environ.get("AIP_CHECKPOINT_DIR") - - @property - def tensorboard_log_dir(self) -> Optional[str]: - """ - Returns: - Cloud Storage URI of a directory intended for saving TensorBoard logs. - None if environment variable not set. 
- """ - return os.environ.get("AIP_TENSORBOARD_LOG_DIR") - - @property - def cluster_spec(self) -> Optional[Dict]: - """ - Returns: - json string as described in https://cloud.google.com/ai-platform-unified/docs/training/distributed-training#cluster-variables - None if environment variable not set. - """ - cluster_spec_env = os.environ.get("CLUSTER_SPEC") - if cluster_spec_env is not None: - return json.loads(cluster_spec_env) - else: - return None - - @property - def tf_config(self) -> Optional[Dict]: - """ - Returns: - json string as described in https://cloud.google.com/ai-platform-unified/docs/training/distributed-training#tf-config - None if environment variable not set. - """ - tf_config_env = os.environ.get("TF_CONFIG") - if tf_config_env is not None: - return json.loads(tf_config_env) - else: - return None diff --git a/google/cloud/aiplatform/utils.py b/google/cloud/aiplatform/utils/__init__.py similarity index 87% rename from google/cloud/aiplatform/utils.py rename to google/cloud/aiplatform/utils/__init__.py index ff86fc1cb8..22a4d985bb 100644 --- a/google/cloud/aiplatform/utils.py +++ b/google/cloud/aiplatform/utils/__init__.py @@ -17,6 +17,8 @@ import abc +import datetime +import pathlib from collections import namedtuple import logging import re @@ -25,6 +27,8 @@ from google.api_core import client_options from google.api_core import gapic_v1 from google.auth import credentials as auth_credentials +from google.cloud import storage + from google.cloud.aiplatform import compat from google.cloud.aiplatform import constants from google.cloud.aiplatform import initializer @@ -499,3 +503,66 @@ def __init__(self, warning_level: int): def filter(self, record): return record.levelname == self._warning_level + + +def _timestamped_gcs_dir(root_gcs_path: str, dir_name_prefix: str) -> str: + """Composes a timestamped GCS directory. + + Args: + root_gcs_path: GCS path to put the timestamped directory. + dir_name_prefix: Prefix to add the timestamped directory. 
+ Returns: + Timestamped gcs directory path in root_gcs_path. + """ + timestamp = datetime.datetime.now().isoformat(sep="-", timespec="milliseconds") + dir_name = "-".join([dir_name_prefix, timestamp]) + if root_gcs_path.endswith("/"): + root_gcs_path = root_gcs_path[:-1] + gcs_path = "/".join([root_gcs_path, dir_name]) + if not gcs_path.startswith("gs://"): + return "gs://" + gcs_path + return gcs_path + + +def _timestamped_copy_to_gcs( + local_file_path: str, + gcs_dir: str, + project: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, +) -> str: + """Copies a local file to a GCS path. + + The file copied to GCS is the name of the local file prepended with an + "aiplatform-{timestamp}-" string. + + Args: + local_file_path (str): Required. Local file to copy to GCS. + gcs_dir (str): + Required. The GCS directory to copy to. + project (str): + Project that contains the staging bucket. Default will be used if not + provided. Model Builder callers should pass this in. + credentials (auth_credentials.Credentials): + Custom credentials to use with bucket. Model Builder callers should pass + this in. + Returns: + gcs_path (str): The path of the copied file in gcs. 
+ """ + + gcs_bucket, gcs_blob_prefix = extract_bucket_and_prefix_from_gcs_path(gcs_dir) + + local_file_name = pathlib.Path(local_file_path).name + timestamp = datetime.datetime.now().isoformat(sep="-", timespec="milliseconds") + blob_path = "-".join(["aiplatform", timestamp, local_file_name]) + + if gcs_blob_prefix: + blob_path = "/".join([gcs_blob_prefix, blob_path]) + + # TODO(b/171202993) add user agent + client = storage.Client(project=project, credentials=credentials) + bucket = client.bucket(gcs_bucket) + blob = bucket.blob(blob_path) + blob.upload_from_filename(local_file_path) + + gcs_path = "".join(["gs://", "/".join([blob.bucket.name, blob.name])]) + return gcs_path diff --git a/google/cloud/aiplatform/utils/source_utils.py b/google/cloud/aiplatform/utils/source_utils.py new file mode 100644 index 0000000000..b7fcef806f --- /dev/null +++ b/google/cloud/aiplatform/utils/source_utils.py @@ -0,0 +1,233 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import functools +import pathlib +import shutil +import subprocess +import sys +import tempfile +from typing import Optional, Sequence, Callable + +from google.auth import credentials as auth_credentials +from google.cloud.aiplatform import base +from google.cloud.aiplatform import utils + +_LOGGER = base.Logger(__name__) + + +def _get_python_executable() -> str: + """Returns Python executable. 
+
+    Returns:
+        Python executable to use for setuptools packaging.
+    Raises:
+        EnvironmentError: If Python executable is not found.
+    """
+
+    python_executable = sys.executable
+
+    if not python_executable:
+        raise EnvironmentError("Cannot find Python executable for packaging.")
+    return python_executable
+
+
+class _TrainingScriptPythonPackager:
+    """Converts a Python script into Python package suitable for aiplatform
+    training.
+
+    Copies the script to specified location.
+
+    Class Attributes:
+        _TRAINER_FOLDER: Constant folder name to build package.
+        _ROOT_MODULE: Constant root name of module.
+        _TASK_MODULE_NAME: Constant name of module that will store script.
+        _SETUP_PY_VERSION: Constant version of this created python package.
+        _SETUP_PY_TEMPLATE: Constant template used to generate setup.py file.
+        _SETUP_PY_SOURCE_DISTRIBUTION_CMD:
+            Constant command to generate the source distribution package.
+
+    Attributes:
+        script_path: local path of script to package
+        requirements: list of Python dependencies to add to package
+
+    Usage:
+
+    packager = _TrainingScriptPythonPackager('my_script.py', ['pandas', 'pytorch'])
+    gcs_path = packager.package_and_copy_to_gcs(
+        gcs_staging_dir='my-bucket',
+        project='my-project')
+    module_name = packager.module_name
+
+    The package, once installed, can be executed as:
+    python -m aiplatform_custom_trainer_script.task
+    """
+
+    _TRAINER_FOLDER = "trainer"
+    _ROOT_MODULE = "aiplatform_custom_trainer_script"
+    _TASK_MODULE_NAME = "task"
+    _SETUP_PY_VERSION = "0.1"
+
+    _SETUP_PY_TEMPLATE = """from setuptools import find_packages
+from setuptools import setup
+
+setup(
+    name='{name}',
+    version='{version}',
+    packages=find_packages(),
+    install_requires=({requirements}),
+    include_package_data=True,
+    description='My training application.'
+)"""
+
+    _SETUP_PY_SOURCE_DISTRIBUTION_CMD = "setup.py sdist --formats=gztar"
+
+    # Module name that can be executed during training. ie. 
python -m
+    module_name = f"{_ROOT_MODULE}.{_TASK_MODULE_NAME}"
+
+    def __init__(self, script_path: str, requirements: Optional[Sequence[str]] = None):
+        """Initializes packager.
+
+        Args:
+            script_path (str): Required. Local path to script.
+            requirements (Sequence[str]):
+                List of python package dependencies of script.
+        """
+
+        self.script_path = script_path
+        self.requirements = requirements or []
+
+    def make_package(self, package_directory: str) -> str:
+        """Converts script into a Python package suitable for python module
+        execution.
+
+        Args:
+            package_directory (str): Directory to build package in.
+        Returns:
+            source_distribution_path (str): Path to built package.
+        Raises:
+            RuntimeError: If package creation fails.
+        """
+        # The root folder to build the package in
+        package_path = pathlib.Path(package_directory)
+
+        # Root directory of the package
+        trainer_root_path = package_path / self._TRAINER_FOLDER
+
+        # The root module of the python package
+        trainer_path = trainer_root_path / self._ROOT_MODULE
+
+        # __init__.py path in root module
+        init_path = trainer_path / "__init__.py"
+
+        # The module that will contain the script
+        script_out_path = trainer_path / f"{self._TASK_MODULE_NAME}.py"
+
+        # The path to setup.py in the package.
+        setup_py_path = trainer_root_path / "setup.py"
+
+        # The path to the generated source distribution.
+        source_distribution_path = (
+            trainer_root_path
+            / "dist"
+            / f"{self._ROOT_MODULE}-{self._SETUP_PY_VERSION}.tar.gz"
+        )
+
+        trainer_root_path.mkdir()
+        trainer_path.mkdir()
+
+        # Make empty __init__.py
+        with init_path.open("w"):
+            pass
+
+        # Format the setup.py file.
+        setup_py_output = self._SETUP_PY_TEMPLATE.format(
+            name=self._ROOT_MODULE,
+            requirements=",".join(f'"{r}"' for r in self.requirements),
+            version=self._SETUP_PY_VERSION,
+        )
+
+        # Write setup.py
+        with setup_py_path.open("w") as fp:
+            fp.write(setup_py_output)
+
+        # Copy script as module of python package. 
+ shutil.copy(self.script_path, script_out_path) + + # Run setup.py to create the source distribution. + setup_cmd = [ + _get_python_executable() + ] + self._SETUP_PY_SOURCE_DISTRIBUTION_CMD.split() + + p = subprocess.Popen( + args=setup_cmd, + cwd=trainer_root_path, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + output, error = p.communicate() + + # Raise informative error if packaging fails. + if p.returncode != 0: + raise RuntimeError( + "Packaging of training script failed with code %d\n%s \n%s" + % (p.returncode, output.decode(), error.decode()) + ) + + return str(source_distribution_path) + + def package_and_copy(self, copy_method: Callable[[str], str]) -> str: + """Packages the script and executes copy with given copy_method. + + Args: + copy_method Callable[[str], str] + Takes a string path, copies to a desired location, and returns the + output path location. + Returns: + output_path str: Location of copied package. + """ + + with tempfile.TemporaryDirectory() as tmpdirname: + source_distribution_path = self.make_package(tmpdirname) + output_location = copy_method(source_distribution_path) + _LOGGER.info("Training script copied to:\n%s." % output_location) + return output_location + + def package_and_copy_to_gcs( + self, + gcs_staging_dir: str, + project: str = None, + credentials: Optional[auth_credentials.Credentials] = None, + ) -> str: + """Packages script in Python package and copies package to GCS bucket. + + Args + gcs_staging_dir (str): Required. GCS Staging directory. + project (str): Required. Project where GCS Staging bucket is located. + credentials (auth_credentials.Credentials): + Optional credentials used with GCS client. + Returns: + GCS location of Python package. 
+ """ + + copy_method = functools.partial( + utils._timestamped_copy_to_gcs, + gcs_dir=gcs_staging_dir, + project=project, + credentials=credentials, + ) + return self.package_and_copy(copy_method=copy_method) diff --git a/google/cloud/aiplatform/utils/worker_spec_utils.py b/google/cloud/aiplatform/utils/worker_spec_utils.py new file mode 100644 index 0000000000..9a681d3b98 --- /dev/null +++ b/google/cloud/aiplatform/utils/worker_spec_utils.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import NamedTuple, Optional, Dict, Union, List + +from google.cloud.aiplatform import utils +from google.cloud.aiplatform.compat.types import ( + accelerator_type as gca_accelerator_type_compat, +) + + +class _MachineSpec(NamedTuple): + """Specification container for Machine specs used for distributed training. + + Usage: + + spec = _MachineSpec( + replica_count=10, + machine_type='n1-standard-4', + accelerator_count=2, + accelerator_type='NVIDIA_TESLA_K80') + + Note that container and python package specs are not stored with this spec. + """ + + replica_count: int = 0 + machine_type: str = "n1-standard-4" + accelerator_count: int = 0 + accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED" + + def _get_accelerator_type(self) -> Optional[str]: + """Validates accelerator_type and returns the name of the accelerator. + + Returns: + None if no accelerator or valid accelerator name. 
+
+        Raises:
+            ValueError if accelerator type is invalid.
+        """
+
+        # Raises ValueError if invalid accelerator_type
+        utils.validate_accelerator_type(self.accelerator_type)
+
+        accelerator_enum = getattr(
+            gca_accelerator_type_compat.AcceleratorType, self.accelerator_type
+        )
+
+        if (
+            accelerator_enum
+            != gca_accelerator_type_compat.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED
+        ):
+            return self.accelerator_type
+
+    @property
+    def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]:
+        """Return specification as a Dict."""
+        spec = {
+            "machine_spec": {"machine_type": self.machine_type},
+            "replica_count": self.replica_count,
+        }
+        accelerator_type = self._get_accelerator_type()
+        if accelerator_type and self.accelerator_count:
+            spec["machine_spec"]["accelerator_type"] = accelerator_type
+            spec["machine_spec"]["accelerator_count"] = self.accelerator_count
+
+        return spec
+
+    @property
+    def is_empty(self) -> bool:
+        """Returns True if replica_count <= 0, False otherwise."""
+        return self.replica_count <= 0
+
+
+class _DistributedTrainingSpec(NamedTuple):
+    """Configuration for distributed training worker pool specs. 
+
+    AI Platform Training expects configuration in this order:
+    [
+        chief spec, # can only have one replica
+        worker spec,
+        parameter server spec,
+        evaluator spec
+    ]
+
+    Usage:
+
+    dist_training_spec = _DistributedTrainingSpec(
+        chief_spec = _MachineSpec(
+            replica_count=1,
+            machine_type='n1-standard-4',
+            accelerator_count=2,
+            accelerator_type='NVIDIA_TESLA_K80'
+        ),
+        worker_spec = _MachineSpec(
+            replica_count=10,
+            machine_type='n1-standard-4',
+            accelerator_count=2,
+            accelerator_type='NVIDIA_TESLA_K80'
+        )
+    )
+    """
+
+    chief_spec: _MachineSpec = _MachineSpec()
+    worker_spec: _MachineSpec = _MachineSpec()
+    parameter_server_spec: _MachineSpec = _MachineSpec()
+    evaluator_spec: _MachineSpec = _MachineSpec()
+
+    @property
+    def pool_specs(
+        self,
+    ) -> List[Dict[str, Union[int, str, Dict[str, Union[int, str]]]]]:
+        """Return each pool's spec in correct order for AI Platform as a list of
+        dicts.
+
+        Also removes specs if they are empty but leaves specs in if there are unusual
+        specifications to not break the ordering in AI Platform Training.
+        ie. 0 chief replica, 10 worker replica, 3 ps replica
+
+        Returns:
+            Ordered list of worker pool specs suitable for AI Platform Training.
+        """
+        if self.chief_spec.replica_count > 1:
+            raise ValueError("Chief spec replica count cannot be greater than 1.")
+
+        spec_order = [
+            self.chief_spec,
+            self.worker_spec,
+            self.parameter_server_spec,
+            self.evaluator_spec,
+        ]
+        specs = [s.spec_dict for s in spec_order]
+        for i in reversed(range(len(spec_order))):
+            if spec_order[i].is_empty:
+                specs.pop()
+            else:
+                break
+        return specs
+
+    @classmethod
+    def chief_worker_pool(
+        cls,
+        replica_count: int = 0,
+        machine_type: str = "n1-standard-4",
+        accelerator_count: int = 0,
+        accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
+    ) -> "_DistributedTrainingSpec":
+        """Parameterizes Config to support only chief with worker replicas.
+
+        One replica is assigned to chief and the remainder to workers. 
All spec have the + same machine type, accelerator count, and accelerator type. + + Args: + replica_count (int): + The number of worker replicas. Assigns 1 chief replica and + replica_count - 1 worker replicas. + machine_type (str): + The type of machine to use for training. + accelerator_type (str): + Hardware accelerator type. One of ACCELERATOR_TYPE_UNSPECIFIED, + NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, + NVIDIA_TESLA_T4 + accelerator_count (int): + The number of accelerators to attach to a worker replica. + + Returns: + _DistributedTrainingSpec representing one chief and n workers all of same + type. If replica_count <= 0 then an empty spec is returned. + """ + if replica_count <= 0: + return cls() + + chief_spec = _MachineSpec( + replica_count=1, + machine_type=machine_type, + accelerator_count=accelerator_count, + accelerator_type=accelerator_type, + ) + + worker_spec = _MachineSpec( + replica_count=replica_count - 1, + machine_type=machine_type, + accelerator_count=accelerator_count, + accelerator_type=accelerator_type, + ) + + return cls(chief_spec=chief_spec, worker_spec=worker_spec) diff --git a/tests/unit/aiplatform/test_custom_job.py b/tests/unit/aiplatform/test_custom_job.py new file mode 100644 index 0000000000..37c2ac3df0 --- /dev/null +++ b/tests/unit/aiplatform/test_custom_job.py @@ -0,0 +1,323 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pytest + +import copy +from importlib import reload +from unittest import mock +from unittest.mock import patch + +from google.protobuf import duration_pb2 # type: ignore +from google.rpc import status_pb2 + +import test_training_jobs +from test_training_jobs import mock_python_package_to_gcs # noqa: F401 + +from google.cloud import aiplatform +from google.cloud.aiplatform.compat.types import custom_job as gca_custom_job_compat +from google.cloud.aiplatform.compat.types import io as gca_io_compat +from google.cloud.aiplatform.compat.types import job_state as gca_job_state_compat +from google.cloud.aiplatform.compat.types import ( + encryption_spec as gca_encryption_spec_compat, +) +from google.cloud.aiplatform_v1.services.job_service import client as job_service_client + +_TEST_PROJECT = "test-project" +_TEST_LOCATION = "us-central1" +_TEST_ID = "1028944691210842416" +_TEST_DISPLAY_NAME = "my_job_1234" + +_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}" + +_TEST_CUSTOM_JOB_NAME = f"{_TEST_PARENT}/customJobs/{_TEST_ID}" + +_TEST_TRAINING_CONTAINER_IMAGE = "gcr.io/test-training/container:image" + +_TEST_WORKER_POOL_SPEC = [ + { + "machine_spec": { + "machine_type": "n1-standard-4", + "accelerator_type": "NVIDIA_TESLA_K80", + "accelerator_count": 1, + }, + "replica_count": 1, + "container_spec": { + "image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "command": [], + "args": [], + }, + } +] + +_TEST_STAGING_BUCKET = "gs://test-staging-bucket" + +# CMEK encryption +_TEST_DEFAULT_ENCRYPTION_KEY_NAME = "key_default" +_TEST_DEFAULT_ENCRYPTION_SPEC = gca_encryption_spec_compat.EncryptionSpec( + kms_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME +) + +_TEST_SERVICE_ACCOUNT = "vinnys@my-project.iam.gserviceaccount.com" + + +_TEST_NETWORK = f"projects/{_TEST_PROJECT}/global/networks/{_TEST_ID}" + +_TEST_TIMEOUT = 8000 +_TEST_RESTART_JOB_ON_WORKER_RESTART = True + +_TEST_BASE_CUSTOM_JOB_PROTO = gca_custom_job_compat.CustomJob( + 
display_name=_TEST_DISPLAY_NAME, + job_spec=gca_custom_job_compat.CustomJobSpec( + worker_pool_specs=_TEST_WORKER_POOL_SPEC, + base_output_directory=gca_io_compat.GcsDestination( + output_uri_prefix=_TEST_STAGING_BUCKET + ), + scheduling=gca_custom_job_compat.Scheduling( + timeout=duration_pb2.Duration(seconds=_TEST_TIMEOUT), + restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART, + ), + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + ), + encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC, +) + + +def _get_custom_job_proto(state=None, name=None, error=None): + custom_job_proto = copy.deepcopy(_TEST_BASE_CUSTOM_JOB_PROTO) + custom_job_proto.name = name + custom_job_proto.state = state + custom_job_proto.error = error + return custom_job_proto + + +@pytest.fixture +def get_custom_job_mock(): + with patch.object( + job_service_client.JobServiceClient, "get_custom_job" + ) as get_custom_job_mock: + get_custom_job_mock.side_effect = [ + _get_custom_job_proto( + name=_TEST_CUSTOM_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_PENDING, + ), + _get_custom_job_proto( + name=_TEST_CUSTOM_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_RUNNING, + ), + _get_custom_job_proto( + name=_TEST_CUSTOM_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED, + ), + ] + yield get_custom_job_mock + + +@pytest.fixture +def get_custom_job_mock_with_fail(): + with patch.object( + job_service_client.JobServiceClient, "get_custom_job" + ) as get_custom_job_mock: + get_custom_job_mock.side_effect = [ + _get_custom_job_proto( + name=_TEST_CUSTOM_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_PENDING, + ), + _get_custom_job_proto( + name=_TEST_CUSTOM_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_RUNNING, + ), + _get_custom_job_proto( + name=_TEST_CUSTOM_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_FAILED, + error=status_pb2.Status(message="Test Error"), + ), + ] + yield get_custom_job_mock + + 
+@pytest.fixture +def create_custom_job_mock(): + with mock.patch.object( + job_service_client.JobServiceClient, "create_custom_job" + ) as create_custom_job_mock: + create_custom_job_mock.return_value = _get_custom_job_proto( + name=_TEST_CUSTOM_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_PENDING, + ) + yield create_custom_job_mock + + +class TestCustomJob: + def setup_method(self): + reload(aiplatform.initializer) + reload(aiplatform) + + def teardown_method(self): + aiplatform.initializer.global_pool.shutdown(wait=True) + + @pytest.mark.parametrize("sync", [True, False]) + def test_create_custom_job(self, create_custom_job_mock, get_custom_job_mock, sync): + + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + staging_bucket=_TEST_STAGING_BUCKET, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + job = aiplatform.CustomJob( + display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC + ) + + job.run( + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + timeout=_TEST_TIMEOUT, + restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART, + sync=sync, + ) + + job.wait() + + expected_custom_job = _get_custom_job_proto() + + create_custom_job_mock.assert_called_once_with( + parent=_TEST_PARENT, custom_job=expected_custom_job + ) + + assert job.job_spec == expected_custom_job.job_spec + assert ( + job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED + ) + + @pytest.mark.parametrize("sync", [True, False]) + def test_run_custom_job_with_fail_raises( + self, create_custom_job_mock, get_custom_job_mock_with_fail, sync + ): + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + staging_bucket=_TEST_STAGING_BUCKET, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + job = aiplatform.CustomJob( + display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC + ) + + with pytest.raises(RuntimeError): + job.run( + 
service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + timeout=_TEST_TIMEOUT, + restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART, + sync=sync, + ) + + job.wait() + + expected_custom_job = _get_custom_job_proto() + + create_custom_job_mock.assert_called_once_with( + parent=_TEST_PARENT, custom_job=expected_custom_job + ) + + assert job.job_spec == expected_custom_job.job_spec + assert job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_FAILED + + def test_custom_job_get_state_raises_without_run(self): + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + staging_bucket=_TEST_STAGING_BUCKET, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + job = aiplatform.CustomJob( + display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC + ) + + with pytest.raises(RuntimeError): + print(job.state) + + def test_no_staging_bucket_raises(self): + + aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION) + + with pytest.raises(RuntimeError): + job = aiplatform.CustomJob( # noqa: F841 + display_name=_TEST_DISPLAY_NAME, + worker_pool_specs=_TEST_WORKER_POOL_SPEC, + ) + + def test_get_custom_job(self, get_custom_job_mock): + + job = aiplatform.CustomJob.get(_TEST_CUSTOM_JOB_NAME) + + get_custom_job_mock.assert_called_once_with(name=_TEST_CUSTOM_JOB_NAME) + assert ( + job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_PENDING + ) + assert job.job_spec == _TEST_BASE_CUSTOM_JOB_PROTO.job_spec + + @pytest.mark.usefixtures("mock_python_package_to_gcs") + @pytest.mark.parametrize("sync", [True, False]) + def test_create_from_local_script( + self, get_custom_job_mock, create_custom_job_mock, sync + ): + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + staging_bucket=_TEST_STAGING_BUCKET, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + # configuration on this is tested in test_training_jobs.py + job = 
aiplatform.CustomJob.from_local_script( + display_name=_TEST_DISPLAY_NAME, + script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + ) + + job.run(sync=sync) + + job.wait() + + assert ( + job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED + ) + + @pytest.mark.usefixtures("mock_python_package_to_gcs") + @pytest.mark.parametrize("sync", [True, False]) + def test_create_from_local_script_raises_with_no_staging_bucket( + self, get_custom_job_mock, create_custom_job_mock, sync + ): + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + with pytest.raises(RuntimeError): + + # configuration on this is tested in test_training_jobs.py + job = aiplatform.CustomJob.from_local_script( # noqa: F841 + display_name=_TEST_DISPLAY_NAME, + script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + ) diff --git a/tests/unit/aiplatform/test_end_to_end.py b/tests/unit/aiplatform/test_end_to_end.py index 69c5517a69..4aede65f08 100644 --- a/tests/unit/aiplatform/test_end_to_end.py +++ b/tests/unit/aiplatform/test_end_to_end.py @@ -19,11 +19,11 @@ from importlib import reload +from google.cloud.aiplatform.utils import source_utils from google.cloud import aiplatform from google.cloud.aiplatform import initializer from google.cloud.aiplatform import models from google.cloud.aiplatform import schema -from google.cloud.aiplatform import training_jobs from google.cloud.aiplatform_v1.types import ( dataset as gca_dataset, @@ -204,16 +204,16 @@ def test_dataset_create_to_model_predict( true_args = test_training_jobs._TEST_RUN_ARGS true_worker_pool_spec = { - "replicaCount": test_training_jobs._TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": test_training_jobs._TEST_MACHINE_TYPE, - "acceleratorType": test_training_jobs._TEST_ACCELERATOR_TYPE, - "acceleratorCount": 
test_training_jobs._TEST_ACCELERATOR_COUNT, + "replica_count": test_training_jobs._TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": test_training_jobs._TEST_MACHINE_TYPE, + "accelerator_type": test_training_jobs._TEST_ACCELERATOR_TYPE, + "accelerator_count": test_training_jobs._TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": test_training_jobs._TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, - "packageUris": [test_training_jobs._TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": test_training_jobs._TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [test_training_jobs._TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, }, } @@ -248,8 +248,8 @@ def test_dataset_create_to_model_predict( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": { + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { "output_uri_prefix": test_training_jobs._TEST_BASE_OUTPUT_DIR }, }, @@ -385,16 +385,16 @@ def test_dataset_create_to_model_predict_with_pipeline_fail( true_args = test_training_jobs._TEST_RUN_ARGS true_worker_pool_spec = { - "replicaCount": test_training_jobs._TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": test_training_jobs._TEST_MACHINE_TYPE, - "acceleratorType": test_training_jobs._TEST_ACCELERATOR_TYPE, - "acceleratorCount": test_training_jobs._TEST_ACCELERATOR_COUNT, + "replica_count": test_training_jobs._TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": test_training_jobs._TEST_MACHINE_TYPE, + "accelerator_type": test_training_jobs._TEST_ACCELERATOR_TYPE, + "accelerator_count": test_training_jobs._TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": 
test_training_jobs._TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, - "packageUris": [test_training_jobs._TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": test_training_jobs._TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [test_training_jobs._TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, }, } @@ -430,8 +430,8 @@ def test_dataset_create_to_model_predict_with_pipeline_fail( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": { + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { "output_uri_prefix": test_training_jobs._TEST_BASE_OUTPUT_DIR }, }, diff --git a/tests/unit/aiplatform/test_hyperparameter_tuning_job.py b/tests/unit/aiplatform/test_hyperparameter_tuning_job.py new file mode 100644 index 0000000000..fcd15f93ac --- /dev/null +++ b/tests/unit/aiplatform/test_hyperparameter_tuning_job.py @@ -0,0 +1,368 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pytest + +import copy +from importlib import reload +from unittest import mock +from unittest.mock import patch + +from google.rpc import status_pb2 + +from google.cloud import aiplatform +from google.cloud.aiplatform import hyperparameter_tuning as hpt +from google.cloud.aiplatform.compat.types import job_state as gca_job_state_compat +from google.cloud.aiplatform.compat.types import ( + encryption_spec as gca_encryption_spec_compat, +) +from google.cloud.aiplatform.compat.types import ( + hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat, +) +from google.cloud.aiplatform.compat.types import study as gca_study_compat +from google.cloud.aiplatform_v1.services.job_service import client as job_service_client + +import test_custom_job + +_TEST_PROJECT = "test-project" +_TEST_LOCATION = "us-central1" +_TEST_ID = "1028944691210842416" +_TEST_DISPLAY_NAME = "my_hp_job_1234" + +_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}" + +_TEST_STAGING_BUCKET = test_custom_job._TEST_STAGING_BUCKET + +_TEST_HYPERPARAMETERTUNING_JOB_NAME = ( + f"{_TEST_PARENT}/hyperparameterTuningJobs/{_TEST_ID}" +) + +# CMEK encryption +_TEST_DEFAULT_ENCRYPTION_KEY_NAME = "key_default" +_TEST_DEFAULT_ENCRYPTION_SPEC = gca_encryption_spec_compat.EncryptionSpec( + kms_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME +) + +_TEST_SERVICE_ACCOUNT = "vinnys@my-project.iam.gserviceaccount.com" + + +_TEST_NETWORK = f"projects/{_TEST_PROJECT}/global/networks/{_TEST_ID}" + +_TEST_TIMEOUT = 8000 +_TEST_RESTART_JOB_ON_WORKER_RESTART = True + +_TEST_METRIC_SPEC_KEY = "test-metric" +_TEST_METRIC_SPEC_VALUE = "maximize" + +_TEST_PARALLEL_TRIAL_COUNT = 8 +_TEST_MAX_TRIAL_COUNT = 64 +_TEST_MAX_FAILED_TRIAL_COUNT = 4 +_TEST_SEARCH_ALGORITHM = "random" +_TEST_MEASUREMENT_SELECTION = "best" + + +_TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO = gca_hyperparameter_tuning_job_compat.HyperparameterTuningJob( + display_name=_TEST_DISPLAY_NAME, + study_spec=gca_study_compat.StudySpec( 
+ metrics=[ + gca_study_compat.StudySpec.MetricSpec( + metric_id=_TEST_METRIC_SPEC_KEY, goal=_TEST_METRIC_SPEC_VALUE.upper() + ) + ], + parameters=[ + gca_study_compat.StudySpec.ParameterSpec( + parameter_id="lr", + scale_type=gca_study_compat.StudySpec.ParameterSpec.ScaleType.UNIT_LOG_SCALE, + double_value_spec=gca_study_compat.StudySpec.ParameterSpec.DoubleValueSpec( + min_value=0.001, max_value=0.1 + ), + ), + gca_study_compat.StudySpec.ParameterSpec( + parameter_id="units", + scale_type=gca_study_compat.StudySpec.ParameterSpec.ScaleType.UNIT_LINEAR_SCALE, + integer_value_spec=gca_study_compat.StudySpec.ParameterSpec.IntegerValueSpec( + min_value=4, max_value=1028 + ), + ), + gca_study_compat.StudySpec.ParameterSpec( + parameter_id="activation", + categorical_value_spec=gca_study_compat.StudySpec.ParameterSpec.CategoricalValueSpec( + values=["relu", "sigmoid", "elu", "selu", "tanh"] + ), + ), + gca_study_compat.StudySpec.ParameterSpec( + parameter_id="batch_size", + scale_type=gca_study_compat.StudySpec.ParameterSpec.ScaleType.UNIT_LINEAR_SCALE, + discrete_value_spec=gca_study_compat.StudySpec.ParameterSpec.DiscreteValueSpec( + values=[16, 32] + ), + ), + ], + algorithm=gca_study_compat.StudySpec.Algorithm.RANDOM_SEARCH, + measurement_selection_type=gca_study_compat.StudySpec.MeasurementSelectionType.BEST_MEASUREMENT, + ), + parallel_trial_count=_TEST_PARALLEL_TRIAL_COUNT, + max_trial_count=_TEST_MAX_TRIAL_COUNT, + max_failed_trial_count=_TEST_MAX_FAILED_TRIAL_COUNT, + trial_job_spec=test_custom_job._TEST_BASE_CUSTOM_JOB_PROTO.job_spec, + encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC, +) + + +def _get_hyperparameter_tuning_job_proto(state=None, name=None, error=None): + custom_job_proto = copy.deepcopy(_TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO) + custom_job_proto.name = name + custom_job_proto.state = state + custom_job_proto.error = error + return custom_job_proto + + +@pytest.fixture +def get_hyperparameter_tuning_job_mock(): + with patch.object( + 
job_service_client.JobServiceClient, "get_hyperparameter_tuning_job" + ) as get_hyperparameter_tuning_job_mock: + get_hyperparameter_tuning_job_mock.side_effect = [ + _get_hyperparameter_tuning_job_proto( + name=_TEST_HYPERPARAMETERTUNING_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_PENDING, + ), + _get_hyperparameter_tuning_job_proto( + name=_TEST_HYPERPARAMETERTUNING_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_RUNNING, + ), + _get_hyperparameter_tuning_job_proto( + name=_TEST_HYPERPARAMETERTUNING_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED, + ), + ] + yield get_hyperparameter_tuning_job_mock + + +@pytest.fixture +def get_hyperparameter_tuning_job_mock_with_fail(): + with patch.object( + job_service_client.JobServiceClient, "get_hyperparameter_tuning_job" + ) as get_hyperparameter_tuning_job_mock: + get_hyperparameter_tuning_job_mock.side_effect = [ + _get_hyperparameter_tuning_job_proto( + name=_TEST_HYPERPARAMETERTUNING_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_PENDING, + ), + _get_hyperparameter_tuning_job_proto( + name=_TEST_HYPERPARAMETERTUNING_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_RUNNING, + ), + _get_hyperparameter_tuning_job_proto( + name=_TEST_HYPERPARAMETERTUNING_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_FAILED, + error=status_pb2.Status(message="Test Error"), + ), + ] + yield get_hyperparameter_tuning_job_mock + + +@pytest.fixture +def create_hyperparameter_tuning_job_mock(): + with mock.patch.object( + job_service_client.JobServiceClient, "create_hyperparameter_tuning_job" + ) as create_hyperparameter_tuning_job_mock: + create_hyperparameter_tuning_job_mock.return_value = _get_hyperparameter_tuning_job_proto( + name=_TEST_HYPERPARAMETERTUNING_JOB_NAME, + state=gca_job_state_compat.JobState.JOB_STATE_PENDING, + ) + yield create_hyperparameter_tuning_job_mock + + +class TestCustomJob: + def setup_method(self): + reload(aiplatform.initializer) + 
reload(aiplatform) + + def teardown_method(self): + aiplatform.initializer.global_pool.shutdown(wait=True) + + @pytest.mark.parametrize("sync", [True, False]) + def test_create_hyperparameter_tuning_job( + self, + create_hyperparameter_tuning_job_mock, + get_hyperparameter_tuning_job_mock, + sync, + ): + + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + staging_bucket=_TEST_STAGING_BUCKET, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + custom_job = aiplatform.CustomJob( + display_name=test_custom_job._TEST_DISPLAY_NAME, + worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC, + ) + + job = aiplatform.HyperparameterTuningJob( + display_name=_TEST_DISPLAY_NAME, + custom_job=custom_job, + metric_spec={_TEST_METRIC_SPEC_KEY: _TEST_METRIC_SPEC_VALUE}, + parameter_spec={ + "lr": hpt.DoubleParameterSpec(min=0.001, max=0.1, scale="log"), + "units": hpt.IntegerParameterSpec(min=4, max=1028, scale="linear"), + "activation": hpt.CategoricalParameterSpec( + values=["relu", "sigmoid", "elu", "selu", "tanh"] + ), + "batch_size": hpt.DiscreteParameterSpec( + values=[16, 32], scale="linear" + ), + }, + parallel_trial_count=_TEST_PARALLEL_TRIAL_COUNT, + max_trial_count=_TEST_MAX_TRIAL_COUNT, + max_failed_trial_count=_TEST_MAX_FAILED_TRIAL_COUNT, + search_algorithm=_TEST_SEARCH_ALGORITHM, + measurement_selection=_TEST_MEASUREMENT_SELECTION, + ) + + job.run( + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + timeout=_TEST_TIMEOUT, + restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART, + sync=sync, + ) + + job.wait() + + expected_hyperparameter_tuning_job = _get_hyperparameter_tuning_job_proto() + + create_hyperparameter_tuning_job_mock.assert_called_once_with( + parent=_TEST_PARENT, + hyperparameter_tuning_job=expected_hyperparameter_tuning_job, + ) + + assert ( + job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED + ) + + @pytest.mark.parametrize("sync", [True, False]) + def 
test_run_hyperparameter_tuning_job_with_fail_raises( + self, + create_hyperparameter_tuning_job_mock, + get_hyperparameter_tuning_job_mock_with_fail, + sync, + ): + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + staging_bucket=_TEST_STAGING_BUCKET, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + custom_job = aiplatform.CustomJob( + display_name=test_custom_job._TEST_DISPLAY_NAME, + worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC, + ) + + job = aiplatform.HyperparameterTuningJob( + display_name=_TEST_DISPLAY_NAME, + custom_job=custom_job, + metric_spec={_TEST_METRIC_SPEC_KEY: _TEST_METRIC_SPEC_VALUE}, + parameter_spec={ + "lr": hpt.DoubleParameterSpec(min=0.001, max=0.1, scale="log"), + "units": hpt.IntegerParameterSpec(min=4, max=1028, scale="linear"), + "activation": hpt.CategoricalParameterSpec( + values=["relu", "sigmoid", "elu", "selu", "tanh"] + ), + "batch_size": hpt.DiscreteParameterSpec( + values=[16, 32], scale="linear" + ), + }, + parallel_trial_count=_TEST_PARALLEL_TRIAL_COUNT, + max_trial_count=_TEST_MAX_TRIAL_COUNT, + max_failed_trial_count=_TEST_MAX_FAILED_TRIAL_COUNT, + search_algorithm=_TEST_SEARCH_ALGORITHM, + measurement_selection=_TEST_MEASUREMENT_SELECTION, + ) + + with pytest.raises(RuntimeError): + job.run( + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + timeout=_TEST_TIMEOUT, + restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART, + sync=sync, + ) + + job.wait() + + expected_hyperparameter_tuning_job = _get_hyperparameter_tuning_job_proto() + + create_hyperparameter_tuning_job_mock.assert_called_once_with( + parent=_TEST_PARENT, + hyperparameter_tuning_job=expected_hyperparameter_tuning_job, + ) + + assert job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_FAILED + + def test_hyperparameter_tuning_job_get_state_raises_without_run(self): + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + 
staging_bucket=_TEST_STAGING_BUCKET, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + custom_job = aiplatform.CustomJob( + display_name=test_custom_job._TEST_DISPLAY_NAME, + worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC, + ) + + job = aiplatform.HyperparameterTuningJob( + display_name=_TEST_DISPLAY_NAME, + custom_job=custom_job, + metric_spec={_TEST_METRIC_SPEC_KEY: _TEST_METRIC_SPEC_VALUE}, + parameter_spec={ + "lr": hpt.DoubleParameterSpec(min=0.001, max=0.1, scale="log"), + "units": hpt.IntegerParameterSpec(min=4, max=1028, scale="linear"), + "activation": hpt.CategoricalParameterSpec( + values=["relu", "sigmoid", "elu", "selu", "tanh"] + ), + "batch_size": hpt.DiscreteParameterSpec( + values=[16, 32, 64], scale="linear" + ), + }, + parallel_trial_count=_TEST_PARALLEL_TRIAL_COUNT, + max_trial_count=_TEST_MAX_TRIAL_COUNT, + max_failed_trial_count=_TEST_MAX_FAILED_TRIAL_COUNT, + search_algorithm=_TEST_SEARCH_ALGORITHM, + measurement_selection=_TEST_MEASUREMENT_SELECTION, + ) + + with pytest.raises(RuntimeError): + print(job.state) + + def test_get_hyperparameter_tuning_job(self, get_hyperparameter_tuning_job_mock): + + job = aiplatform.HyperparameterTuningJob.get( + _TEST_HYPERPARAMETERTUNING_JOB_NAME + ) + + get_hyperparameter_tuning_job_mock.assert_called_once_with( + name=_TEST_HYPERPARAMETERTUNING_JOB_NAME + ) + assert ( + job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_PENDING + ) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index 8fd82c7727..75478263e8 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -30,6 +30,9 @@ from google.auth import credentials as auth_credentials +from google.cloud.aiplatform import utils +from google.cloud.aiplatform.utils import source_utils +from google.cloud.aiplatform.utils import worker_spec_utils from google.cloud import aiplatform from google.cloud.aiplatform 
import datasets @@ -234,7 +237,7 @@ def test_timestamp_copy_to_gcs_calls_gcs_client_with_bucket( mock_client_bucket, mock_blob = mock_client_bucket - gcs_path = training_jobs._timestamped_copy_to_gcs( + gcs_path = utils._timestamped_copy_to_gcs( local_file_path=_TEST_LOCAL_SCRIPT_FILE_PATH, gcs_dir=_TEST_BUCKET_NAME, project=_TEST_PROJECT, @@ -261,7 +264,7 @@ def test_timestamp_copy_to_gcs_calls_gcs_client_with_gcs_path( mock_client_bucket, mock_blob = mock_client_bucket - gcs_path = training_jobs._timestamped_copy_to_gcs( + gcs_path = utils._timestamped_copy_to_gcs( local_file_path=_TEST_LOCAL_SCRIPT_FILE_PATH, gcs_dir=_TEST_GCS_PATH_WITH_TRAILING_SLASH, project=_TEST_PROJECT, @@ -289,7 +292,7 @@ def test_timestamp_copy_to_gcs_calls_gcs_client_with_trailing_slash( mock_client_bucket, mock_blob = mock_client_bucket - gcs_path = training_jobs._timestamped_copy_to_gcs( + gcs_path = utils._timestamped_copy_to_gcs( local_file_path=_TEST_LOCAL_SCRIPT_FILE_PATH, gcs_dir=_TEST_GCS_PATH, project=_TEST_PROJECT, @@ -315,7 +318,7 @@ def test_timestamp_copy_to_gcs_calls_gcs_client(self, mock_client_bucket): mock_client_bucket, mock_blob = mock_client_bucket - gcs_path = training_jobs._timestamped_copy_to_gcs( + gcs_path = utils._timestamped_copy_to_gcs( local_file_path=_TEST_LOCAL_SCRIPT_FILE_PATH, gcs_dir=_TEST_BUCKET_NAME, project=_TEST_PROJECT, @@ -332,10 +335,10 @@ def test_timestamp_copy_to_gcs_calls_gcs_client(self, mock_client_bucket): def test_get_python_executable_raises_if_None(self): with patch.object(sys, "executable", new=None): with pytest.raises(EnvironmentError): - training_jobs._get_python_executable() + source_utils._get_python_executable() def test_get_python_executable_returns_python_executable(self): - assert "python" in training_jobs._get_python_executable().lower() + assert "python" in source_utils._get_python_executable().lower() class TestTrainingScriptPythonPackager: @@ -347,7 +350,7 @@ def setup_method(self): def teardown_method(self): 
pathlib.Path(_TEST_LOCAL_SCRIPT_FILE_NAME).unlink() - python_package_file = f"{training_jobs._TrainingScriptPythonPackager._ROOT_MODULE}-{training_jobs._TrainingScriptPythonPackager._SETUP_PY_VERSION}.tar.gz" + python_package_file = f"{source_utils._TrainingScriptPythonPackager._ROOT_MODULE}-{source_utils._TrainingScriptPythonPackager._SETUP_PY_VERSION}.tar.gz" if pathlib.Path(python_package_file).is_file(): pathlib.Path(python_package_file).unlink() subprocess.check_output( @@ -355,34 +358,34 @@ def teardown_method(self): "pip3", "uninstall", "-y", - training_jobs._TrainingScriptPythonPackager._ROOT_MODULE, + source_utils._TrainingScriptPythonPackager._ROOT_MODULE, ] ) def test_packager_creates_and_copies_python_package(self): - tsp = training_jobs._TrainingScriptPythonPackager(_TEST_LOCAL_SCRIPT_FILE_NAME) + tsp = source_utils._TrainingScriptPythonPackager(_TEST_LOCAL_SCRIPT_FILE_NAME) tsp.package_and_copy(copy_method=local_copy_method) assert pathlib.Path( f"{tsp._ROOT_MODULE}-{tsp._SETUP_PY_VERSION}.tar.gz" ).is_file() def test_created_package_module_is_installable_and_can_be_run(self): - tsp = training_jobs._TrainingScriptPythonPackager(_TEST_LOCAL_SCRIPT_FILE_NAME) + tsp = source_utils._TrainingScriptPythonPackager(_TEST_LOCAL_SCRIPT_FILE_NAME) source_dist_path = tsp.package_and_copy(copy_method=local_copy_method) subprocess.check_output(["pip3", "install", source_dist_path]) module_output = subprocess.check_output( - [training_jobs._get_python_executable(), "-m", tsp.module_name] + [source_utils._get_python_executable(), "-m", tsp.module_name] ) assert "hello world" in module_output.decode() def test_requirements_are_in_package(self): - tsp = training_jobs._TrainingScriptPythonPackager( + tsp = source_utils._TrainingScriptPythonPackager( _TEST_LOCAL_SCRIPT_FILE_NAME, requirements=_TEST_REQUIREMENTS ) source_dist_path = tsp.package_and_copy(copy_method=local_copy_method) with tarfile.open(source_dist_path) as tf: with tempfile.TemporaryDirectory() as 
tmpdirname: - setup_py_path = f"{training_jobs._TrainingScriptPythonPackager._ROOT_MODULE}-{training_jobs._TrainingScriptPythonPackager._SETUP_PY_VERSION}/setup.py" + setup_py_path = f"{source_utils._TrainingScriptPythonPackager._ROOT_MODULE}-{source_utils._TrainingScriptPythonPackager._SETUP_PY_VERSION}/setup.py" tf.extract(setup_py_path, path=tmpdirname) setup_py = core.run_setup( pathlib.Path(tmpdirname, setup_py_path), stop_after="init" @@ -395,7 +398,7 @@ def test_packaging_fails_whith_RuntimeError(self): mock_subprocess.communicate.return_value = (b"", b"") mock_subprocess.returncode = 1 mock_popen.return_value = mock_subprocess - tsp = training_jobs._TrainingScriptPythonPackager( + tsp = source_utils._TrainingScriptPythonPackager( _TEST_LOCAL_SCRIPT_FILE_NAME ) with pytest.raises(RuntimeError): @@ -404,7 +407,7 @@ def test_packaging_fails_whith_RuntimeError(self): def test_package_and_copy_to_gcs_copies_to_gcs(self, mock_client_bucket): mock_client_bucket, mock_blob = mock_client_bucket - tsp = training_jobs._TrainingScriptPythonPackager(_TEST_LOCAL_SCRIPT_FILE_NAME) + tsp = source_utils._TrainingScriptPythonPackager(_TEST_LOCAL_SCRIPT_FILE_NAME) gcs_path = tsp.package_and_copy_to_gcs( gcs_staging_dir=_TEST_BUCKET_NAME, project=_TEST_PROJECT @@ -512,7 +515,7 @@ def mock_model_service_get(): @pytest.fixture def mock_python_package_to_gcs(): with mock.patch.object( - training_jobs._TrainingScriptPythonPackager, "package_and_copy_to_gcs" + source_utils._TrainingScriptPythonPackager, "package_and_copy_to_gcs" ) as mock_package_to_copy_gcs: mock_package_to_copy_gcs.return_value = _TEST_OUTPUT_PYTHON_PACKAGE_PATH yield mock_package_to_copy_gcs @@ -630,16 +633,16 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( ] true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": 
_TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, "env": true_env, }, @@ -699,9 +702,11 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, - "serviceAccount": _TEST_SERVICE_ACCOUNT, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, + "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, }, struct_pb2.Value(), @@ -789,16 +794,16 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( ] true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": 
_TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, "env": true_env, }, @@ -858,8 +863,10 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -1064,16 +1071,16 @@ def test_run_call_pipeline_service_create_with_no_dataset( ] true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, "env": true_env, }, @@ -1094,8 +1101,10 @@ def test_run_call_pipeline_service_create_with_no_dataset( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": [true_worker_pool_spec], + 
"base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -1316,31 +1325,31 @@ def test_run_call_pipeline_service_create_distributed_training( true_worker_pool_spec = [ { - "replicaCount": 1, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": 1, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, "env": true_env, }, }, { - "replicaCount": 9, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": 9, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, "env": true_env, }, @@ -1382,8 +1391,10 @@ def test_run_call_pipeline_service_create_distributed_training( training_task_definition=schema.training_job.definition.custom_task, 
training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": true_worker_pool_spec, - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": true_worker_pool_spec, + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -1544,16 +1555,16 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( true_args = _TEST_RUN_ARGS true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, }, } @@ -1609,8 +1620,10 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -1784,11 +1797,11 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( ] true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": 
_TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, "containerSpec": { "imageUri": _TEST_TRAINING_CONTAINER_IMAGE, @@ -1852,8 +1865,10 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -1932,11 +1947,11 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( true_args = _TEST_RUN_ARGS true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, "containerSpec": { "imageUri": _TEST_TRAINING_CONTAINER_IMAGE, @@ -1999,8 +2014,10 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -2189,11 +2206,11 @@ def test_run_call_pipeline_service_create_with_no_dataset( true_args = _TEST_RUN_ARGS 
true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, "containerSpec": { "imageUri": _TEST_TRAINING_CONTAINER_IMAGE, @@ -2217,8 +2234,10 @@ def test_run_call_pipeline_service_create_with_no_dataset( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -2418,11 +2437,11 @@ def test_run_call_pipeline_service_create_distributed_training( true_worker_pool_spec = [ { - "replicaCount": 1, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": 1, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, "containerSpec": { "imageUri": _TEST_TRAINING_CONTAINER_IMAGE, @@ -2431,11 +2450,11 @@ def test_run_call_pipeline_service_create_distributed_training( }, }, { - "replicaCount": 9, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": 9, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, "containerSpec": { "imageUri": _TEST_TRAINING_CONTAINER_IMAGE, @@ -2480,8 +2499,10 @@ def 
test_run_call_pipeline_service_create_distributed_training( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": true_worker_pool_spec, - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": true_worker_pool_spec, + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -2558,11 +2579,11 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( true_args = _TEST_RUN_ARGS true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, "containerSpec": { "imageUri": _TEST_TRAINING_CONTAINER_IMAGE, @@ -2622,9 +2643,11 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, - "serviceAccount": _TEST_SERVICE_ACCOUNT, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, + "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, }, struct_pb2.Value(), @@ -2689,7 +2712,7 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset_raises_if_anno class Test_MachineSpec: def test_machine_spec_return_spec_dict(self): - test_spec = training_jobs._MachineSpec( + test_spec = worker_spec_utils._MachineSpec( replica_count=_TEST_REPLICA_COUNT, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -2697,18 
+2720,18 @@ def test_machine_spec_return_spec_dict(self): ) true_spec_dict = { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": _TEST_REPLICA_COUNT, + "replica_count": _TEST_REPLICA_COUNT, } assert test_spec.spec_dict == true_spec_dict def test_machine_spec_return_spec_dict_with_no_accelerator(self): - test_spec = training_jobs._MachineSpec( + test_spec = worker_spec_utils._MachineSpec( replica_count=_TEST_REPLICA_COUNT, machine_type=_TEST_MACHINE_TYPE, accelerator_count=0, @@ -2716,14 +2739,14 @@ def test_machine_spec_return_spec_dict_with_no_accelerator(self): ) true_spec_dict = { - "machineSpec": {"machineType": _TEST_MACHINE_TYPE}, - "replicaCount": _TEST_REPLICA_COUNT, + "machine_spec": {"machine_type": _TEST_MACHINE_TYPE}, + "replica_count": _TEST_REPLICA_COUNT, } assert test_spec.spec_dict == true_spec_dict def test_machine_spec_spec_dict_raises_invalid_accelerator(self): - test_spec = training_jobs._MachineSpec( + test_spec = worker_spec_utils._MachineSpec( replica_count=_TEST_REPLICA_COUNT, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -2734,7 +2757,7 @@ def test_machine_spec_spec_dict_raises_invalid_accelerator(self): test_spec.spec_dict def test_machine_spec_spec_dict_is_empty(self): - test_spec = training_jobs._MachineSpec( + test_spec = worker_spec_utils._MachineSpec( replica_count=0, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -2744,7 +2767,7 @@ def test_machine_spec_spec_dict_is_empty(self): assert test_spec.is_empty def test_machine_spec_spec_dict_is_not_empty(self): - test_spec = training_jobs._MachineSpec( + test_spec = worker_spec_utils._MachineSpec( replica_count=_TEST_REPLICA_COUNT, 
machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -2757,26 +2780,26 @@ def test_machine_spec_spec_dict_is_not_empty(self): class Test_DistributedTrainingSpec: def test_machine_spec_returns_pool_spec(self): - spec = training_jobs._DistributedTrainingSpec( - chief_spec=training_jobs._MachineSpec( + spec = worker_spec_utils._DistributedTrainingSpec( + chief_spec=worker_spec_utils._MachineSpec( replica_count=1, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, accelerator_type=_TEST_ACCELERATOR_TYPE, ), - worker_spec=training_jobs._MachineSpec( + worker_spec=worker_spec_utils._MachineSpec( replica_count=10, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, accelerator_type=_TEST_ACCELERATOR_TYPE, ), - parameter_server_spec=training_jobs._MachineSpec( + parameter_server_spec=worker_spec_utils._MachineSpec( replica_count=3, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, accelerator_type=_TEST_ACCELERATOR_TYPE, ), - evaluator_spec=training_jobs._MachineSpec( + evaluator_spec=worker_spec_utils._MachineSpec( replica_count=1, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -2786,36 +2809,36 @@ def test_machine_spec_returns_pool_spec(self): true_pool_spec = [ { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": 1, + "replica_count": 1, }, { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": 10, + 
"replica_count": 10, }, { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": 3, + "replica_count": 3, }, { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": 1, + "replica_count": 1, }, ] @@ -2823,7 +2846,7 @@ def test_machine_spec_returns_pool_spec(self): def test_chief_worker_pool_returns_spec(self): - chief_worker_spec = training_jobs._DistributedTrainingSpec.chief_worker_pool( + chief_worker_spec = worker_spec_utils._DistributedTrainingSpec.chief_worker_pool( replica_count=10, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -2832,20 +2855,20 @@ def test_chief_worker_pool_returns_spec(self): true_pool_spec = [ { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": 1, + "replica_count": 1, }, { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": 9, + "replica_count": 9, }, ] @@ -2853,7 +2876,7 @@ def test_chief_worker_pool_returns_spec(self): def 
test_chief_worker_pool_returns_just_chief(self): - chief_worker_spec = training_jobs._DistributedTrainingSpec.chief_worker_pool( + chief_worker_spec = worker_spec_utils._DistributedTrainingSpec.chief_worker_pool( replica_count=1, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -2862,12 +2885,12 @@ def test_chief_worker_pool_returns_just_chief(self): true_pool_spec = [ { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": 1, + "replica_count": 1, } ] @@ -2875,8 +2898,8 @@ def test_chief_worker_pool_returns_just_chief(self): def test_machine_spec_raise_with_more_than_one_chief_replica(self): - spec = training_jobs._DistributedTrainingSpec( - chief_spec=training_jobs._MachineSpec( + spec = worker_spec_utils._DistributedTrainingSpec( + chief_spec=worker_spec_utils._MachineSpec( replica_count=2, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -2889,40 +2912,40 @@ def test_machine_spec_raise_with_more_than_one_chief_replica(self): def test_machine_spec_handles_missing_pools(self): - spec = training_jobs._DistributedTrainingSpec( - chief_spec=training_jobs._MachineSpec( + spec = worker_spec_utils._DistributedTrainingSpec( + chief_spec=worker_spec_utils._MachineSpec( replica_count=1, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, accelerator_type=_TEST_ACCELERATOR_TYPE, ), - worker_spec=training_jobs._MachineSpec(replica_count=0), - parameter_server_spec=training_jobs._MachineSpec( + worker_spec=worker_spec_utils._MachineSpec(replica_count=0), + parameter_server_spec=worker_spec_utils._MachineSpec( replica_count=3, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, 
accelerator_type=_TEST_ACCELERATOR_TYPE, ), - evaluator_spec=training_jobs._MachineSpec(replica_count=0), + evaluator_spec=worker_spec_utils._MachineSpec(replica_count=0), ) true_pool_spec = [ { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": 1, + "replica_count": 1, }, - {"machineSpec": {"machineType": "n1-standard-4"}, "replicaCount": 0}, + {"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 0}, { - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "replicaCount": 3, + "replica_count": 3, }, ] @@ -2999,16 +3022,16 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( ] true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": _TEST_PYTHON_MODULE_NAME, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": _TEST_PYTHON_MODULE_NAME, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, "env": true_env, }, @@ -3068,9 +3091,11 @@ def 
test_run_call_pipeline_service_create_with_tabular_dataset( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, - "serviceAccount": _TEST_SERVICE_ACCOUNT, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, + "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, }, struct_pb2.Value(), @@ -3152,16 +3177,16 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_without_model_dis true_args = _TEST_RUN_ARGS true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": _TEST_PYTHON_MODULE_NAME, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": _TEST_PYTHON_MODULE_NAME, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, }, } @@ -3220,8 +3245,10 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_without_model_dis training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -3301,16 +3328,16 @@ def 
test_run_call_pipeline_service_create_with_bigquery_destination( true_args = _TEST_RUN_ARGS true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": _TEST_PYTHON_MODULE_NAME, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": _TEST_PYTHON_MODULE_NAME, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, }, } @@ -3369,8 +3396,10 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -3563,16 +3592,16 @@ def test_run_call_pipeline_service_create_with_no_dataset( true_args = _TEST_RUN_ARGS true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": _TEST_PYTHON_MODULE_NAME, 
- "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": _TEST_PYTHON_MODULE_NAME, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, }, } @@ -3592,8 +3621,10 @@ def test_run_call_pipeline_service_create_with_no_dataset( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -3799,30 +3830,30 @@ def test_run_call_pipeline_service_create_distributed_training( true_worker_pool_spec = [ { - "replicaCount": 1, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": 1, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": _TEST_PYTHON_MODULE_NAME, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": _TEST_PYTHON_MODULE_NAME, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, }, }, { - "replicaCount": 9, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": 9, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - 
"pythonModule": _TEST_PYTHON_MODULE_NAME, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": _TEST_PYTHON_MODULE_NAME, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, }, }, @@ -3863,8 +3894,10 @@ def test_run_call_pipeline_service_create_distributed_training( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": true_worker_pool_spec, - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + "worker_pool_specs": true_worker_pool_spec, + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), @@ -3940,16 +3973,16 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( true_args = _TEST_RUN_ARGS true_worker_pool_spec = { - "replicaCount": _TEST_REPLICA_COUNT, - "machineSpec": { - "machineType": _TEST_MACHINE_TYPE, - "acceleratorType": _TEST_ACCELERATOR_TYPE, - "acceleratorCount": _TEST_ACCELERATOR_COUNT, + "replica_count": _TEST_REPLICA_COUNT, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, }, - "pythonPackageSpec": { - "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, - "pythonModule": _TEST_PYTHON_MODULE_NAME, - "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": _TEST_PYTHON_MODULE_NAME, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], "args": true_args, }, } @@ -4005,8 +4038,10 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset( training_task_definition=schema.training_job.definition.custom_task, training_task_inputs=json_format.ParseDict( { - "workerPoolSpecs": [true_worker_pool_spec], - "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, 
+ "worker_pool_specs": [true_worker_pool_spec], + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, }, struct_pb2.Value(), ), diff --git a/tests/unit/aiplatform/test_training_utils.py b/tests/unit/aiplatform/test_training_utils.py deleted file mode 100644 index 1d4b839151..0000000000 --- a/tests/unit/aiplatform/test_training_utils.py +++ /dev/null @@ -1,144 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -import json -import os -import pytest - -from google.cloud.aiplatform import training_utils -from unittest import mock - -_TEST_TRAINING_DATA_URI = "gs://training-data-uri" -_TEST_VALIDATION_DATA_URI = "gs://test-validation-data-uri" -_TEST_TEST_DATA_URI = "gs://test-data-uri" -_TEST_MODEL_DIR = "gs://test-model-dir" -_TEST_CHECKPOINT_DIR = "gs://test-checkpoint-dir" -_TEST_TENSORBOARD_LOG_DIR = "gs://test-tensorboard-log-dir" -_TEST_CLUSTER_SPEC = """{ - "cluster": { - "worker_pools":[ - { - "index":0, - "replicas":[ - "training-workerpool0-ab-0:2222" - ] - }, - { - "index":1, - "replicas":[ - "training-workerpool1-ab-0:2222", - "training-workerpool1-ab-1:2222" - ] - } - ] - }, - "environment": "cloud", - "task": { - "worker_pool_index":0, - "replica_index":0, - "trial":"TRIAL_ID" - } -}""" - - -class TestTrainingUtils: - @pytest.fixture - def mock_environment(self): - env_vars = { - "AIP_TRAINING_DATA_URI": _TEST_TRAINING_DATA_URI, - "AIP_VALIDATION_DATA_URI": _TEST_VALIDATION_DATA_URI, - "AIP_TEST_DATA_URI": _TEST_TEST_DATA_URI, - "AIP_MODEL_DIR": _TEST_MODEL_DIR, - "AIP_CHECKPOINT_DIR": _TEST_CHECKPOINT_DIR, - "AIP_TENSORBOARD_LOG_DIR": _TEST_TENSORBOARD_LOG_DIR, - "CLUSTER_SPEC": _TEST_CLUSTER_SPEC, - "TF_CONFIG": _TEST_CLUSTER_SPEC, - } - with mock.patch.dict(os.environ, env_vars): - yield - - @pytest.mark.usefixtures("mock_environment") - def test_training_data_uri(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.training_data_uri == _TEST_TRAINING_DATA_URI - - def test_training_data_uri_none(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.training_data_uri is None - - @pytest.mark.usefixtures("mock_environment") - def test_validation_data_uri(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.validation_data_uri == _TEST_VALIDATION_DATA_URI - - def test_validation_data_uri_none(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.validation_data_uri 
is None - - @pytest.mark.usefixtures("mock_environment") - def test_test_data_uri(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.test_data_uri == _TEST_TEST_DATA_URI - - def test_test_data_uri_none(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.test_data_uri is None - - @pytest.mark.usefixtures("mock_environment") - def test_model_dir(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.model_dir == _TEST_MODEL_DIR - - def test_model_dir_none(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.model_dir is None - - @pytest.mark.usefixtures("mock_environment") - def test_checkpoint_dir(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.checkpoint_dir == _TEST_CHECKPOINT_DIR - - def test_checkpoint_dir_none(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.checkpoint_dir is None - - @pytest.mark.usefixtures("mock_environment") - def test_tensorboard_log_dir(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.tensorboard_log_dir == _TEST_TENSORBOARD_LOG_DIR - - def test_tensorboard_log_dir_none(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.tensorboard_log_dir is None - - @pytest.mark.usefixtures("mock_environment") - def test_cluster_spec(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.cluster_spec == json.loads(_TEST_CLUSTER_SPEC) - - def test_cluster_spec_none(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.cluster_spec is None - - @pytest.mark.usefixtures("mock_environment") - def test_tf_config(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.tf_config == json.loads(_TEST_CLUSTER_SPEC) - - def test_tf_config_none(self): - env_vars = training_utils.EnvironmentVariables() - assert env_vars.tf_config is None