feat: expose boot disk type and size for CustomTrainingJob, CustomPythonPackageTrainingJob, and CustomContainerTrainingJob #602

Merged
Changes from 2 commits
15 changes: 13 additions & 2 deletions google/cloud/aiplatform/jobs.py
@@ -1070,13 +1070,15 @@ def from_local_script(
display_name: str,
script_path: str,
container_uri: str,
args: Optional[List[Union[str, float, int]]] = None,
args: Optional[Sequence[str]] = None,
requirements: Optional[Sequence[str]] = None,
environment_variables: Optional[Dict[str, str]] = None,
replica_count: int = 1,
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
base_output_dir: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
@@ -1110,7 +1112,7 @@ def from_local_script(
Required. Local path to training script.
container_uri (str):
Required: Uri of the training container image to use for custom job.
args (Optional[List[Union[str, float, int]]]):
args (Optional[Sequence[str]]):
Optional. Command line arguments to be passed to the Python task.
requirements (Sequence[str]):
Optional. List of python packages dependencies of script.
@@ -1136,6 +1138,13 @@ def from_local_script(
NVIDIA_TESLA_T4
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Optional. Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Optional. Size in GB of the boot disk, default is 100GB.
Boot disk size must be within the range of [100, 64000].
base_output_dir (str):
Optional. GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
@@ -1188,6 +1197,8 @@ def from_local_script(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
).pool_specs

python_packager = source_utils._TrainingScriptPythonPackager(
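For reference, a minimal usage sketch of the new arguments on CustomJob.from_local_script (the project, bucket, script path, and container image below are illustrative placeholders, not values taken from this change):

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", staging_bucket="gs://my-bucket")

    # Build a CustomJob from a local script; boot_disk_type and boot_disk_size_gb
    # override the new defaults of "pd-ssd" and 100 GB.
    job = aiplatform.CustomJob.from_local_script(
        display_name="boot-disk-example",
        script_path="task.py",
        container_uri="gcr.io/my-project/my-training-image:latest",
        replica_count=1,
        machine_type="n1-standard-4",
        boot_disk_type="pd-standard",
        boot_disk_size_gb=200,
    )

    job.run()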
44 changes: 44 additions & 0 deletions google/cloud/aiplatform/training_jobs.py
@@ -1139,6 +1139,8 @@ def _prepare_and_validate_run(
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
) -> Tuple[worker_spec_utils._DistributedTrainingSpec, Optional[gca_model.Model]]:
"""Create worker pool specs and managed model as well validating the
run.
@@ -1172,6 +1174,13 @@ def _prepare_and_validate_run(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100GB.
Boot disk size must be within the range of [100, 64000].
Returns:
Worker pools specs and managed model for run.

@@ -1204,6 +1213,8 @@ def _prepare_and_validate_run(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
).pool_specs

managed_model = self._managed_model
@@ -1588,6 +1599,8 @@ def run(
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
@@ -1724,6 +1737,13 @@ def run(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100GB.
Boot disk size must be within the range of [100, 64000].
training_fraction_split (float):
The fraction of the input data that is to be
used to train the Model. This is ignored if Dataset is not provided.
@@ -1774,6 +1794,8 @@ def run(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

# make and copy package
@@ -2241,6 +2263,8 @@ def run(
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
@@ -2370,6 +2394,13 @@ def run(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100GB.
Boot disk size must be within the range of [100, 64000].
training_fraction_split (float):
The fraction of the input data that is to be
used to train the Model. This is ignored if Dataset is not provided.
@@ -2425,6 +2456,8 @@ def run(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

return self._run(
@@ -4402,6 +4435,8 @@ def run(
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
@@ -4531,6 +4566,13 @@ def run(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100GB.
Boot disk size must be within the range of [100, 64000].
training_fraction_split (float):
The fraction of the input data that is to be
used to train the Model. This is ignored if Dataset is not provided.
@@ -4581,6 +4623,8 @@ def run(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

return self._run(
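The same two arguments are accepted by run() on CustomTrainingJob, CustomContainerTrainingJob, and CustomPythonPackageTrainingJob and are forwarded into every worker pool. A sketch with placeholder names, assuming a script-based job:

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", staging_bucket="gs://my-bucket")

    job = aiplatform.CustomTrainingJob(
        display_name="boot-disk-example",
        script_path="task.py",
        container_uri="gcr.io/my-project/my-training-image:latest",
    )

    # The disk settings apply to chief and worker replicas alike.
    job.run(
        replica_count=2,
        machine_type="n1-standard-4",
        boot_disk_type="pd-ssd",       # "pd-ssd" (default) or "pd-standard"
        boot_disk_size_gb=500,         # must be within [100, 64000]
    )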
59 changes: 43 additions & 16 deletions google/cloud/aiplatform/utils/worker_spec_utils.py
@@ -22,16 +22,19 @@
)


class _MachineSpec(NamedTuple):
"""Specification container for Machine specs used for distributed training.
class _WorkerPoolSpec(NamedTuple):
"""Specification container for Worker Pool specs used for distributed training.

Usage:

spec = _MachineSpec(
spec = _WorkerPoolSpec(
replica_count=10,
machine_type='n1-standard-4',
accelerator_count=2,
accelerator_type='NVIDIA_TESLA_K80')
accelerator_type='NVIDIA_TESLA_K80',
boot_disk_type='pd-ssd',
boot_disk_size_gb=100,
)

Note that container and python package specs are not stored with this spec.
"""
@@ -40,6 +43,8 @@ class _MachineSpec(NamedTuple):
machine_type: str = "n1-standard-4"
accelerator_count: int = 0
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED"
boot_disk_type: str = "pd-ssd"
boot_disk_size_gb: int = 100

def _get_accelerator_type(self) -> Optional[str]:
"""Validates accelerator_type and returns the name of the accelerator.
@@ -70,7 +75,12 @@ def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]:
spec = {
"machine_spec": {"machine_type": self.machine_type},
"replica_count": self.replica_count,
"disk_spec": {
"boot_disk_type": self.boot_disk_type,
"boot_disk_size_gb": self.boot_disk_size_gb,
},
}

accelerator_type = self._get_accelerator_type()
if accelerator_type and self.accelerator_count:
spec["machine_spec"]["accelerator_type"] = accelerator_type
@@ -98,25 +108,29 @@ class _DistributedTrainingSpec(NamedTuple):
Usage:

dist_training_spec = _DistributedTrainingSpec(
chief_spec = _MachineSpec(
chief_spec = _WorkerPoolSpec(
replica_count=1,
machine_type='n1-standard-4',
accelerator_count=2,
accelerator_type='NVIDIA_TESLA_K80'
),
worker_spec = _MachineSpec(
accelerator_type='NVIDIA_TESLA_K80',
boot_disk_type='pd-ssd',
boot_disk_size_gb=100,
),
worker_spec = _WorkerPoolSpec(
replica_count=10,
machine_type='n1-standard-4',
accelerator_count=2,
accelerator_type='NVIDIA_TESLA_K80'
)
accelerator_type='NVIDIA_TESLA_K80',
boot_disk_type='pd-ssd',
boot_disk_size_gb=100,
),
)
"""

chief_spec: _MachineSpec = _MachineSpec()
worker_spec: _MachineSpec = _MachineSpec()
parameter_server_spec: _MachineSpec = _MachineSpec()
evaluator_spec: _MachineSpec = _MachineSpec()
chief_spec: _WorkerPoolSpec = _WorkerPoolSpec()
worker_spec: _WorkerPoolSpec = _WorkerPoolSpec()
parameter_server_spec: _WorkerPoolSpec = _WorkerPoolSpec()
evaluator_spec: _WorkerPoolSpec = _WorkerPoolSpec()

@property
def pool_specs(
@@ -156,6 +170,8 @@ def chief_worker_pool(
machine_type: str = "n1-standard-4",
accelerator_count: int = 0,
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
) -> "_DistributedTrainingSpec":
"""Parameterizes Config to support only chief with worker replicas.

@@ -174,6 +190,13 @@ def chief_worker_pool(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk (default is `pd-ssd`).
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk (default is 100GB).
Boot disk size must be within the range of [100, 64000].

Returns:
_DistributedTrainingSpec representing one chief and n workers all of same
@@ -182,18 +205,22 @@
if replica_count <= 0:
return cls()

chief_spec = _MachineSpec(
chief_spec = _WorkerPoolSpec(
replica_count=1,
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

worker_spec = _MachineSpec(
worker_spec = _WorkerPoolSpec(
replica_count=replica_count - 1,
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

return cls(chief_spec=chief_spec, worker_spec=worker_spec)
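For reference, this is roughly what a single _WorkerPoolSpec now renders to, as a sketch derived from the code above rather than captured output:

    from google.cloud.aiplatform.utils import worker_spec_utils

    spec = worker_spec_utils._WorkerPoolSpec(
        replica_count=1,
        machine_type="n1-standard-4",
        boot_disk_type="pd-standard",
        boot_disk_size_gb=200,
    )

    # With no accelerator configured, spec_dict emits machine_spec,
    # replica_count, and the new disk_spec block.
    assert spec.spec_dict == {
        "machine_spec": {"machine_type": "n1-standard-4"},
        "replica_count": 1,
        "disk_spec": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 200,
        },
    }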
43 changes: 42 additions & 1 deletion tests/unit/aiplatform/test_custom_job.py
@@ -54,6 +54,8 @@

_TEST_TRAINING_CONTAINER_IMAGE = "gcr.io/test-training/container:image"

_TEST_RUN_ARGS = ["-v", "0.1", "--test=arg"]

_TEST_WORKER_POOL_SPEC = [
{
"machine_spec": {
@@ -62,10 +64,11 @@
"accelerator_count": 1,
},
"replica_count": 1,
"disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100},
"container_spec": {
"image_uri": _TEST_TRAINING_CONTAINER_IMAGE,
"command": [],
"args": [],
"args": _TEST_RUN_ARGS,
},
}
]
@@ -490,3 +493,41 @@ def test_create_custom_job_without_base_output_dir(self,):
assert job.job_spec.base_output_directory.output_uri_prefix.startswith(
f"{_TEST_STAGING_BUCKET}/aiplatform-custom-job"
)

@pytest.mark.usefixtures("mock_python_package_to_gcs")
@pytest.mark.parametrize("sync", [True, False])
def test_create_from_local_script_with_all_args(
self, get_custom_job_mock, create_custom_job_mock, sync
):
aiplatform.init(
project=_TEST_PROJECT,
location=_TEST_LOCATION,
staging_bucket=_TEST_STAGING_BUCKET,
encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
)

# configuration on this is tested in test_training_jobs.py
job = aiplatform.CustomJob.from_local_script(
display_name=_TEST_DISPLAY_NAME,
script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME,
container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
args=_TEST_RUN_ARGS,
requirements=test_training_jobs._TEST_REQUIREMENTS,
environment_variables=test_training_jobs._TEST_ENVIRONMENT_VARIABLES,
replica_count=test_training_jobs._TEST_REPLICA_COUNT,
machine_type=test_training_jobs._TEST_MACHINE_TYPE,
accelerator_type=test_training_jobs._TEST_ACCELERATOR_TYPE,
accelerator_count=test_training_jobs._TEST_ACCELERATOR_COUNT,
boot_disk_type=test_training_jobs._TEST_BOOT_DISK_TYPE,
boot_disk_size_gb=test_training_jobs._TEST_BOOT_DISK_SIZE_GB,
base_output_dir=_TEST_BASE_OUTPUT_DIR,
labels=_TEST_LABELS,
)

job.run(sync=sync)

job.wait()

assert (
job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
)
8 changes: 8 additions & 0 deletions tests/unit/aiplatform/test_end_to_end.py
@@ -211,6 +211,10 @@ def test_dataset_create_to_model_predict(
"accelerator_type": test_training_jobs._TEST_ACCELERATOR_TYPE,
"accelerator_count": test_training_jobs._TEST_ACCELERATOR_COUNT,
},
"disk_spec": {
"boot_disk_type": test_training_jobs._TEST_BOOT_DISK_TYPE_DEFAULT,
"boot_disk_size_gb": test_training_jobs._TEST_BOOT_DISK_SIZE_GB_DEFAULT,
},
"python_package_spec": {
"executor_image_uri": test_training_jobs._TEST_TRAINING_CONTAINER_IMAGE,
"python_module": source_utils._TrainingScriptPythonPackager.module_name,
@@ -394,6 +398,10 @@ def test_dataset_create_to_model_predict_with_pipeline_fail(
"accelerator_type": test_training_jobs._TEST_ACCELERATOR_TYPE,
"accelerator_count": test_training_jobs._TEST_ACCELERATOR_COUNT,
},
"disk_spec": {
"boot_disk_type": test_training_jobs._TEST_BOOT_DISK_TYPE_DEFAULT,
"boot_disk_size_gb": test_training_jobs._TEST_BOOT_DISK_SIZE_GB_DEFAULT,
},
"python_package_spec": {
"executor_image_uri": test_training_jobs._TEST_TRAINING_CONTAINER_IMAGE,
"python_module": source_utils._TrainingScriptPythonPackager.module_name,
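The _TEST_BOOT_DISK_* constants referenced in these tests are not part of the diff shown here; they are expected to be defined in tests/unit/aiplatform/test_training_jobs.py. Presumably the default-valued pair mirrors the documented defaults, along these lines (assumed values, not taken from this view):

    # In tests/unit/aiplatform/test_training_jobs.py (not shown in this diff view).
    # The *_DEFAULT constants should match the documented defaults; the
    # non-default pair used by test_custom_job.py exercises an override,
    # e.g. "pd-standard" with some size within [100, 64000].
    _TEST_BOOT_DISK_TYPE_DEFAULT = "pd-ssd"
    _TEST_BOOT_DISK_SIZE_GB_DEFAULT = 100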