diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py
index aaadb6e4d7..d1dee3913a 100644
--- a/google/cloud/aiplatform/jobs.py
+++ b/google/cloud/aiplatform/jobs.py
@@ -1061,6 +1061,8 @@ def from_local_script(
         accelerator_count: int = 0,
         boot_disk_type: str = "pd-ssd",
         boot_disk_size_gb: int = 100,
+        reduction_server_replica_count: int = 0,
+        reduction_server_machine_type: str = "n1-highcpu-16",
         base_output_dir: Optional[str] = None,
         project: Optional[str] = None,
         location: Optional[str] = None,
@@ -1127,6 +1129,10 @@ def from_local_script(
             boot_disk_size_gb (int):
                 Optional. Size in GB of the boot disk, default is 100GB.
                 boot disk size must be within the range of [100, 64000].
+            reduction_server_replica_count (int):
+                The number of reduction server replicas, default is 0.
+            reduction_server_machine_type (str):
+                The type of machine to use for reduction server, default is `n1-highcpu-16`.
             base_output_dir (str):
                 Optional. GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
@@ -1181,7 +1187,15 @@ def from_local_script(
             accelerator_type=accelerator_type,
             boot_disk_type=boot_disk_type,
             boot_disk_size_gb=boot_disk_size_gb,
-        ).pool_specs
+        )
+
+        if reduction_server_replica_count > 0:
+            worker_pool_specs = worker_pool_specs.reduction_server_pool(
+                reduction_server_replica_count=reduction_server_replica_count,
+                reduction_server_machine_type=reduction_server_machine_type,
+            )
+
+        worker_pool_specs = worker_pool_specs.pool_specs
 
         python_packager = source_utils._TrainingScriptPythonPackager(
             script_path=script_path, requirements=requirements
@@ -1192,20 +1206,21 @@ def from_local_script(
         )
 
         for spec in worker_pool_specs:
-            spec["python_package_spec"] = {
-                "executor_image_uri": container_uri,
-                "python_module": python_packager.module_name,
-                "package_uris": [package_gcs_uri],
-            }
-
-            if args:
-                spec["python_package_spec"]["args"] = args
-
-            if environment_variables:
-                spec["python_package_spec"]["env"] = [
-                    {"name": key, "value": value}
-                    for key, value in environment_variables.items()
-                ]
+            if spec and "container_spec" not in spec:
+                spec["python_package_spec"] = {
+                    "executor_image_uri": container_uri,
+                    "python_module": python_packager.module_name,
+                    "package_uris": [package_gcs_uri],
+                }
+
+                if args:
+                    spec["python_package_spec"]["args"] = args
+
+                if environment_variables:
+                    spec["python_package_spec"]["env"] = [
+                        {"name": key, "value": value}
+                        for key, value in environment_variables.items()
+                    ]
 
         return cls(
             display_name=display_name,
diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py
index 8d8583f850..70edbe3ba1 100644
--- a/google/cloud/aiplatform/training_jobs.py
+++ b/google/cloud/aiplatform/training_jobs.py
@@ -1278,6 +1278,8 @@ def _prepare_and_validate_run(
         accelerator_count: int = 0,
         boot_disk_type: str = "pd-ssd",
         boot_disk_size_gb: int = 100,
+        reduction_server_replica_count: int = 0,
+        reduction_server_machine_type: str = "n1-highcpu-16",
     ) -> Tuple[worker_spec_utils._DistributedTrainingSpec, Optional[gca_model.Model]]:
         """Create worker pool specs and managed model as well validating the
         run.
@@ -1318,6 +1320,10 @@ def _prepare_and_validate_run(
             boot_disk_size_gb (int):
                 Size in GB of the boot disk, default is 100GB.
                 boot disk size must be within the range of [100, 64000].
+            reduction_server_replica_count (int):
+                The number of reduction server replicas, default is 0.
+            reduction_server_machine_type (str):
+                The type of machine to use for reduction server, default is `n1-highcpu-16`.
 
         Returns:
             Worker pools specs and managed model for run.
@@ -1352,7 +1358,15 @@ def _prepare_and_validate_run(
             accelerator_type=accelerator_type,
             boot_disk_type=boot_disk_type,
             boot_disk_size_gb=boot_disk_size_gb,
-        ).pool_specs
+        )
+
+        if reduction_server_replica_count > 0:
+            worker_pool_specs = worker_pool_specs.reduction_server_pool(
+                reduction_server_replica_count=reduction_server_replica_count,
+                reduction_server_machine_type=reduction_server_machine_type,
+            )
+
+        worker_pool_specs = worker_pool_specs.pool_specs
 
         managed_model = self._managed_model
         if model_display_name:
@@ -1736,6 +1750,8 @@ def run(
         accelerator_count: int = 0,
         boot_disk_type: str = "pd-ssd",
         boot_disk_size_gb: int = 100,
+        reduction_server_replica_count: int = 0,
+        reduction_server_machine_type: str = "n1-highcpu-16",
         training_fraction_split: Optional[float] = None,
         validation_fraction_split: Optional[float] = None,
         test_fraction_split: Optional[float] = None,
@@ -1907,6 +1923,10 @@ def run(
             boot_disk_size_gb (int):
                 Size in GB of the boot disk, default is 100GB.
                 boot disk size must be within the range of [100, 64000].
+            reduction_server_replica_count (int):
+                The number of reduction server replicas, default is 0.
+            reduction_server_machine_type (str):
+                The type of machine to use for reduction server, default is `n1-highcpu-16`.
             training_fraction_split (float):
                 Optional. The fraction of the input data that is to be used to train
                 the Model. This is ignored if Dataset is not provided.
@@ -1989,6 +2009,8 @@ def run(
             accelerator_type=accelerator_type,
             boot_disk_type=boot_disk_type,
             boot_disk_size_gb=boot_disk_size_gb,
+            reduction_server_replica_count=reduction_server_replica_count,
+            reduction_server_machine_type=reduction_server_machine_type,
         )
 
         # make and copy package
@@ -2198,20 +2220,21 @@ def _run(
         )
 
         for spec in worker_pool_specs:
-            spec["python_package_spec"] = {
-                "executor_image_uri": self._container_uri,
-                "python_module": python_packager.module_name,
-                "package_uris": [package_gcs_uri],
-            }
+            if spec and "container_spec" not in spec:
+                spec["python_package_spec"] = {
+                    "executor_image_uri": self._container_uri,
+                    "python_module": python_packager.module_name,
+                    "package_uris": [package_gcs_uri],
+                }
 
-            if args:
-                spec["python_package_spec"]["args"] = args
+                if args:
+                    spec["python_package_spec"]["args"] = args
 
-            if environment_variables:
-                spec["python_package_spec"]["env"] = [
-                    {"name": key, "value": value}
-                    for key, value in environment_variables.items()
-                ]
+                if environment_variables:
+                    spec["python_package_spec"]["env"] = [
+                        {"name": key, "value": value}
+                        for key, value in environment_variables.items()
+                    ]
 
         (
             training_task_inputs,
@@ -2498,6 +2521,8 @@ def run(
         accelerator_count: int = 0,
         boot_disk_type: str = "pd-ssd",
         boot_disk_size_gb: int = 100,
+        reduction_server_replica_count: int = 0,
+        reduction_server_machine_type: str = "n1-highcpu-16",
         training_fraction_split: Optional[float] = None,
         validation_fraction_split: Optional[float] = None,
         test_fraction_split: Optional[float] = None,
@@ -2662,6 +2687,10 @@ def run(
             boot_disk_size_gb (int):
                 Size in GB of the boot disk, default is 100GB.
                 boot disk size must be within the range of [100, 64000].
+            reduction_server_replica_count (int):
+                The number of reduction server replicas, default is 0.
+            reduction_server_machine_type (str):
+                The type of machine to use for reduction server, default is `n1-highcpu-16`.
             training_fraction_split (float):
                 Optional. The fraction of the input data that is to be used to train
                 the Model. This is ignored if Dataset is not provided.
@@ -2749,6 +2778,8 @@ def run(
             accelerator_type=accelerator_type,
             boot_disk_type=boot_disk_type,
             boot_disk_size_gb=boot_disk_size_gb,
+            reduction_server_replica_count=reduction_server_replica_count,
+            reduction_server_machine_type=reduction_server_machine_type,
         )
 
         return self._run(
@@ -2942,19 +2973,20 @@ def _run(
         """
         for spec in worker_pool_specs:
-            spec["containerSpec"] = {"imageUri": self._container_uri}
+            if spec and "container_spec" not in spec:
+                spec["containerSpec"] = {"imageUri": self._container_uri}
 
-            if self._command:
-                spec["containerSpec"]["command"] = self._command
+                if self._command:
+                    spec["containerSpec"]["command"] = self._command
 
-            if args:
-                spec["containerSpec"]["args"] = args
+                if args:
+                    spec["containerSpec"]["args"] = args
 
-            if environment_variables:
-                spec["containerSpec"]["env"] = [
-                    {"name": key, "value": value}
-                    for key, value in environment_variables.items()
-                ]
+                if environment_variables:
+                    spec["containerSpec"]["env"] = [
+                        {"name": key, "value": value}
+                        for key, value in environment_variables.items()
+                    ]
 
         (
             training_task_inputs,
@@ -5102,6 +5134,8 @@ def run(
         accelerator_count: int = 0,
         boot_disk_type: str = "pd-ssd",
         boot_disk_size_gb: int = 100,
+        reduction_server_replica_count: int = 0,
+        reduction_server_machine_type: str = "n1-highcpu-16",
         training_fraction_split: Optional[float] = None,
         validation_fraction_split: Optional[float] = None,
         test_fraction_split: Optional[float] = None,
@@ -5266,6 +5300,10 @@ def run(
             boot_disk_size_gb (int):
                 Size in GB of the boot disk, default is 100GB.
                 boot disk size must be within the range of [100, 64000].
+            reduction_server_replica_count (int):
+                The number of reduction server replicas, default is 0.
+            reduction_server_machine_type (str):
+                The type of machine to use for reduction server, default is `n1-highcpu-16`.
             training_fraction_split (float):
                 Optional. The fraction of the input data that is to be used to train
                 the Model. This is ignored if Dataset is not provided.
@@ -5348,6 +5386,8 @@ def run(
             accelerator_type=accelerator_type,
             boot_disk_type=boot_disk_type,
             boot_disk_size_gb=boot_disk_size_gb,
+            reduction_server_replica_count=reduction_server_replica_count,
+            reduction_server_machine_type=reduction_server_machine_type,
         )
 
         return self._run(
@@ -5527,20 +5567,21 @@ def _run(
             produce a Vertex AI Model.
""" for spec in worker_pool_specs: - spec["python_package_spec"] = { - "executor_image_uri": self._container_uri, - "python_module": self._python_module, - "package_uris": [self._package_gcs_uri], - } + if spec and "container_spec" not in spec: + spec["python_package_spec"] = { + "executor_image_uri": self._container_uri, + "python_module": self._python_module, + "package_uris": [self._package_gcs_uri], + } - if args: - spec["python_package_spec"]["args"] = args + if args: + spec["python_package_spec"]["args"] = args - if environment_variables: - spec["python_package_spec"]["env"] = [ - {"name": key, "value": value} - for key, value in environment_variables.items() - ] + if environment_variables: + spec["python_package_spec"]["env"] = [ + {"name": key, "value": value} + for key, value in environment_variables.items() + ] ( training_task_inputs, diff --git a/google/cloud/aiplatform/utils/worker_spec_utils.py b/google/cloud/aiplatform/utils/worker_spec_utils.py index 1c0b60540f..2a7e7eaacf 100644 --- a/google/cloud/aiplatform/utils/worker_spec_utils.py +++ b/google/cloud/aiplatform/utils/worker_spec_utils.py @@ -21,6 +21,10 @@ accelerator_type as gca_accelerator_type_compat, ) +REDUCTION_SERVER_CONTAINER_URI = ( + "us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest" +) + class _WorkerPoolSpec(NamedTuple): """Specification container for Worker Pool specs used for distributed training. @@ -45,6 +49,7 @@ class _WorkerPoolSpec(NamedTuple): accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED" boot_disk_type: str = "pd-ssd" boot_disk_size_gb: int = 100 + container_uri: str = None def _get_accelerator_type(self) -> Optional[str]: """Validates accelerator_type and returns the name of the accelerator. @@ -86,6 +91,10 @@ def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]: spec["machine_spec"]["accelerator_type"] = accelerator_type spec["machine_spec"]["accelerator_count"] = self.accelerator_count + if self.container_uri == REDUCTION_SERVER_CONTAINER_URI: + spec["container_spec"] = { + "image_uri": self.container_uri, + } return spec @property @@ -129,7 +138,7 @@ class _DistributedTrainingSpec(NamedTuple): chief_spec: _WorkerPoolSpec = _WorkerPoolSpec() worker_spec: _WorkerPoolSpec = _WorkerPoolSpec() - parameter_server_spec: _WorkerPoolSpec = _WorkerPoolSpec() + server_spec: _WorkerPoolSpec = _WorkerPoolSpec() evaluator_spec: _WorkerPoolSpec = _WorkerPoolSpec() @property @@ -152,17 +161,52 @@ def pool_specs( spec_order = [ self.chief_spec, self.worker_spec, - self.parameter_server_spec, + self.server_spec, self.evaluator_spec, ] specs = [s.spec_dict for s in spec_order] + last_non_empty_seen = False for i in reversed(range(len(spec_order))): if spec_order[i].is_empty: - specs.pop() + if not last_non_empty_seen: + specs.pop() + else: + specs[i] = {} else: - break + last_non_empty_seen = True return specs + def reduction_server_pool( + self, + reduction_server_replica_count: int = 0, + reduction_server_machine_type: str = "n1-highcpu-16", + ): + """Parameterizes Config to support only reduction server replicas. + + Args: + reduction_server_replica_count (int): + The number of reduction server replicas, default is 0. + reduction_server_machine_type (str): + The type of machine to use for reduction server, default is `n1-highcpu-16`. 
+ + Raise: + ValueError if chief worker is not set + """ + reduction_server_spec = _WorkerPoolSpec( + replica_count=reduction_server_replica_count, + machine_type=reduction_server_machine_type, + container_uri=REDUCTION_SERVER_CONTAINER_URI, + ) + + if self.chief_spec.is_empty: + raise ValueError("reduction server can not be set without chief worker.") + + return _DistributedTrainingSpec( + chief_spec=self.chief_spec, + worker_spec=self.worker_spec, + server_spec=reduction_server_spec, + ) + @classmethod def chief_worker_pool( cls, diff --git a/tests/unit/aiplatform/test_custom_job.py b/tests/unit/aiplatform/test_custom_job.py index f44a1471cc..a2962f766e 100644 --- a/tests/unit/aiplatform/test_custom_job.py +++ b/tests/unit/aiplatform/test_custom_job.py @@ -520,6 +520,8 @@ def test_create_from_local_script_with_all_args( accelerator_count=test_training_jobs._TEST_ACCELERATOR_COUNT, boot_disk_type=test_training_jobs._TEST_BOOT_DISK_TYPE, boot_disk_size_gb=test_training_jobs._TEST_BOOT_DISK_SIZE_GB, + reduction_server_replica_count=test_training_jobs._TEST_REDUCTION_SERVER_REPLICA_COUNT, + reduction_server_machine_type=test_training_jobs._TEST_REDUCTION_SERVER_MACHINE_TYPE, base_output_dir=_TEST_BASE_OUTPUT_DIR, labels=_TEST_LABELS, ) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index 1a919f1635..67132a0e89 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -93,6 +93,11 @@ _TEST_RUN_ARGS = ["-v", 0.1, "--test=arg"] _TEST_REPLICA_COUNT = 1 _TEST_MACHINE_TYPE = "n1-standard-4" +_TEST_REDUCTION_SERVER_REPLICA_COUNT = 1 +_TEST_REDUCTION_SERVER_MACHINE_TYPE = "n1-highcpu-16" +_TEST_REDUCTION_SERVER_CONTAINER_URI = ( + "us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest" +) _TEST_ACCELERATOR_TYPE = "NVIDIA_TESLA_K80" _TEST_INVALID_ACCELERATOR_TYPE = "NVIDIA_DOES_NOT_EXIST" _TEST_ACCELERATOR_COUNT = 1 @@ -464,6 +469,12 @@ def make_training_pipeline(state, add_training_task_metadata=True): ) +def make_training_pipeline_with_no_model_upload(state): + return gca_training_pipeline.TrainingPipeline( + name=_TEST_PIPELINE_RESOURCE_NAME, state=state, + ) + + @pytest.fixture def mock_pipeline_service_get(): with mock.patch.object( @@ -1520,6 +1531,131 @@ def test_run_call_pipeline_service_create_distributed_training( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_distributed_training_with_reduction_server( + self, + mock_pipeline_service_create_with_no_model_to_upload, + mock_pipeline_service_get_with_no_model_to_upload, + mock_python_package_to_gcs, + sync, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_BUCKET_NAME, + credentials=_TEST_CREDENTIALS, + ) + + job = training_jobs.CustomTrainingJob( + display_name=_TEST_DISPLAY_NAME, + script_path=_TEST_LOCAL_SCRIPT_FILE_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + ) + + job.run( + base_output_dir=_TEST_BASE_OUTPUT_DIR, + args=_TEST_RUN_ARGS, + environment_variables=_TEST_ENVIRONMENT_VARIABLES, + replica_count=10, + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + reduction_server_replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT, + reduction_server_machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE, + sync=sync, + ) + + if not sync: + job.wait() + + 
+        mock_python_package_to_gcs.assert_called_once_with(
+            gcs_staging_dir=_TEST_BUCKET_NAME,
+            project=_TEST_PROJECT,
+            credentials=initializer.global_config.credentials,
+        )
+
+        true_args = _TEST_RUN_ARGS
+        true_env = [
+            {"name": key, "value": value}
+            for key, value in _TEST_ENVIRONMENT_VARIABLES.items()
+        ]
+
+        true_worker_pool_spec = [
+            {
+                "replica_count": 1,
+                "machine_spec": {
+                    "machine_type": _TEST_MACHINE_TYPE,
+                    "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                    "accelerator_count": _TEST_ACCELERATOR_COUNT,
+                },
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+                "python_package_spec": {
+                    "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE,
+                    "python_module": source_utils._TrainingScriptPythonPackager.module_name,
+                    "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
+                    "args": true_args,
+                    "env": true_env,
+                },
+            },
+            {
+                "replica_count": 9,
+                "machine_spec": {
+                    "machine_type": _TEST_MACHINE_TYPE,
+                    "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                    "accelerator_count": _TEST_ACCELERATOR_COUNT,
+                },
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+                "python_package_spec": {
+                    "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE,
+                    "python_module": source_utils._TrainingScriptPythonPackager.module_name,
+                    "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
+                    "args": true_args,
+                    "env": true_env,
+                },
+            },
+            {
+                "replica_count": _TEST_REDUCTION_SERVER_REPLICA_COUNT,
+                "machine_spec": {"machine_type": _TEST_REDUCTION_SERVER_MACHINE_TYPE},
+                "container_spec": {"image_uri": _TEST_REDUCTION_SERVER_CONTAINER_URI},
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+            },
+        ]
+
+        true_training_pipeline = gca_training_pipeline.TrainingPipeline(
+            display_name=_TEST_DISPLAY_NAME,
+            training_task_definition=schema.training_job.definition.custom_task,
+            training_task_inputs=json_format.ParseDict(
+                {
+                    "worker_pool_specs": true_worker_pool_spec,
+                    "base_output_directory": {
+                        "output_uri_prefix": _TEST_BASE_OUTPUT_DIR
+                    },
+                },
+                struct_pb2.Value(),
+            ),
+        )
+
+        mock_pipeline_service_create_with_no_model_to_upload.assert_called_once_with(
+            parent=initializer.global_config.common_location_path(),
+            training_pipeline=true_training_pipeline,
+        )
+
+        assert job._gca_resource == make_training_pipeline_with_no_model_upload(
+            gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+        )
+
+        assert not job.has_failed
+
+        assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+
     @pytest.mark.usefixtures("get_training_job_custom_mock")
     def test_get_training_job(self, get_training_job_custom_mock):
         aiplatform.init(project=_TEST_PROJECT)
@@ -2719,6 +2855,115 @@ def test_run_call_pipeline_service_create_distributed_training(
 
         assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
 
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_run_call_pipeline_service_create_distributed_training_with_reduction_server(
+        self,
+        mock_pipeline_service_create_with_no_model_to_upload,
+        mock_pipeline_service_get_with_no_model_to_upload,
+        sync,
+    ):
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            staging_bucket=_TEST_BUCKET_NAME,
+            credentials=_TEST_CREDENTIALS,
+        )
+
+        job = training_jobs.CustomContainerTrainingJob(
+            display_name=_TEST_DISPLAY_NAME,
+            container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
+            command=_TEST_TRAINING_CONTAINER_CMD,
+        )
+
+        job.run(
+            base_output_dir=_TEST_BASE_OUTPUT_DIR,
+            args=_TEST_RUN_ARGS,
+            replica_count=10,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            reduction_server_replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT,
+            reduction_server_machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE,
+            sync=sync,
+        )
+
+        if not sync:
+            job.wait()
+
+        true_args = _TEST_RUN_ARGS
+
+        true_worker_pool_spec = [
+            {
+                "replica_count": 1,
+                "machine_spec": {
+                    "machine_type": _TEST_MACHINE_TYPE,
+                    "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                    "accelerator_count": _TEST_ACCELERATOR_COUNT,
+                },
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+                "containerSpec": {
+                    "imageUri": _TEST_TRAINING_CONTAINER_IMAGE,
+                    "command": _TEST_TRAINING_CONTAINER_CMD,
+                    "args": true_args,
+                },
+            },
+            {
+                "replica_count": 9,
+                "machine_spec": {
+                    "machine_type": _TEST_MACHINE_TYPE,
+                    "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                    "accelerator_count": _TEST_ACCELERATOR_COUNT,
+                },
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+                "containerSpec": {
+                    "imageUri": _TEST_TRAINING_CONTAINER_IMAGE,
+                    "command": _TEST_TRAINING_CONTAINER_CMD,
+                    "args": true_args,
+                },
+            },
+            {
+                "replica_count": _TEST_REDUCTION_SERVER_REPLICA_COUNT,
+                "machine_spec": {"machine_type": _TEST_REDUCTION_SERVER_MACHINE_TYPE},
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+                "container_spec": {"image_uri": _TEST_REDUCTION_SERVER_CONTAINER_URI},
+            },
+        ]
+
+        true_training_pipeline = gca_training_pipeline.TrainingPipeline(
+            display_name=_TEST_DISPLAY_NAME,
+            training_task_definition=schema.training_job.definition.custom_task,
+            training_task_inputs=json_format.ParseDict(
+                {
+                    "worker_pool_specs": true_worker_pool_spec,
+                    "base_output_directory": {
+                        "output_uri_prefix": _TEST_BASE_OUTPUT_DIR
+                    },
+                },
+                struct_pb2.Value(),
+            ),
+        )
+
+        mock_pipeline_service_create_with_no_model_to_upload.assert_called_once_with(
+            parent=initializer.global_config.common_location_path(),
+            training_pipeline=true_training_pipeline,
+        )
+
+        assert job._gca_resource == make_training_pipeline_with_no_model_upload(
+            gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+        )
+
+        assert not job.has_failed
+
+        assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+
     @pytest.mark.parametrize("sync", [True, False])
     def test_run_call_pipeline_service_create_with_nontabular_dataset(
         self,
@@ -2938,7 +3183,7 @@ def test_machine_spec_return_spec_dict(self):
 
         assert test_spec.spec_dict == true_spec_dict
 
-    def test_machine_spec_return_spec_with_boot_disk_dict(self):
+    def test_machine_spec_return_spec_dict_with_boot_disk(self):
         test_spec = worker_spec_utils._WorkerPoolSpec(
             replica_count=_TEST_REPLICA_COUNT,
             machine_type=_TEST_MACHINE_TYPE,
@@ -3013,6 +3258,25 @@ def test_machine_spec_spec_dict_is_not_empty(self):
 
         assert not test_spec.is_empty
 
+    def test_machine_spec_return_with_container_uri_spec_dict(self):
+        test_spec = worker_spec_utils._WorkerPoolSpec(
+            replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT,
+            machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE,
+            container_uri=_TEST_REDUCTION_SERVER_CONTAINER_URI,
+        )
+
+        true_spec_dict = {
+            "container_spec": {"image_uri": _TEST_REDUCTION_SERVER_CONTAINER_URI},
+            "machine_spec": {"machine_type": _TEST_REDUCTION_SERVER_MACHINE_TYPE},
+            "replica_count": _TEST_REDUCTION_SERVER_REPLICA_COUNT,
+            "disk_spec": {
+                "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+            },
+        }
+
+        assert test_spec.spec_dict == true_spec_dict
+
 
 class Test_DistributedTrainingSpec:
     def test_machine_spec_returns_pool_spec(self):
@@ -3030,7 +3294,7 @@ def test_machine_spec_returns_pool_spec(self):
                 accelerator_count=_TEST_ACCELERATOR_COUNT,
                 accelerator_type=_TEST_ACCELERATOR_TYPE,
             ),
-            parameter_server_spec=worker_spec_utils._WorkerPoolSpec(
+            server_spec=worker_spec_utils._WorkerPoolSpec(
                 replica_count=3,
                 machine_type=_TEST_MACHINE_TYPE,
                 accelerator_count=_TEST_ACCELERATOR_COUNT,
@@ -3185,7 +3449,7 @@ def test_machine_spec_handles_missing_pools(self):
                 accelerator_type=_TEST_ACCELERATOR_TYPE,
             ),
             worker_spec=worker_spec_utils._WorkerPoolSpec(replica_count=0),
-            parameter_server_spec=worker_spec_utils._WorkerPoolSpec(
+            server_spec=worker_spec_utils._WorkerPoolSpec(
                 replica_count=3,
                 machine_type=_TEST_MACHINE_TYPE,
                 accelerator_count=_TEST_ACCELERATOR_COUNT,
@@ -3207,21 +3471,65 @@ def test_machine_spec_handles_missing_pools(self):
                     "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
                 },
             },
+            {},
             {
-                "machine_spec": {"machine_type": "n1-standard-4"},
-                "replica_count": 0,
+                "machine_spec": {
+                    "machine_type": _TEST_MACHINE_TYPE,
+                    "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                    "accelerator_count": _TEST_ACCELERATOR_COUNT,
+                },
+                "replica_count": 3,
                 "disk_spec": {
                     "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
                     "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
                 },
             },
+        ]
+
+        assert spec.pool_specs == true_pool_spec
+
+    def test_reduction_server_pool_returns_spec_with_chief_worker(self):
+
+        worker_pool_specs = worker_spec_utils._DistributedTrainingSpec.chief_worker_pool(
+            replica_count=10,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+        )
+        worker_pool_specs = worker_pool_specs.reduction_server_pool(
+            reduction_server_replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT,
+            reduction_server_machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE,
+        )
+
+        true_pool_spec = [
             {
                 "machine_spec": {
                     "machine_type": _TEST_MACHINE_TYPE,
                     "accelerator_type": _TEST_ACCELERATOR_TYPE,
                     "accelerator_count": _TEST_ACCELERATOR_COUNT,
                 },
-                "replica_count": 3,
+                "replica_count": 1,
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+            },
+            {
+                "machine_spec": {
+                    "machine_type": _TEST_MACHINE_TYPE,
+                    "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                    "accelerator_count": _TEST_ACCELERATOR_COUNT,
+                },
+                "replica_count": 9,
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+            },
+            {
+                "container_spec": {"image_uri": _TEST_REDUCTION_SERVER_CONTAINER_URI},
+                "machine_spec": {"machine_type": _TEST_REDUCTION_SERVER_MACHINE_TYPE},
+                "replica_count": _TEST_REDUCTION_SERVER_REPLICA_COUNT,
                 "disk_spec": {
                     "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
                     "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
@@ -3229,7 +3537,55 @@ def test_machine_spec_handles_missing_pools(self):
                 },
             },
         ]
 
-        assert spec.pool_specs == true_pool_spec
+        assert worker_pool_specs.pool_specs == true_pool_spec
+
+    def test_reduction_server_pool_returns_spec_no_chief(self):
+        worker_pool_specs = worker_spec_utils._DistributedTrainingSpec()
+        with pytest.raises(ValueError):
+            worker_pool_specs.reduction_server_pool(
+                reduction_server_replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT,
+                reduction_server_machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE,
+            )
+
+    def test_reduction_server_pool_returns_spec_chief_only(self):
+
+        worker_pool_specs = worker_spec_utils._DistributedTrainingSpec.chief_worker_pool(
+            replica_count=1,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+        )
+        worker_pool_specs = worker_pool_specs.reduction_server_pool(
+            reduction_server_replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT,
+            reduction_server_machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE,
+        )
+
+        true_pool_spec = [
+            {
+                "machine_spec": {
+                    "machine_type": _TEST_MACHINE_TYPE,
+                    "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                    "accelerator_count": _TEST_ACCELERATOR_COUNT,
+                },
+                "replica_count": 1,
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+            },
+            {},
+            {
+                "container_spec": {"image_uri": _TEST_REDUCTION_SERVER_CONTAINER_URI},
+                "machine_spec": {"machine_type": _TEST_REDUCTION_SERVER_MACHINE_TYPE},
+                "replica_count": _TEST_REDUCTION_SERVER_REPLICA_COUNT,
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+            },
+        ]
+
+        assert worker_pool_specs.pool_specs == true_pool_spec
 
 
 class TestCustomPythonPackageTrainingJob:
@@ -4246,6 +4602,118 @@ def test_run_call_pipeline_service_create_distributed_training(
 
         assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
 
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_run_call_pipeline_service_create_distributed_training_with_reduction_server(
+        self,
+        mock_pipeline_service_create_with_no_model_to_upload,
+        mock_pipeline_service_get_with_no_model_to_upload,
+        sync,
+    ):
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            staging_bucket=_TEST_BUCKET_NAME,
+            credentials=_TEST_CREDENTIALS,
+        )
+
+        job = training_jobs.CustomPythonPackageTrainingJob(
+            display_name=_TEST_DISPLAY_NAME,
+            python_package_gcs_uri=_TEST_OUTPUT_PYTHON_PACKAGE_PATH,
+            python_module_name=_TEST_PYTHON_MODULE_NAME,
+            container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
+        )
+
+        job.run(
+            base_output_dir=_TEST_BASE_OUTPUT_DIR,
+            args=_TEST_RUN_ARGS,
+            replica_count=10,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            reduction_server_replica_count=_TEST_REDUCTION_SERVER_REPLICA_COUNT,
+            reduction_server_machine_type=_TEST_REDUCTION_SERVER_MACHINE_TYPE,
+            sync=sync,
+        )
+
+        if not sync:
+            job.wait()
+
+        true_args = _TEST_RUN_ARGS
+
+        true_worker_pool_spec = [
+            {
+                "replica_count": 1,
+                "machine_spec": {
+                    "machine_type": _TEST_MACHINE_TYPE,
+                    "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                    "accelerator_count": _TEST_ACCELERATOR_COUNT,
+                },
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+                "python_package_spec": {
+                    "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE,
+                    "python_module": _TEST_PYTHON_MODULE_NAME,
+                    "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
+                    "args": true_args,
+                },
+            },
+            {
+                "replica_count": 9,
+                "machine_spec": {
+                    "machine_type": _TEST_MACHINE_TYPE,
+                    "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                    "accelerator_count": _TEST_ACCELERATOR_COUNT,
+                },
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+                "python_package_spec": {
+                    "executor_image_uri": _TEST_TRAINING_CONTAINER_IMAGE,
+                    "python_module": _TEST_PYTHON_MODULE_NAME,
+                    "package_uris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
+                    "args": true_args,
+                },
+            },
+            {
+                "replica_count": _TEST_REDUCTION_SERVER_REPLICA_COUNT,
+                "machine_spec": {"machine_type": _TEST_REDUCTION_SERVER_MACHINE_TYPE},
+                "container_spec": {"image_uri": _TEST_REDUCTION_SERVER_CONTAINER_URI},
+                "disk_spec": {
+                    "boot_disk_type": _TEST_BOOT_DISK_TYPE_DEFAULT,
+                    "boot_disk_size_gb": _TEST_BOOT_DISK_SIZE_GB_DEFAULT,
+                },
+            },
+        ]
+
+        true_training_pipeline = gca_training_pipeline.TrainingPipeline(
+            display_name=_TEST_DISPLAY_NAME,
+            training_task_definition=schema.training_job.definition.custom_task,
+            training_task_inputs=json_format.ParseDict(
+                {
+                    "worker_pool_specs": true_worker_pool_spec,
+                    "base_output_directory": {
+                        "output_uri_prefix": _TEST_BASE_OUTPUT_DIR
+                    },
+                },
+                struct_pb2.Value(),
+            ),
+        )
+
+        mock_pipeline_service_create_with_no_model_to_upload.assert_called_once_with(
+            parent=initializer.global_config.common_location_path(),
+            training_pipeline=true_training_pipeline,
+        )
+
+        assert job._gca_resource == make_training_pipeline_with_no_model_upload(
+            gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+        )
+
+        assert not job.has_failed
+
+        assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+
     @pytest.mark.parametrize("sync", [True, False])
     def test_run_call_pipeline_service_create_with_nontabular_dataset_without_model_display_name_nor_model_labels(
         self,
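
Reviewer note (not part of the patch): a minimal usage sketch of how the new parameters compose, assuming a build of this branch is installed. The project, bucket, script path, and training image below are placeholders, not values from the patch.

from google.cloud import aiplatform

aiplatform.init(
    project="my-project",               # placeholder project
    staging_bucket="gs://my-bucket",    # placeholder staging bucket
)

job = aiplatform.CustomTrainingJob(
    display_name="train-with-reduction-server",
    script_path="task.py",              # placeholder training script
    container_uri="us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-4:latest",  # placeholder image
)

# replica_count=4 yields one chief pool (1 replica) and one worker pool
# (3 replicas); reduction_server_replica_count > 0 appends a third pool
# running the reduction server container on CPU-heavy machines.
job.run(
    replica_count=4,
    machine_type="n1-standard-8",
    accelerator_type="NVIDIA_TESLA_V100",
    accelerator_count=2,
    reduction_server_replica_count=2,
    reduction_server_machine_type="n1-highcpu-16",
)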