feat: enable boot disk for CustomJob.from_local_script
fix: typehint for command line arguments to be passed to the Python task
fix: change _MachineSpec to _WorkerPoolSpec
morgandu committed Aug 11, 2021
1 parent d5aaa2e commit eb99bd0
Showing 4 changed files with 82 additions and 30 deletions.
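
For context, a minimal sketch of the surface this commit enables (the project, bucket, script, and image values are illustrative, not taken from the diff; parameter names and defaults come from the signature change below):

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", staging_bucket="gs://my-bucket")

    # boot_disk_type and boot_disk_size_gb are the parameters added here;
    # both are optional, defaulting to "pd-ssd" and 100.
    job = aiplatform.CustomJob.from_local_script(
        display_name="my-custom-job",
        script_path="task.py",
        container_uri="gcr.io/my-project/training:latest",
        args=["-v", "0.1", "--test=arg"],
        replica_count=1,
        machine_type="n1-standard-4",
        boot_disk_type="pd-ssd",  # or "pd-standard"
        boot_disk_size_gb=100,  # must fall within [100, 64000]
    )
    job.run()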
15 changes: 13 additions & 2 deletions google/cloud/aiplatform/jobs.py
@@ -1070,13 +1070,15 @@ def from_local_script(
display_name: str,
script_path: str,
container_uri: str,
- args: Optional[List[Union[str, float, int]]] = None,
+ args: Optional[Sequence[str]] = None,
requirements: Optional[Sequence[str]] = None,
environment_variables: Optional[Dict[str, str]] = None,
replica_count: int = 1,
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
+ boot_disk_type: str = "pd-ssd",
+ boot_disk_size_gb: int = 100,
base_output_dir: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
@@ -1110,7 +1112,7 @@ def from_local_script(
Required. Local path to training script.
container_uri (str):
Required: Uri of the training container image to use for custom job.
- args (Optional[List[Union[str, float, int]]]):
+ args (Optional[Sequence[str]]):
Optional. Command line arguments to be passed to the Python task.
requirements (Sequence[str]):
Optional. List of python packages dependencies of script.
@@ -1136,6 +1138,13 @@
NVIDIA_TESLA_T4
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
+ boot_disk_type (str):
+ Optional. Type of the boot disk, default is `pd-ssd`.
+ Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
+ `pd-standard` (Persistent Disk Hard Disk Drive).
+ boot_disk_size_gb (int):
+ Optional. Size in GB of the boot disk, default is 100GB.
+ Boot disk size must be within the range of [100, 64000].
base_output_dir (str):
Optional. GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
@@ -1188,6 +1197,8 @@ def from_local_script(
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
+ boot_disk_type=boot_disk_type,
+ boot_disk_size_gb=boot_disk_size_gb,
).pool_specs

python_packager = source_utils._TrainingScriptPythonPackager(
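The boot disk settings flow through `_DistributedTrainingSpec(...).pool_specs` into each worker pool spec as a `disk_spec` entry. A sketch of the resulting shape, mirroring the `_TEST_WORKER_POOL_SPEC` fixture updated further down (machine and image values are illustrative):

    worker_pool_spec = {
        "machine_spec": {"machine_type": "n1-standard-4"},
        "replica_count": 1,
        "disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100},
        "container_spec": {
            "image_uri": "gcr.io/test-training/container:image",
            "command": [],
            "args": ["-v", "0.1", "--test=arg"],
        },
    }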
22 changes: 11 additions & 11 deletions google/cloud/aiplatform/utils/worker_spec_utils.py
@@ -22,12 +22,12 @@
)


- class _MachineSpec(NamedTuple):
- """Specification container for Machine specs used for distributed training.
+ class _WorkerPoolSpec(NamedTuple):
+ """Specification container for Worker Pool specs used for distributed training.
Usage:
- spec = _MachineSpec(
+ spec = _WorkerPoolSpec(
replica_count=10,
machine_type='n1-standard-4',
accelerator_count=2,
@@ -108,15 +108,15 @@ class _DistributedTrainingSpec(NamedTuple):
Usage:
dist_training_spec = _DistributedTrainingSpec(
- chief_spec = _MachineSpec(
+ chief_spec = _WorkerPoolSpec(
replica_count=1,
machine_type='n1-standard-4',
accelerator_count=2,
accelerator_type='NVIDIA_TESLA_K80',
boot_disk_type='pd-ssd',
boot_disk_size_gb=100,
),
- worker_spec = _MachineSpec(
+ worker_spec = _WorkerPoolSpec(
replica_count=10,
machine_type='n1-standard-4',
accelerator_count=2,
@@ -127,10 +127,10 @@ class _DistributedTrainingSpec(NamedTuple):
)
"""

- chief_spec: _MachineSpec = _MachineSpec()
- worker_spec: _MachineSpec = _MachineSpec()
- parameter_server_spec: _MachineSpec = _MachineSpec()
- evaluator_spec: _MachineSpec = _MachineSpec()
+ chief_spec: _WorkerPoolSpec = _WorkerPoolSpec()
+ worker_spec: _WorkerPoolSpec = _WorkerPoolSpec()
+ parameter_server_spec: _WorkerPoolSpec = _WorkerPoolSpec()
+ evaluator_spec: _WorkerPoolSpec = _WorkerPoolSpec()

@property
def pool_specs(
@@ -205,7 +205,7 @@ def chief_worker_pool(
if replica_count <= 0:
return cls()

- chief_spec = _MachineSpec(
+ chief_spec = _WorkerPoolSpec(
replica_count=1,
machine_type=machine_type,
accelerator_count=accelerator_count,
@@ -214,7 +214,7 @@
boot_disk_size_gb=boot_disk_size_gb,
)

- worker_spec = _MachineSpec(
+ worker_spec = _WorkerPoolSpec(
replica_count=replica_count - 1,
machine_type=machine_type,
accelerator_count=accelerator_count,
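For reference, a sketch of the renamed helper as its docstring above documents it. This is an internal API (underscore-prefixed), so the usage follows the docstring rather than any public contract; accelerator values are illustrative:

    from google.cloud.aiplatform.utils import worker_spec_utils

    dist_training_spec = worker_spec_utils._DistributedTrainingSpec(
        chief_spec=worker_spec_utils._WorkerPoolSpec(
            replica_count=1,
            machine_type="n1-standard-4",
            accelerator_count=2,
            accelerator_type="NVIDIA_TESLA_K80",
            boot_disk_type="pd-ssd",
            boot_disk_size_gb=100,
        ),
        worker_spec=worker_spec_utils._WorkerPoolSpec(
            replica_count=10,
            machine_type="n1-standard-4",
            accelerator_count=2,
            accelerator_type="NVIDIA_TESLA_K80",
        ),
    )
    pool_specs = dist_training_spec.pool_specs  # worker pool spec dicts for the job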
43 changes: 42 additions & 1 deletion tests/unit/aiplatform/test_custom_job.py
@@ -54,6 +54,8 @@

_TEST_TRAINING_CONTAINER_IMAGE = "gcr.io/test-training/container:image"

+ _TEST_RUN_ARGS = ["-v", "0.1", "--test=arg"]

_TEST_WORKER_POOL_SPEC = [
{
"machine_spec": {
@@ -62,10 +64,11 @@
"accelerator_count": 1,
},
"replica_count": 1,
"disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100},
"container_spec": {
"image_uri": _TEST_TRAINING_CONTAINER_IMAGE,
"command": [],
"args": [],
"args": _TEST_RUN_ARGS,
},
}
]
@@ -490,3 +493,41 @@ def test_create_custom_job_without_base_output_dir(self,):
assert job.job_spec.base_output_directory.output_uri_prefix.startswith(
f"{_TEST_STAGING_BUCKET}/aiplatform-custom-job"
)

+ @pytest.mark.usefixtures("mock_python_package_to_gcs")
+ @pytest.mark.parametrize("sync", [True, False])
+ def test_create_from_local_script_with_all_args(
+ self, get_custom_job_mock, create_custom_job_mock, sync
+ ):
+ aiplatform.init(
+ project=_TEST_PROJECT,
+ location=_TEST_LOCATION,
+ staging_bucket=_TEST_STAGING_BUCKET,
+ encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
+ )
+
+ # configuration on this is tested in test_training_jobs.py
+ job = aiplatform.CustomJob.from_local_script(
+ display_name=_TEST_DISPLAY_NAME,
+ script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME,
+ container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
+ args=_TEST_RUN_ARGS,
+ requirements=test_training_jobs._TEST_REQUIREMENTS,
+ environment_variables=test_training_jobs._TEST_ENVIRONMENT_VARIABLES,
+ replica_count=test_training_jobs._TEST_REPLICA_COUNT,
+ machine_type=test_training_jobs._TEST_MACHINE_TYPE,
+ accelerator_type=test_training_jobs._TEST_ACCELERATOR_TYPE,
+ accelerator_count=test_training_jobs._TEST_ACCELERATOR_COUNT,
+ boot_disk_type=test_training_jobs._TEST_BOOT_DISK_TYPE,
+ boot_disk_size_gb=test_training_jobs._TEST_BOOT_DISK_SIZE_GB,
+ base_output_dir=_TEST_BASE_OUTPUT_DIR,
+ labels=_TEST_LABELS,
+ )
+
+ job.run(sync=sync)
+
+ job.wait()
+
+ assert (
+ job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
+ )
32 changes: 16 additions & 16 deletions tests/unit/aiplatform/test_training_jobs.py
@@ -2882,9 +2882,9 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset_raises_if_anno
)


- class Test_MachineSpec:
+ class Test_WorkerPoolSpec:
def test_machine_spec_return_spec_dict(self):
- test_spec = worker_spec_utils._MachineSpec(
+ test_spec = worker_spec_utils._WorkerPoolSpec(
replica_count=_TEST_REPLICA_COUNT,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
@@ -2907,7 +2907,7 @@ def test_machine_spec_return_spec_dict(self):
assert test_spec.spec_dict == true_spec_dict

def test_machine_spec_return_spec_with_boot_disk_dict(self):
- test_spec = worker_spec_utils._MachineSpec(
+ test_spec = worker_spec_utils._WorkerPoolSpec(
replica_count=_TEST_REPLICA_COUNT,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
@@ -2932,7 +2932,7 @@ def test_machine_spec_return_spec_with_boot_disk_dict(self):
assert test_spec.spec_dict == true_spec_dict

def test_machine_spec_return_spec_dict_with_no_accelerator(self):
- test_spec = worker_spec_utils._MachineSpec(
+ test_spec = worker_spec_utils._WorkerPoolSpec(
replica_count=_TEST_REPLICA_COUNT,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=0,
@@ -2951,7 +2951,7 @@ def test_machine_spec_return_spec_dict_with_no_accelerator(self):
assert test_spec.spec_dict == true_spec_dict

def test_machine_spec_spec_dict_raises_invalid_accelerator(self):
- test_spec = worker_spec_utils._MachineSpec(
+ test_spec = worker_spec_utils._WorkerPoolSpec(
replica_count=_TEST_REPLICA_COUNT,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
@@ -2962,7 +2962,7 @@ def test_machine_spec_spec_dict_raises_invalid_accelerator(self):
test_spec.spec_dict

def test_machine_spec_spec_dict_is_empty(self):
- test_spec = worker_spec_utils._MachineSpec(
+ test_spec = worker_spec_utils._WorkerPoolSpec(
replica_count=0,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
@@ -2972,7 +2972,7 @@ def test_machine_spec_spec_dict_is_empty(self):
assert test_spec.is_empty

def test_machine_spec_spec_dict_is_not_empty(self):
- test_spec = worker_spec_utils._MachineSpec(
+ test_spec = worker_spec_utils._WorkerPoolSpec(
replica_count=_TEST_REPLICA_COUNT,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
@@ -2986,25 +2986,25 @@ class Test_DistributedTrainingSpec:
def test_machine_spec_returns_pool_spec(self):

spec = worker_spec_utils._DistributedTrainingSpec(
- chief_spec=worker_spec_utils._MachineSpec(
+ chief_spec=worker_spec_utils._WorkerPoolSpec(
replica_count=1,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
accelerator_type=_TEST_ACCELERATOR_TYPE,
),
- worker_spec=worker_spec_utils._MachineSpec(
+ worker_spec=worker_spec_utils._WorkerPoolSpec(
replica_count=10,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
accelerator_type=_TEST_ACCELERATOR_TYPE,
),
- parameter_server_spec=worker_spec_utils._MachineSpec(
+ parameter_server_spec=worker_spec_utils._WorkerPoolSpec(
replica_count=3,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
accelerator_type=_TEST_ACCELERATOR_TYPE,
),
- evaluator_spec=worker_spec_utils._MachineSpec(
+ evaluator_spec=worker_spec_utils._WorkerPoolSpec(
replica_count=1,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
@@ -3132,7 +3132,7 @@ def test_chief_worker_pool_returns_just_chief(self):
def test_machine_spec_raise_with_more_than_one_chief_replica(self):

spec = worker_spec_utils._DistributedTrainingSpec(
- chief_spec=worker_spec_utils._MachineSpec(
+ chief_spec=worker_spec_utils._WorkerPoolSpec(
replica_count=2,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
@@ -3146,20 +3146,20 @@ def test_machine_spec_raise_with_more_than_one_chief_replica(self):
def test_machine_spec_handles_missing_pools(self):

spec = worker_spec_utils._DistributedTrainingSpec(
- chief_spec=worker_spec_utils._MachineSpec(
+ chief_spec=worker_spec_utils._WorkerPoolSpec(
replica_count=1,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
accelerator_type=_TEST_ACCELERATOR_TYPE,
),
- worker_spec=worker_spec_utils._MachineSpec(replica_count=0),
- parameter_server_spec=worker_spec_utils._MachineSpec(
+ worker_spec=worker_spec_utils._WorkerPoolSpec(replica_count=0),
+ parameter_server_spec=worker_spec_utils._WorkerPoolSpec(
replica_count=3,
machine_type=_TEST_MACHINE_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
accelerator_type=_TEST_ACCELERATOR_TYPE,
),
- evaluator_spec=worker_spec_utils._MachineSpec(replica_count=0),
+ evaluator_spec=worker_spec_utils._WorkerPoolSpec(replica_count=0),
)

true_pool_spec = [
