Skip to content

Commit

Permalink
feat: enable reduction server
Browse files Browse the repository at this point in the history
  • Loading branch information
morgandu committed Sep 30, 2021
1 parent 09e48de commit da33a28
Show file tree
Hide file tree
Showing 5 changed files with 631 additions and 61 deletions.
45 changes: 30 additions & 15 deletions google/cloud/aiplatform/jobs.py
Expand Up @@ -1061,6 +1061,8 @@ def from_local_script(
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
reduction_server_replica_count: int = 0,
reduction_server_machine_type: str = "n1-highcpu-16",
base_output_dir: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
Expand Down Expand Up @@ -1127,6 +1129,10 @@ def from_local_script(
boot_disk_size_gb (int):
Optional. Size in GB of the boot disk, default is 100GB.
boot disk size must be within the range of [100, 64000].
reduction_server_replica_count (int):
The number of reduction server replicas, default is 0.
reduction_server_machine_type (str):
The type of machine to use for reduction server, default is `n1-highcpu-16`.
base_output_dir (str):
Optional. GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
Expand Down Expand Up @@ -1181,7 +1187,15 @@ def from_local_script(
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
).pool_specs
)

if reduction_server_replica_count > 0:
worker_pool_specs = worker_pool_specs.reduction_server_pool(
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
)

worker_pool_specs = worker_pool_specs.pool_specs

python_packager = source_utils._TrainingScriptPythonPackager(
script_path=script_path, requirements=requirements
Expand All @@ -1192,20 +1206,21 @@ def from_local_script(
)

for spec in worker_pool_specs:
spec["python_package_spec"] = {
"executor_image_uri": container_uri,
"python_module": python_packager.module_name,
"package_uris": [package_gcs_uri],
}

if args:
spec["python_package_spec"]["args"] = args

if environment_variables:
spec["python_package_spec"]["env"] = [
{"name": key, "value": value}
for key, value in environment_variables.items()
]
if spec and "container_spec" not in spec:
spec["python_package_spec"] = {
"executor_image_uri": container_uri,
"python_module": python_packager.module_name,
"package_uris": [package_gcs_uri],
}

if args:
spec["python_package_spec"]["args"] = args

if environment_variables:
spec["python_package_spec"]["env"] = [
{"name": key, "value": value}
for key, value in environment_variables.items()
]

return cls(
display_name=display_name,
Expand Down
111 changes: 76 additions & 35 deletions google/cloud/aiplatform/training_jobs.py
Expand Up @@ -1278,6 +1278,8 @@ def _prepare_and_validate_run(
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
reduction_server_replica_count: int = 0,
reduction_server_machine_type: str = "n1-highcpu-16",
) -> Tuple[worker_spec_utils._DistributedTrainingSpec, Optional[gca_model.Model]]:
"""Create worker pool specs and managed model as well validating the
run.
Expand Down Expand Up @@ -1318,6 +1320,10 @@ def _prepare_and_validate_run(
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100GB.
boot disk size must be within the range of [100, 64000].
reduction_server_replica_count (int):
The number of reduction server replicas, default is 0.
reduction_server_machine_type (str):
The type of machine to use for reduction server, default is `n1-highcpu-16`.
Returns:
Worker pools specs and managed model for run.
Expand Down Expand Up @@ -1352,7 +1358,15 @@ def _prepare_and_validate_run(
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
).pool_specs
)

if reduction_server_replica_count > 0:
worker_pool_specs = worker_pool_specs.reduction_server_pool(
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
)

worker_pool_specs = worker_pool_specs.pool_specs

managed_model = self._managed_model
if model_display_name:
Expand Down Expand Up @@ -1736,6 +1750,8 @@ def run(
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
reduction_server_replica_count: int = 0,
reduction_server_machine_type: str = "n1-highcpu-16",
training_fraction_split: Optional[float] = None,
validation_fraction_split: Optional[float] = None,
test_fraction_split: Optional[float] = None,
Expand Down Expand Up @@ -1907,6 +1923,10 @@ def run(
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100GB.
boot disk size must be within the range of [100, 64000].
reduction_server_replica_count (int):
The number of reduction server replicas, default is 0.
reduction_server_machine_type (str):
The type of machine to use for reduction server, default is `n1-highcpu-16`.
training_fraction_split (float):
Optional. The fraction of the input data that is to be used to train
the Model. This is ignored if Dataset is not provided.
Expand Down Expand Up @@ -1989,6 +2009,8 @@ def run(
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
)

# make and copy package
Expand Down Expand Up @@ -2198,20 +2220,21 @@ def _run(
)

for spec in worker_pool_specs:
spec["python_package_spec"] = {
"executor_image_uri": self._container_uri,
"python_module": python_packager.module_name,
"package_uris": [package_gcs_uri],
}
if spec and "container_spec" not in spec:
spec["python_package_spec"] = {
"executor_image_uri": self._container_uri,
"python_module": python_packager.module_name,
"package_uris": [package_gcs_uri],
}

if args:
spec["python_package_spec"]["args"] = args
if args:
spec["python_package_spec"]["args"] = args

if environment_variables:
spec["python_package_spec"]["env"] = [
{"name": key, "value": value}
for key, value in environment_variables.items()
]
if environment_variables:
spec["python_package_spec"]["env"] = [
{"name": key, "value": value}
for key, value in environment_variables.items()
]

(
training_task_inputs,
Expand Down Expand Up @@ -2498,6 +2521,8 @@ def run(
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
reduction_server_replica_count: int = 0,
reduction_server_machine_type: str = "n1-highcpu-16",
training_fraction_split: Optional[float] = None,
validation_fraction_split: Optional[float] = None,
test_fraction_split: Optional[float] = None,
Expand Down Expand Up @@ -2662,6 +2687,10 @@ def run(
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100GB.
boot disk size must be within the range of [100, 64000].
reduction_server_replica_count (int):
The number of reduction server replicas, default is 0.
reduction_server_machine_type (str):
The type of machine to use for reduction server, default is `n1-highcpu-16`.
training_fraction_split (float):
Optional. The fraction of the input data that is to be used to train
the Model. This is ignored if Dataset is not provided.
Expand Down Expand Up @@ -2749,6 +2778,8 @@ def run(
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
)

return self._run(
Expand Down Expand Up @@ -2942,19 +2973,20 @@ def _run(
"""

for spec in worker_pool_specs:
spec["containerSpec"] = {"imageUri": self._container_uri}
if spec and "container_spec" not in spec:
spec["containerSpec"] = {"imageUri": self._container_uri}

if self._command:
spec["containerSpec"]["command"] = self._command
if self._command:
spec["containerSpec"]["command"] = self._command

if args:
spec["containerSpec"]["args"] = args
if args:
spec["containerSpec"]["args"] = args

if environment_variables:
spec["containerSpec"]["env"] = [
{"name": key, "value": value}
for key, value in environment_variables.items()
]
if environment_variables:
spec["containerSpec"]["env"] = [
{"name": key, "value": value}
for key, value in environment_variables.items()
]

(
training_task_inputs,
Expand Down Expand Up @@ -5102,6 +5134,8 @@ def run(
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
reduction_server_replica_count: int = 0,
reduction_server_machine_type: str = "n1-highcpu-16",
training_fraction_split: Optional[float] = None,
validation_fraction_split: Optional[float] = None,
test_fraction_split: Optional[float] = None,
Expand Down Expand Up @@ -5266,6 +5300,10 @@ def run(
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100GB.
boot disk size must be within the range of [100, 64000].
reduction_server_replica_count (int):
The number of reduction server replicas, default is 0.
reduction_server_machine_type (str):
The type of machine to use for reduction server, default is `n1-highcpu-16`.
training_fraction_split (float):
Optional. The fraction of the input data that is to be used to train
the Model. This is ignored if Dataset is not provided.
Expand Down Expand Up @@ -5348,6 +5386,8 @@ def run(
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
)

return self._run(
Expand Down Expand Up @@ -5527,20 +5567,21 @@ def _run(
produce a Vertex AI Model.
"""
for spec in worker_pool_specs:
spec["python_package_spec"] = {
"executor_image_uri": self._container_uri,
"python_module": self._python_module,
"package_uris": [self._package_gcs_uri],
}
if spec and "container_spec" not in spec:
spec["python_package_spec"] = {
"executor_image_uri": self._container_uri,
"python_module": self._python_module,
"package_uris": [self._package_gcs_uri],
}

if args:
spec["python_package_spec"]["args"] = args
if args:
spec["python_package_spec"]["args"] = args

if environment_variables:
spec["python_package_spec"]["env"] = [
{"name": key, "value": value}
for key, value in environment_variables.items()
]
if environment_variables:
spec["python_package_spec"]["env"] = [
{"name": key, "value": value}
for key, value in environment_variables.items()
]

(
training_task_inputs,
Expand Down
52 changes: 48 additions & 4 deletions google/cloud/aiplatform/utils/worker_spec_utils.py
Expand Up @@ -21,6 +21,10 @@
accelerator_type as gca_accelerator_type_compat,
)

# Prebuilt container image run by reduction server replicas; _WorkerPoolSpec
# uses this URI as a sentinel to emit a container_spec instead of a plain pool.
REDUCTION_SERVER_CONTAINER_URI = "us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest"


class _WorkerPoolSpec(NamedTuple):
"""Specification container for Worker Pool specs used for distributed training.
Expand All @@ -45,6 +49,7 @@ class _WorkerPoolSpec(NamedTuple):
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED"
boot_disk_type: str = "pd-ssd"
boot_disk_size_gb: int = 100
container_uri: str = None

def _get_accelerator_type(self) -> Optional[str]:
"""Validates accelerator_type and returns the name of the accelerator.
Expand Down Expand Up @@ -86,6 +91,10 @@ def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]:
spec["machine_spec"]["accelerator_type"] = accelerator_type
spec["machine_spec"]["accelerator_count"] = self.accelerator_count

if self.container_uri == REDUCTION_SERVER_CONTAINER_URI:
spec["container_spec"] = {
"image_uri": self.container_uri,
}
return spec

@property
Expand Down Expand Up @@ -129,7 +138,7 @@ class _DistributedTrainingSpec(NamedTuple):

chief_spec: _WorkerPoolSpec = _WorkerPoolSpec()
worker_spec: _WorkerPoolSpec = _WorkerPoolSpec()
parameter_server_spec: _WorkerPoolSpec = _WorkerPoolSpec()
server_spec: _WorkerPoolSpec = _WorkerPoolSpec()
evaluator_spec: _WorkerPoolSpec = _WorkerPoolSpec()

@property
Expand All @@ -152,17 +161,52 @@ def pool_specs(
spec_order = [
self.chief_spec,
self.worker_spec,
self.parameter_server_spec,
self.server_spec,
self.evaluator_spec,
]
specs = [s.spec_dict for s in spec_order]
last_non_empty_seen = False
for i in reversed(range(len(spec_order))):
if spec_order[i].is_empty:
specs.pop()
if not last_non_empty_seen:
specs.pop()
else:
specs[i] = {}
else:
break
last_non_empty_seen = True
return specs

def reduction_server_pool(
    self,
    reduction_server_replica_count: int = 0,
    reduction_server_machine_type: str = "n1-highcpu-16",
) -> "_DistributedTrainingSpec":
    """Returns a copy of this spec with a reduction server worker pool.

    The reduction server replicas occupy the third worker pool (the
    ``server_spec`` slot) and run the dedicated reduction server container
    image (``REDUCTION_SERVER_CONTAINER_URI``); the chief and worker pools
    are carried over unchanged.

    Args:
        reduction_server_replica_count (int):
            The number of reduction server replicas, default is 0.
        reduction_server_machine_type (str):
            The type of machine to use for reduction server, default is
            `n1-highcpu-16`.

    Returns:
        A new ``_DistributedTrainingSpec`` whose server pool is the
        reduction server pool.

    Raises:
        ValueError: If the chief worker pool is not set.
    """
    # Fail fast: a reduction server cannot be attached to a training job
    # that has no chief worker, so validate before building any spec.
    if self.chief_spec.is_empty:
        raise ValueError("reduction server can not be set without chief worker.")

    reduction_server_spec = _WorkerPoolSpec(
        replica_count=reduction_server_replica_count,
        machine_type=reduction_server_machine_type,
        container_uri=REDUCTION_SERVER_CONTAINER_URI,
    )

    # NOTE(review): evaluator_spec is not carried over into the returned
    # spec — confirm a reduction server configuration is never expected to
    # coexist with an evaluator pool.
    return _DistributedTrainingSpec(
        chief_spec=self.chief_spec,
        worker_spec=self.worker_spec,
        server_spec=reduction_server_spec,
    )

@classmethod
def chief_worker_pool(
cls,
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/aiplatform/test_custom_job.py
Expand Up @@ -520,6 +520,8 @@ def test_create_from_local_script_with_all_args(
accelerator_count=test_training_jobs._TEST_ACCELERATOR_COUNT,
boot_disk_type=test_training_jobs._TEST_BOOT_DISK_TYPE,
boot_disk_size_gb=test_training_jobs._TEST_BOOT_DISK_SIZE_GB,
reduction_server_replica_count=test_training_jobs._TEST_REDUCTION_SERVER_REPLICA_COUNT,
reduction_server_machine_type=test_training_jobs._TEST_REDUCTION_SERVER_MACHINE_TYPE,
base_output_dir=_TEST_BASE_OUTPUT_DIR,
labels=_TEST_LABELS,
)
Expand Down

0 comments on commit da33a28

Please sign in to comment.