From 8ef0ded034db797adb4d458eba43537992d822bd Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 20 Oct 2021 13:48:43 -0700 Subject: [PATCH] feat: enable reduction server (#741) * feat: enable reduction server * fix: remove optional for reduction_server_replica_count, add comment for _SPEC_ORDERS --- google/cloud/aiplatform/jobs.py | 54 ++- google/cloud/aiplatform/training_jobs.py | 172 ++++++-- .../aiplatform/utils/worker_spec_utils.py | 39 +- tests/unit/aiplatform/test_custom_job.py | 3 + tests/unit/aiplatform/test_training_jobs.py | 375 +++++++++++++++++- 5 files changed, 574 insertions(+), 69 deletions(-) diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index aaadb6e4d7..275f654683 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -1061,6 +1061,9 @@ def from_local_script( accelerator_count: int = 0, boot_disk_type: str = "pd-ssd", boot_disk_size_gb: int = 100, + reduction_server_replica_count: int = 0, + reduction_server_machine_type: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, base_output_dir: Optional[str] = None, project: Optional[str] = None, location: Optional[str] = None, @@ -1127,6 +1130,13 @@ def from_local_script( boot_disk_size_gb (int): Optional. Size in GB of the boot disk, default is 100GB. boot disk size must be within the range of [100, 64000]. + reduction_server_replica_count (int): + The number of reduction server replicas, default is 0. + reduction_server_machine_type (str): + Optional. The type of machine to use for reduction server. + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. + See details: https://cloud.google.com/vertex-ai/docs/training/distributed-training#reduce_training_time_with_reduction_server base_output_dir (str): Optional. GCS output directory of job. If not provided a timestamped directory in the staging directory will be used. @@ -1181,6 +1191,8 @@ def from_local_script( accelerator_type=accelerator_type, boot_disk_type=boot_disk_type, boot_disk_size_gb=boot_disk_size_gb, + reduction_server_replica_count=reduction_server_replica_count, + reduction_server_machine_type=reduction_server_machine_type, ).pool_specs python_packager = source_utils._TrainingScriptPythonPackager( @@ -1191,21 +1203,33 @@ def from_local_script( gcs_staging_dir=staging_bucket, project=project, credentials=credentials, ) - for spec in worker_pool_specs: - spec["python_package_spec"] = { - "executor_image_uri": container_uri, - "python_module": python_packager.module_name, - "package_uris": [package_gcs_uri], - } - - if args: - spec["python_package_spec"]["args"] = args - - if environment_variables: - spec["python_package_spec"]["env"] = [ - {"name": key, "value": value} - for key, value in environment_variables.items() - ] + for spec_order, spec in enumerate(worker_pool_specs): + + if not spec: + continue + + if ( + spec_order == worker_spec_utils._SPEC_ORDERS["server_spec"] + and reduction_server_replica_count > 0 + ): + spec["container_spec"] = { + "image_uri": reduction_server_container_uri, + } + else: + spec["python_package_spec"] = { + "executor_image_uri": container_uri, + "python_module": python_packager.module_name, + "package_uris": [package_gcs_uri], + } + + if args: + spec["python_package_spec"]["args"] = args + + if environment_variables: + spec["python_package_spec"]["env"] = [ + {"name": key, "value": value} + for key, value in environment_variables.items() + ] return cls( display_name=display_name, diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 87b0fa6d02..eb7680946b 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -1278,6 +1278,8 @@ def _prepare_and_validate_run( accelerator_count: int = 0, boot_disk_type: str = "pd-ssd", boot_disk_size_gb: int = 100, + reduction_server_replica_count: int = 0, + reduction_server_machine_type: Optional[str] = None, ) -> Tuple[worker_spec_utils._DistributedTrainingSpec, Optional[gca_model.Model]]: """Create worker pool specs and managed model as well validating the run. @@ -1318,6 +1320,10 @@ def _prepare_and_validate_run( boot_disk_size_gb (int): Size in GB of the boot disk, default is 100GB. boot disk size must be within the range of [100, 64000]. + reduction_server_replica_count (int): + The number of reduction server replicas, default is 0. + reduction_server_machine_type (str): + Optional. The type of machine to use for reduction server. Returns: Worker pools specs and managed model for run. @@ -1352,6 +1358,8 @@ def _prepare_and_validate_run( accelerator_type=accelerator_type, boot_disk_type=boot_disk_type, boot_disk_size_gb=boot_disk_size_gb, + reduction_server_replica_count=reduction_server_replica_count, + reduction_server_machine_type=reduction_server_machine_type, ).pool_specs managed_model = self._managed_model @@ -1736,6 +1744,9 @@ def run( accelerator_count: int = 0, boot_disk_type: str = "pd-ssd", boot_disk_size_gb: int = 100, + reduction_server_replica_count: int = 0, + reduction_server_machine_type: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, training_fraction_split: Optional[float] = None, validation_fraction_split: Optional[float] = None, test_fraction_split: Optional[float] = None, @@ -1907,6 +1918,13 @@ def run( boot_disk_size_gb (int): Size in GB of the boot disk, default is 100GB. boot disk size must be within the range of [100, 64000]. + reduction_server_replica_count (int): + The number of reduction server replicas, default is 0. + reduction_server_machine_type (str): + Optional. The type of machine to use for reduction server. + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. + See details: https://cloud.google.com/vertex-ai/docs/training/distributed-training#reduce_training_time_with_reduction_server training_fraction_split (float): Optional. The fraction of the input data that is to be used to train the Model. This is ignored if Dataset is not provided. @@ -1989,6 +2007,8 @@ def run( accelerator_type=accelerator_type, boot_disk_type=boot_disk_type, boot_disk_size_gb=boot_disk_size_gb, + reduction_server_replica_count=reduction_server_replica_count, + reduction_server_machine_type=reduction_server_machine_type, ) # make and copy package @@ -2017,6 +2037,9 @@ def run( predefined_split_column_name=predefined_split_column_name, timestamp_split_column_name=timestamp_split_column_name, tensorboard=tensorboard, + reduction_server_container_uri=reduction_server_container_uri + if reduction_server_replica_count > 0 + else None, sync=sync, ) @@ -2050,6 +2073,7 @@ def _run( predefined_split_column_name: Optional[str] = None, timestamp_split_column_name: Optional[str] = None, tensorboard: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, sync=True, ) -> Optional[models.Model]: """Packages local script and launches training_job. @@ -2182,6 +2206,8 @@ def _run( `service_account` is required with provided `tensorboard`. For more information on configuring your service account please visit: https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -2197,21 +2223,33 @@ def _run( credentials=self.credentials, ) - for spec in worker_pool_specs: - spec["python_package_spec"] = { - "executor_image_uri": self._container_uri, - "python_module": python_packager.module_name, - "package_uris": [package_gcs_uri], - } + for spec_order, spec in enumerate(worker_pool_specs): - if args: - spec["python_package_spec"]["args"] = args + if not spec: + continue - if environment_variables: - spec["python_package_spec"]["env"] = [ - {"name": key, "value": value} - for key, value in environment_variables.items() - ] + if ( + spec_order == worker_spec_utils._SPEC_ORDERS["server_spec"] + and reduction_server_container_uri + ): + spec["container_spec"] = { + "image_uri": reduction_server_container_uri, + } + else: + spec["python_package_spec"] = { + "executor_image_uri": self._container_uri, + "python_module": python_packager.module_name, + "package_uris": [package_gcs_uri], + } + + if args: + spec["python_package_spec"]["args"] = args + + if environment_variables: + spec["python_package_spec"]["env"] = [ + {"name": key, "value": value} + for key, value in environment_variables.items() + ] ( training_task_inputs, @@ -2498,6 +2536,9 @@ def run( accelerator_count: int = 0, boot_disk_type: str = "pd-ssd", boot_disk_size_gb: int = 100, + reduction_server_replica_count: int = 0, + reduction_server_machine_type: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, training_fraction_split: Optional[float] = None, validation_fraction_split: Optional[float] = None, test_fraction_split: Optional[float] = None, @@ -2662,6 +2703,13 @@ def run( boot_disk_size_gb (int): Size in GB of the boot disk, default is 100GB. boot disk size must be within the range of [100, 64000]. + reduction_server_replica_count (int): + The number of reduction server replicas, default is 0. + reduction_server_machine_type (str): + Optional. The type of machine to use for reduction server. + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. + See details: https://cloud.google.com/vertex-ai/docs/training/distributed-training#reduce_training_time_with_reduction_server training_fraction_split (float): Optional. The fraction of the input data that is to be used to train the Model. This is ignored if Dataset is not provided. @@ -2749,6 +2797,8 @@ def run( accelerator_type=accelerator_type, boot_disk_type=boot_disk_type, boot_disk_size_gb=boot_disk_size_gb, + reduction_server_replica_count=reduction_server_replica_count, + reduction_server_machine_type=reduction_server_machine_type, ) return self._run( @@ -2771,6 +2821,9 @@ def run( predefined_split_column_name=predefined_split_column_name, timestamp_split_column_name=timestamp_split_column_name, tensorboard=tensorboard, + reduction_server_container_uri=reduction_server_container_uri + if reduction_server_replica_count > 0 + else None, sync=sync, ) @@ -2803,6 +2856,7 @@ def _run( predefined_split_column_name: Optional[str] = None, timestamp_split_column_name: Optional[str] = None, tensorboard: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, sync=True, ) -> Optional[models.Model]: """Packages local script and launches training_job. @@ -2931,6 +2985,8 @@ def _run( `service_account` is required with provided `tensorboard`. For more information on configuring your service account please visit: https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -2941,20 +2997,32 @@ def _run( produce a Vertex AI Model. """ - for spec in worker_pool_specs: - spec["containerSpec"] = {"imageUri": self._container_uri} + for spec_order, spec in enumerate(worker_pool_specs): - if self._command: - spec["containerSpec"]["command"] = self._command + if not spec: + continue - if args: - spec["containerSpec"]["args"] = args + if ( + spec_order == worker_spec_utils._SPEC_ORDERS["server_spec"] + and reduction_server_container_uri + ): + spec["container_spec"] = { + "image_uri": reduction_server_container_uri, + } + else: + spec["containerSpec"] = {"imageUri": self._container_uri} - if environment_variables: - spec["containerSpec"]["env"] = [ - {"name": key, "value": value} - for key, value in environment_variables.items() - ] + if self._command: + spec["containerSpec"]["command"] = self._command + + if args: + spec["containerSpec"]["args"] = args + + if environment_variables: + spec["containerSpec"]["env"] = [ + {"name": key, "value": value} + for key, value in environment_variables.items() + ] ( training_task_inputs, @@ -5231,6 +5299,9 @@ def run( accelerator_count: int = 0, boot_disk_type: str = "pd-ssd", boot_disk_size_gb: int = 100, + reduction_server_replica_count: int = 0, + reduction_server_machine_type: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, training_fraction_split: Optional[float] = None, validation_fraction_split: Optional[float] = None, test_fraction_split: Optional[float] = None, @@ -5395,6 +5466,13 @@ def run( boot_disk_size_gb (int): Size in GB of the boot disk, default is 100GB. boot disk size must be within the range of [100, 64000]. + reduction_server_replica_count (int): + The number of reduction server replicas, default is 0. + reduction_server_machine_type (str): + Optional. The type of machine to use for reduction server. + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. + See details: https://cloud.google.com/vertex-ai/docs/training/distributed-training#reduce_training_time_with_reduction_server training_fraction_split (float): Optional. The fraction of the input data that is to be used to train the Model. This is ignored if Dataset is not provided. @@ -5477,6 +5555,8 @@ def run( accelerator_type=accelerator_type, boot_disk_type=boot_disk_type, boot_disk_size_gb=boot_disk_size_gb, + reduction_server_replica_count=reduction_server_replica_count, + reduction_server_machine_type=reduction_server_machine_type, ) return self._run( @@ -5499,6 +5579,9 @@ def run( timestamp_split_column_name=timestamp_split_column_name, bigquery_destination=bigquery_destination, tensorboard=tensorboard, + reduction_server_container_uri=reduction_server_container_uri + if reduction_server_replica_count > 0 + else None, sync=sync, ) @@ -5531,6 +5614,7 @@ def _run( timestamp_split_column_name: Optional[str] = None, bigquery_destination: Optional[str] = None, tensorboard: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, sync=True, ) -> Optional[models.Model]: """Packages local script and launches training_job. @@ -5646,6 +5730,8 @@ def _run( `service_account` is required with provided `tensorboard`. For more information on configuring your service account please visit: https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -5655,21 +5741,33 @@ def _run( model: The trained Vertex AI Model resource or None if training did not produce a Vertex AI Model. """ - for spec in worker_pool_specs: - spec["python_package_spec"] = { - "executor_image_uri": self._container_uri, - "python_module": self._python_module, - "package_uris": [self._package_gcs_uri], - } + for spec_order, spec in enumerate(worker_pool_specs): - if args: - spec["python_package_spec"]["args"] = args + if not spec: + continue - if environment_variables: - spec["python_package_spec"]["env"] = [ - {"name": key, "value": value} - for key, value in environment_variables.items() - ] + if ( + spec_order == worker_spec_utils._SPEC_ORDERS["server_spec"] + and reduction_server_container_uri + ): + spec["container_spec"] = { + "image_uri": reduction_server_container_uri, + } + else: + spec["python_package_spec"] = { + "executor_image_uri": self._container_uri, + "python_module": self._python_module, + "package_uris": [self._package_gcs_uri], + } + + if args: + spec["python_package_spec"]["args"] = args + + if environment_variables: + spec["python_package_spec"]["env"] = [ + {"name": key, "value": value} + for key, value in environment_variables.items() + ] ( training_task_inputs, diff --git a/google/cloud/aiplatform/utils/worker_spec_utils.py b/google/cloud/aiplatform/utils/worker_spec_utils.py index 1c0b60540f..6cad7d562f 100644 --- a/google/cloud/aiplatform/utils/worker_spec_utils.py +++ b/google/cloud/aiplatform/utils/worker_spec_utils.py @@ -21,6 +21,17 @@ accelerator_type as gca_accelerator_type_compat, ) +# `_SPEC_ORDERS` contains the worker pool spec type and its order in the `_WorkerPoolSpec`. +# The `server_spec` supports either reduction server or parameter server, each +# with different configuration for its `container_spec`. This mapping will be +# used during configuration of `container_spec` for all worker pool specs. +_SPEC_ORDERS = { + "chief_spec": 0, + "worker_spec": 1, + "server_spec": 2, + "evaluator_spec": 3, +} + class _WorkerPoolSpec(NamedTuple): """Specification container for Worker Pool specs used for distributed training. @@ -129,7 +140,7 @@ class _DistributedTrainingSpec(NamedTuple): chief_spec: _WorkerPoolSpec = _WorkerPoolSpec() worker_spec: _WorkerPoolSpec = _WorkerPoolSpec() - parameter_server_spec: _WorkerPoolSpec = _WorkerPoolSpec() + server_spec: _WorkerPoolSpec = _WorkerPoolSpec() evaluator_spec: _WorkerPoolSpec = _WorkerPoolSpec() @property @@ -152,10 +163,10 @@ def pool_specs( spec_order = [ self.chief_spec, self.worker_spec, - self.parameter_server_spec, + self.server_spec, self.evaluator_spec, ] - specs = [s.spec_dict for s in spec_order] + specs = [{} if s.is_empty else s.spec_dict for s in spec_order] for i in reversed(range(len(spec_order))): if spec_order[i].is_empty: specs.pop() @@ -172,6 +183,8 @@ def chief_worker_pool( accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED", boot_disk_type: str = "pd-ssd", boot_disk_size_gb: int = 100, + reduction_server_replica_count: int = 0, + reduction_server_machine_type: str = None, ) -> "_DistributedTrainingSpec": """Parameterizes Config to support only chief with worker replicas. @@ -197,10 +210,15 @@ def chief_worker_pool( boot_disk_size_gb (int): Size in GB of the boot disk (default is 100GB). boot disk size must be within the range of [100, 64000]. + reduction_server_replica_count (int): + The number of reduction server replicas, default is 0. + reduction_server_machine_type (str): + The type of machine to use for reduction server, default is `n1-highcpu-16`. Returns: - _DistributedTrainingSpec representing one chief and n workers all of same - type. If replica_count <= 0 then an empty spec is returned. + _DistributedTrainingSpec representing one chief and n workers all of + same type, optional with reduction server(s). If replica_count <= 0 + then an empty spec is returned. """ if replica_count <= 0: return cls() @@ -223,4 +241,13 @@ def chief_worker_pool( boot_disk_size_gb=boot_disk_size_gb, ) - return cls(chief_spec=chief_spec, worker_spec=worker_spec) + reduction_server_spec = _WorkerPoolSpec( + replica_count=reduction_server_replica_count, + machine_type=reduction_server_machine_type, + ) + + return cls( + chief_spec=chief_spec, + worker_spec=worker_spec, + server_spec=reduction_server_spec, + ) diff --git a/tests/unit/aiplatform/test_custom_job.py b/tests/unit/aiplatform/test_custom_job.py index f44a1471cc..040ce3e69a 100644 --- a/tests/unit/aiplatform/test_custom_job.py +++ b/tests/unit/aiplatform/test_custom_job.py @@ -520,6 +520,9 @@ def test_create_from_local_script_with_all_args( accelerator_count=test_training_jobs._TEST_ACCELERATOR_COUNT, boot_disk_type=test_training_jobs._TEST_BOOT_DISK_TYPE, boot_disk_size_gb=test_training_jobs._TEST_BOOT_DISK_SIZE_GB, + reduction_server_replica_count=test_training_jobs._TEST_REDUCTION_SERVER_REPLICA_COUNT, + reduction_server_machine_type=test_training_jobs._TEST_REDUCTION_SERVER_MACHINE_TYPE, + reduction_server_container_uri=test_training_jobs._TEST_REDUCTION_SERVER_CONTAINER_URI, base_output_dir=_TEST_BASE_OUTPUT_DIR, labels=_TEST_LABELS, ) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index 1a919f1635..34b7b63617 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -93,6 +93,11 @@ _TEST_RUN_ARGS = ["-v", 0.1, "--test=arg"] _TEST_REPLICA_COUNT = 1 _TEST_MACHINE_TYPE = "n1-standard-4" +_TEST_REDUCTION_SERVER_REPLICA_COUNT = 1 +_TEST_REDUCTION_SERVER_MACHINE_TYPE = "n1-highcpu-16" +_TEST_REDUCTION_SERVER_CONTAINER_URI = ( + "us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest" +) _TEST_ACCELERATOR_TYPE = "NVIDIA_TESLA_K80" _TEST_INVALID_ACCELERATOR_TYPE = "NVIDIA_DOES_NOT_EXIST" _TEST_ACCELERATOR_COUNT = 1 @@ -464,6 +469,12 @@ def make_training_pipeline(state, add_training_task_metadata=True): ) +def make_training_pipeline_with_no_model_upload(state): + return gca_training_pipeline.TrainingPipeline( + name=_TEST_PIPELINE_RESOURCE_NAME, state=state, + ) + + @pytest.fixture def mock_pipeline_service_get(): with mock.patch.object( @@ -1520,6 +1531,132 @@ def test_run_call_pipeline_service_create_distributed_training( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_distributed_training_with_reduction_server( + self, + mock_pipeline_service_create_with_no_model_to_upload, + mock_pipeline_service_get_with_no_model_to_upload, + mock_python_package_to_gcs, + sync, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_BUCKET_NAME, + credentials=_TEST_CREDENTIALS, + ) + + job = training_jobs.CustomTrainingJob( + display_name=_TEST_DISPLAY_NAME, + script_path=_TEST_LOCAL_SCRIPT_FILE_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + ) + + job.run( + base_output_dir=_TEST_BASE_OUTPUT_DIR, + args=_TEST_RUN_ARGS, + environment_variables=_TEST_ENVIRONMENT_VARIABLES, + replica_count=10, + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + reduction_server_replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT, + reduction_server_machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE, + reduction_server_container_uri=_TEST_REDUCTION_SERVER_CONTAINER_URI, + sync=sync, + ) + + if not sync: + job.wait() + + mock_python_package_to_gcs.assert_called_once_with( + gcs_staging_dir=_TEST_BUCKET_NAME, + project=_TEST_PROJECT, + credentials=initializer.global_config.credentials, + ) + + true_args = _TEST_RUN_ARGS + true_env = [ + {"name": key, "value": value} + for key, value in _TEST_ENVIRONMENT_VARIABLES.items() + ] + + true_worker_pool_spec = [ + { + "replica_count": 1, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, + }, + "disk_spec": { + "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, + "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, + }, + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "args": true_args, + "env": true_env, + }, + }, + { + "replica_count": 9, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, + }, + "disk_spec": { + "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, + "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, + }, + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": source_utils._TrainingScriptPythonPackager.module_name, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "args": true_args, + "env": true_env, + }, + }, + { + "replica_count": _TEST_REDUCTION_SERVER_REPLICA_COUNT, + "machine_spec": {"machine_type": _TEST_REDUCTION_SERVER_MACHINE_TYPE}, + "container_spec": {"image_uri": _TEST_REDUCTION_SERVER_CONTAINER_URI}, + "disk_spec": { + "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, + "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, + }, + }, + ] + + true_training_pipeline = gca_training_pipeline.TrainingPipeline( + display_name=_TEST_DISPLAY_NAME, + training_task_definition=schema.training_job.definition.custom_task, + training_task_inputs=json_format.ParseDict( + { + "worker_pool_specs": true_worker_pool_spec, + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, + }, + struct_pb2.Value(), + ), + ) + + mock_pipeline_service_create_with_no_model_to_upload.assert_called_once_with( + parent=initializer.global_config.common_location_path(), + training_pipeline=true_training_pipeline, + ) + + assert job._gca_resource == make_training_pipeline_with_no_model_upload( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + + assert not job.has_failed + + assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + @pytest.mark.usefixtures("get_training_job_custom_mock") def test_get_training_job(self, get_training_job_custom_mock): aiplatform.init(project=_TEST_PROJECT) @@ -2719,6 +2856,116 @@ def test_run_call_pipeline_service_create_distributed_training( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_distributed_training_with_reduction_server( + self, + mock_pipeline_service_create_with_no_model_to_upload, + mock_pipeline_service_get_with_no_model_to_upload, + sync, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_BUCKET_NAME, + credentials=_TEST_CREDENTIALS, + ) + + job = training_jobs.CustomContainerTrainingJob( + display_name=_TEST_DISPLAY_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + command=_TEST_TRAINING_CONTAINER_CMD, + ) + + job.run( + base_output_dir=_TEST_BASE_OUTPUT_DIR, + args=_TEST_RUN_ARGS, + replica_count=10, + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + reduction_server_replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT, + reduction_server_machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE, + reduction_server_container_uri=_TEST_REDUCTION_SERVER_CONTAINER_URI, + sync=sync, + ) + + if not sync: + job.wait() + + true_args = _TEST_RUN_ARGS + + true_worker_pool_spec = [ + { + "replica_count": 1, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, + }, + "disk_spec": { + "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, + "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, + }, + "containerSpec": { + "imageUri": _TEST_TRAINING_CONTAINER_IMAGE, + "command": _TEST_TRAINING_CONTAINER_CMD, + "args": true_args, + }, + }, + { + "replica_count": 9, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, + }, + "disk_spec": { + "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, + "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, + }, + "containerSpec": { + "imageUri": _TEST_TRAINING_CONTAINER_IMAGE, + "command": _TEST_TRAINING_CONTAINER_CMD, + "args": true_args, + }, + }, + { + "replica_count": _TEST_REDUCTION_SERVER_REPLICA_COUNT, + "machine_spec": {"machine_type": _TEST_REDUCTION_SERVER_MACHINE_TYPE}, + "disk_spec": { + "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, + "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, + }, + "container_spec": {"image_uri": _TEST_REDUCTION_SERVER_CONTAINER_URI}, + }, + ] + + true_training_pipeline = gca_training_pipeline.TrainingPipeline( + display_name=_TEST_DISPLAY_NAME, + training_task_definition=schema.training_job.definition.custom_task, + training_task_inputs=json_format.ParseDict( + { + "worker_pool_specs": true_worker_pool_spec, + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, + }, + struct_pb2.Value(), + ), + ) + + mock_pipeline_service_create_with_no_model_to_upload.assert_called_once_with( + parent=initializer.global_config.common_location_path(), + training_pipeline=true_training_pipeline, + ) + + assert job._gca_resource == make_training_pipeline_with_no_model_upload( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + + assert not job.has_failed + + assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_nontabular_dataset( self, @@ -2938,7 +3185,7 @@ def test_machine_spec_return_spec_dict(self): assert test_spec.spec_dict == true_spec_dict - def test_machine_spec_return_spec_with_boot_disk_dict(self): + def test_machine_spec_return_spec_dict_with_boot_disk(self): test_spec = worker_spec_utils._WorkerPoolSpec( replica_count=_TEST_REPLICA_COUNT, machine_type=_TEST_MACHINE_TYPE, @@ -3030,7 +3277,7 @@ def test_machine_spec_returns_pool_spec(self): accelerator_count=_TEST_ACCELERATOR_COUNT, accelerator_type=_TEST_ACCELERATOR_TYPE, ), - parameter_server_spec=worker_spec_utils._WorkerPoolSpec( + server_spec=worker_spec_utils._WorkerPoolSpec( replica_count=3, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -3185,7 +3432,7 @@ def test_machine_spec_handles_missing_pools(self): accelerator_type=_TEST_ACCELERATOR_TYPE, ), worker_spec=worker_spec_utils._WorkerPoolSpec(replica_count=0), - parameter_server_spec=worker_spec_utils._WorkerPoolSpec( + server_spec=worker_spec_utils._WorkerPoolSpec( replica_count=3, machine_type=_TEST_MACHINE_TYPE, accelerator_count=_TEST_ACCELERATOR_COUNT, @@ -3207,14 +3454,7 @@ def test_machine_spec_handles_missing_pools(self): "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, }, }, - { - "machine_spec": {"machine_type": "n1-standard-4"}, - "replica_count": 0, - "disk_spec": { - "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, - "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, - }, - }, + {}, { "machine_spec": { "machine_type": _TEST_MACHINE_TYPE, @@ -4246,6 +4486,119 @@ def test_run_call_pipeline_service_create_distributed_training( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_distributed_training_with_reduction_server( + self, + mock_pipeline_service_create_with_no_model_to_upload, + mock_pipeline_service_get_with_no_model_to_upload, + sync, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_BUCKET_NAME, + credentials=_TEST_CREDENTIALS, + ) + + job = training_jobs.CustomPythonPackageTrainingJob( + display_name=_TEST_DISPLAY_NAME, + python_package_gcs_uri=_TEST_OUTPUT_PYTHON_PACKAGE_PATH, + python_module_name=_TEST_PYTHON_MODULE_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + ) + + job.run( + base_output_dir=_TEST_BASE_OUTPUT_DIR, + args=_TEST_RUN_ARGS, + replica_count=10, + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + reduction_server_replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT, + reduction_server_machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE, + reduction_server_container_uri=_TEST_REDUCTION_SERVER_CONTAINER_URI, + sync=sync, + ) + + if not sync: + job.wait() + + true_args = _TEST_RUN_ARGS + + true_worker_pool_spec = [ + { + "replica_count": 1, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, + }, + "disk_spec": { + "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, + "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, + }, + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": _TEST_PYTHON_MODULE_NAME, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "args": true_args, + }, + }, + { + "replica_count": 9, + "machine_spec": { + "machine_type": _TEST_MACHINE_TYPE, + "accelerator_type": _TEST_ACCELERATOR_TYPE, + "accelerator_count": _TEST_ACCELERATOR_COUNT, + }, + "disk_spec": { + "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, + "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, + }, + "python_package_spec": { + "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE, + "python_module": _TEST_PYTHON_MODULE_NAME, + "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "args": true_args, + }, + }, + { + "replica_count": _TEST_REDUCTION_SERVER_REPLICA_COUNT, + "machine_spec": {"machine_type": _TEST_REDUCTION_SERVER_MACHINE_TYPE}, + "container_spec": {"image_uri": _TEST_REDUCTION_SERVER_CONTAINER_URI}, + "disk_spec": { + "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT, + "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT, + }, + }, + ] + + true_training_pipeline = gca_training_pipeline.TrainingPipeline( + display_name=_TEST_DISPLAY_NAME, + training_task_definition=schema.training_job.definition.custom_task, + training_task_inputs=json_format.ParseDict( + { + "worker_pool_specs": true_worker_pool_spec, + "base_output_directory": { + "output_uri_prefix": _TEST_BASE_OUTPUT_DIR + }, + }, + struct_pb2.Value(), + ), + ) + + mock_pipeline_service_create_with_no_model_to_upload.assert_called_once_with( + parent=initializer.global_config.common_location_path(), + training_pipeline=true_training_pipeline, + ) + + assert job._gca_resource == make_training_pipeline_with_no_model_upload( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + + assert not job.has_failed + + assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_nontabular_dataset_without_model_display_name_nor_model_labels( self,