feat: expose boot disk type and size for CustomTrainingJob, CustomPythonPackageTrainingJob, and CustomContainerTrainingJob (#602)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-aiplatform/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea.
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes b/193822723 🦕


- Rename `_MachineSpec` to `_WorkerPoolSpec`
    - Add `boot_disk_type` and `boot_disk_size_gb` to `_WorkerPoolSpec` and `_DistributedTrainingSpec`
- Expose `boot_disk_type` and `boot_disk_size_gb`, with default values, in the `run` methods of `CustomTrainingJob`, `CustomPythonPackageTrainingJob`, and `CustomContainerTrainingJob` (a usage sketch follows this list)
    - Add `boot_disk_type` and `boot_disk_size_gb` to `_CustomTrainingJob._prepare_and_validate_run`
- Expose `boot_disk_type` and `boot_disk_size_gb` in `CustomJob.from_local_script`
    - Update the type hint for the command-line arguments `args` passed to the Python task
- Update unit tests to cover the default and overridden boot disk configurations
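For orientation, a minimal, hedged sketch of the new surface on `CustomTrainingJob.run`; the project, bucket, script, and image below are hypothetical placeholders, not values from this change:

```python
from google.cloud import aiplatform

# Sketch only: project, bucket, script, and image are hypothetical.
aiplatform.init(
    project="my-project",
    location="us-central1",
    staging_bucket="gs://my-staging-bucket",
)

job = aiplatform.CustomTrainingJob(
    display_name="boot-disk-example",
    script_path="train.py",
    container_uri="gcr.io/my-project/trainer:latest",
)

# Both new arguments are optional; omitting them falls back to the
# defaults in the diff below (boot_disk_type="pd-ssd", boot_disk_size_gb=100).
job.run(
    replica_count=1,
    machine_type="n1-standard-4",
    boot_disk_type="pd-standard",  # "pd-ssd" or "pd-standard"
    boot_disk_size_gb=200,         # must be within [100, 64000]
)
```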
morgandu committed Aug 12, 2021
1 parent b42bf0d commit 355ea24
Showing 6 changed files with 322 additions and 35 deletions.
15 changes: 13 additions & 2 deletions google/cloud/aiplatform/jobs.py
@@ -1070,13 +1070,15 @@ def from_local_script(
display_name: str,
script_path: str,
container_uri: str,
args: Optional[List[Union[str, float, int]]] = None,
args: Optional[Sequence[str]] = None,
requirements: Optional[Sequence[str]] = None,
environment_variables: Optional[Dict[str, str]] = None,
replica_count: int = 1,
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
base_output_dir: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
@@ -1110,7 +1112,7 @@ def from_local_script(
Required. Local path to training script.
container_uri (str):
Required. URI of the training container image to use for the custom job.
args (Optional[List[Union[str, float, int]]]):
args (Optional[Sequence[str]]):
Optional. Command line arguments to be passed to the Python task.
requirements (Sequence[str]):
Optional. List of Python package dependencies of the script.
@@ -1136,6 +1138,13 @@ def from_local_script(
NVIDIA_TESLA_T4
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Optional. Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Optional. Size in GB of the boot disk, default is 100 GB.
Boot disk size must be within the range of [100, 64000].
base_output_dir (str):
Optional. GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
@@ -1188,6 +1197,8 @@ def from_local_script(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
).pool_specs

python_packager = source_utils._TrainingScriptPythonPackager(
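A hedged sketch of the updated `CustomJob.from_local_script` surface shown above; names, paths, and URIs are hypothetical placeholders. Since `args` is now `Optional[Sequence[str]]`, numeric values are passed as strings:

```python
from google.cloud import aiplatform

# Sketch only: all names, paths, and URIs are hypothetical.
aiplatform.init(project="my-project", staging_bucket="gs://my-staging-bucket")

job = aiplatform.CustomJob.from_local_script(
    display_name="from-local-script-example",
    script_path="train.py",
    container_uri="gcr.io/my-project/trainer:latest",
    args=["-v", "0.1", "--test=arg"],  # Sequence[str]: numbers as strings
    replica_count=1,
    machine_type="n1-standard-4",
    boot_disk_type="pd-ssd",   # the default
    boot_disk_size_gb=150,     # within [100, 64000]
)

job.run()
```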
44 changes: 44 additions & 0 deletions google/cloud/aiplatform/training_jobs.py
@@ -1139,6 +1139,8 @@ def _prepare_and_validate_run(
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
) -> Tuple[worker_spec_utils._DistributedTrainingSpec, Optional[gca_model.Model]]:
"""Create worker pool specs and managed model as well validating the
run.
@@ -1172,6 +1174,13 @@ def _prepare_and_validate_run(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100 GB.
Boot disk size must be within the range of [100, 64000].
Returns:
Worker pools specs and managed model for run.
@@ -1204,6 +1213,8 @@ def _prepare_and_validate_run(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
).pool_specs

managed_model = self._managed_model
@@ -1588,6 +1599,8 @@ def run(
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
@@ -1724,6 +1737,13 @@ def run(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100 GB.
Boot disk size must be within the range of [100, 64000].
training_fraction_split (float):
The fraction of the input data that is to be
used to train the Model. This is ignored if Dataset is not provided.
@@ -1774,6 +1794,8 @@ def run(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

# make and copy package
@@ -2241,6 +2263,8 @@ def run(
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
@@ -2370,6 +2394,13 @@ def run(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100 GB.
Boot disk size must be within the range of [100, 64000].
training_fraction_split (float):
The fraction of the input data that is to be
used to train the Model. This is ignored if Dataset is not provided.
@@ -2425,6 +2456,8 @@ def run(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

return self._run(
@@ -4402,6 +4435,8 @@ def run(
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
@@ -4531,6 +4566,13 @@ def run(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100 GB.
Boot disk size must be within the range of [100, 64000].
training_fraction_split (float):
The fraction of the input data that is to be
used to train the Model. This is ignored if Dataset is not provided.
@@ -4581,6 +4623,8 @@ def run(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

return self._run(
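The same two keyword arguments are threaded through `CustomContainerTrainingJob.run` and `CustomPythonPackageTrainingJob.run` as well; a brief hedged sketch, with all names and URIs as hypothetical placeholders:

```python
from google.cloud import aiplatform

# Sketch only: all names and URIs are hypothetical.
aiplatform.init(project="my-project", staging_bucket="gs://my-staging-bucket")

container_job = aiplatform.CustomContainerTrainingJob(
    display_name="container-example",
    container_uri="gcr.io/my-project/trainer:latest",
)
container_job.run(boot_disk_type="pd-ssd", boot_disk_size_gb=100)

package_job = aiplatform.CustomPythonPackageTrainingJob(
    display_name="package-example",
    python_package_gcs_uri="gs://my-bucket/trainer-0.1.tar.gz",
    python_module_name="trainer.task",
    container_uri="gcr.io/my-project/training-base:latest",
)
package_job.run(boot_disk_type="pd-standard", boot_disk_size_gb=500)
```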
59 changes: 43 additions & 16 deletions google/cloud/aiplatform/utils/worker_spec_utils.py
@@ -22,16 +22,19 @@
)


class _MachineSpec(NamedTuple):
"""Specification container for Machine specs used for distributed training.
class _WorkerPoolSpec(NamedTuple):
"""Specification container for Worker Pool specs used for distributed training.
Usage:
spec = _MachineSpec(
spec = _WorkerPoolSpec(
replica_count=10,
machine_type='n1-standard-4',
accelerator_count=2,
accelerator_type='NVIDIA_TESLA_K80')
accelerator_type='NVIDIA_TESLA_K80',
boot_disk_type='pd-ssd',
boot_disk_size_gb=100,
)
Note that container and python package specs are not stored with this spec.
"""
Expand All @@ -40,6 +43,8 @@ class _MachineSpec(NamedTuple):
machine_type: str = "n1-standard-4"
accelerator_count: int = 0
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED"
boot_disk_type: str = "pd-ssd"
boot_disk_size_gb: int = 100

def _get_accelerator_type(self) -> Optional[str]:
"""Validates accelerator_type and returns the name of the accelerator.
@@ -70,7 +75,12 @@ def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]:
spec = {
"machine_spec": {"machine_type": self.machine_type},
"replica_count": self.replica_count,
"disk_spec": {
"boot_disk_type": self.boot_disk_type,
"boot_disk_size_gb": self.boot_disk_size_gb,
},
}

accelerator_type = self._get_accelerator_type()
if accelerator_type and self.accelerator_count:
spec["machine_spec"]["accelerator_type"] = accelerator_type
@@ -98,25 +108,29 @@ class _DistributedTrainingSpec(NamedTuple):
Usage:
dist_training_spec = _DistributedTrainingSpec(
chief_spec = _MachineSpec(
chief_spec = _WorkerPoolSpec(
replica_count=1,
machine_type='n1-standard-4',
accelerator_count=2,
accelerator_type='NVIDIA_TESLA_K80'
),
worker_spec = _MachineSpec(
accelerator_type='NVIDIA_TESLA_K80',
boot_disk_type='pd-ssd',
boot_disk_size_gb=100,
),
worker_spec = _WorkerPoolSpec(
replica_count=10,
machine_type='n1-standard-4',
accelerator_count=2,
accelerator_type='NVIDIA_TESLA_K80'
)
accelerator_type='NVIDIA_TESLA_K80',
boot_disk_type='pd-ssd',
boot_disk_size_gb=100,
),
)
"""

chief_spec: _MachineSpec = _MachineSpec()
worker_spec: _MachineSpec = _MachineSpec()
parameter_server_spec: _MachineSpec = _MachineSpec()
evaluator_spec: _MachineSpec = _MachineSpec()
chief_spec: _WorkerPoolSpec = _WorkerPoolSpec()
worker_spec: _WorkerPoolSpec = _WorkerPoolSpec()
parameter_server_spec: _WorkerPoolSpec = _WorkerPoolSpec()
evaluator_spec: _WorkerPoolSpec = _WorkerPoolSpec()

@property
def pool_specs(
@@ -156,6 +170,8 @@ def chief_worker_pool(
machine_type: str = "n1-standard-4",
accelerator_count: int = 0,
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
boot_disk_type: str = "pd-ssd",
boot_disk_size_gb: int = 100,
) -> "_DistributedTrainingSpec":
"""Parameterizes Config to support only chief with worker replicas.
@@ -174,6 +190,13 @@ def chief_worker_pool(
NVIDIA_TESLA_T4
accelerator_count (int):
The number of accelerators to attach to a worker replica.
boot_disk_type (str):
Type of the boot disk, default is `pd-ssd`.
Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
`pd-standard` (Persistent Disk Hard Disk Drive).
boot_disk_size_gb (int):
Size in GB of the boot disk, default is 100 GB.
Boot disk size must be within the range of [100, 64000].
Returns:
_DistributedTrainingSpec representing one chief and n workers all of same
@@ -182,18 +205,22 @@ def chief_worker_pool(
if replica_count <= 0:
return cls()

chief_spec = _MachineSpec(
chief_spec = _WorkerPoolSpec(
replica_count=1,
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

worker_spec = _MachineSpec(
worker_spec = _WorkerPoolSpec(
replica_count=replica_count - 1,
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
)

return cls(chief_spec=chief_spec, worker_spec=worker_spec)
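To make the new `disk_spec` output concrete, a small sketch that exercises the private helper directly, mirroring the `spec_dict` construction above (illustrative only):

```python
from google.cloud.aiplatform.utils import worker_spec_utils

# _WorkerPoolSpec is a private helper; this only illustrates the
# disk_spec block added by this change.
spec = worker_spec_utils._WorkerPoolSpec(
    replica_count=2,
    machine_type="n1-standard-8",
    boot_disk_type="pd-standard",
    boot_disk_size_gb=250,
)

print(spec.spec_dict)
# Expected shape (no accelerator keys, since accelerator_count is 0):
# {'machine_spec': {'machine_type': 'n1-standard-8'},
#  'replica_count': 2,
#  'disk_spec': {'boot_disk_type': 'pd-standard', 'boot_disk_size_gb': 250}}
```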
43 changes: 42 additions & 1 deletion tests/unit/aiplatform/test_custom_job.py
@@ -54,6 +54,8 @@

_TEST_TRAINING_CONTAINER_IMAGE = "gcr.io/test-training/container:image"

_TEST_RUN_ARGS = ["-v", "0.1", "--test=arg"]

_TEST_WORKER_POOL_SPEC = [
{
"machine_spec": {
@@ -62,10 +64,11 @@
"accelerator_count": 1,
},
"replica_count": 1,
"disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100},
"container_spec": {
"image_uri": _TEST_TRAINING_CONTAINER_IMAGE,
"command": [],
"args": [],
"args": _TEST_RUN_ARGS,
},
}
]
@@ -490,3 +493,41 @@ def test_create_custom_job_without_base_output_dir(self,):
assert job.job_spec.base_output_directory.output_uri_prefix.startswith(
f"{_TEST_STAGING_BUCKET}/aiplatform-custom-job"
)

@pytest.mark.usefixtures("mock_python_package_to_gcs")
@pytest.mark.parametrize("sync", [True, False])
def test_create_from_local_script_with_all_args(
self, get_custom_job_mock, create_custom_job_mock, sync
):
aiplatform.init(
project=_TEST_PROJECT,
location=_TEST_LOCATION,
staging_bucket=_TEST_STAGING_BUCKET,
encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
)

# configuration on this is tested in test_training_jobs.py
job = aiplatform.CustomJob.from_local_script(
display_name=_TEST_DISPLAY_NAME,
script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME,
container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
args=_TEST_RUN_ARGS,
requirements=test_training_jobs._TEST_REQUIREMENTS,
environment_variables=test_training_jobs._TEST_ENVIRONMENT_VARIABLES,
replica_count=test_training_jobs._TEST_REPLICA_COUNT,
machine_type=test_training_jobs._TEST_MACHINE_TYPE,
accelerator_type=test_training_jobs._TEST_ACCELERATOR_TYPE,
accelerator_count=test_training_jobs._TEST_ACCELERATOR_COUNT,
boot_disk_type=test_training_jobs._TEST_BOOT_DISK_TYPE,
boot_disk_size_gb=test_training_jobs._TEST_BOOT_DISK_SIZE_GB,
base_output_dir=_TEST_BASE_OUTPUT_DIR,
labels=_TEST_LABELS,
)

job.run(sync=sync)

job.wait()

assert (
job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
)
8 changes: 8 additions & 0 deletions tests/unit/aiplatform/test_end_to_end.py
@@ -211,6 +211,10 @@ def test_dataset_create_to_model_predict(
"accelerator_type": test_training_jobs._TEST_ACCELERATOR_TYPE,
"accelerator_count": test_training_jobs._TEST_ACCELERATOR_COUNT,
},
"disk_spec": {
"boot_disk_type": test_training_jobs._TEST_BOOT_DISK_TYPE_DEFAULT,
"boot_disk_size_gb": test_training_jobs._TEST_BOOT_DISK_SIZE_GB_DEFAULT,
},
"python_package_spec": {
"executor_image_uri": test_training_jobs._TEST_TRAINING_CONTAINER_IMAGE,
"python_module": source_utils._TrainingScriptPythonPackager.module_name,
@@ -394,6 +398,10 @@ def test_dataset_create_to_model_predict_with_pipeline_fail(
"accelerator_type": test_training_jobs._TEST_ACCELERATOR_TYPE,
"accelerator_count": test_training_jobs._TEST_ACCELERATOR_COUNT,
},
"disk_spec": {
"boot_disk_type": test_training_jobs._TEST_BOOT_DISK_TYPE_DEFAULT,
"boot_disk_size_gb": test_training_jobs._TEST_BOOT_DISK_SIZE_GB_DEFAULT,
},
"python_package_spec": {
"executor_image_uri": test_training_jobs._TEST_TRAINING_CONTAINER_IMAGE,
"python_module": source_utils._TrainingScriptPythonPackager.module_name,
