Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose base_output_dir for custom job #586

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 16 additions & 1 deletion google/cloud/aiplatform/jobs.py
Expand Up @@ -59,6 +59,7 @@
study as gca_study_compat,
)


_LOGGER = base.Logger(__name__)

_JOB_COMPLETE_STATES = (
Expand Down Expand Up @@ -930,6 +931,7 @@ def __init__(
self,
display_name: str,
worker_pool_specs: Union[List[Dict], List[aiplatform.gapic.WorkerPoolSpec]],
base_output_dir: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
Expand Down Expand Up @@ -977,6 +979,9 @@ def __init__(
worker_pool_specs (Union[List[Dict], List[aiplatform.gapic.WorkerPoolSpec]]):
Required. The spec of the worker pools including machine type and Docker image.
Can be provided as a list of dictionaries or list of WorkerPoolSpec proto messages.
base_output_dir (str):
Optional. GCS output directory of job. If not provided, a
timestamped directory in the staging directory will be used.
project (str):
Optional. Project to run the custom job in. Overrides project set in aiplatform.init.
location (str):
Expand Down Expand Up @@ -1008,12 +1013,17 @@ def __init__(
"should be set using aiplatform.init(staging_bucket='gs://my-bucket')"
)

# default directory if not given
base_output_dir = base_output_dir or utils._timestamped_gcs_dir(
staging_bucket, "aiplatform-custom-job"
)

self._gca_resource = gca_custom_job_compat.CustomJob(
display_name=display_name,
job_spec=gca_custom_job_compat.CustomJobSpec(
worker_pool_specs=worker_pool_specs,
base_output_directory=gca_io_compat.GcsDestination(
output_uri_prefix=staging_bucket
output_uri_prefix=base_output_dir
),
),
encryption_spec=initializer.global_config.get_encryption_spec(
Expand Down Expand Up @@ -1049,6 +1059,7 @@ def from_local_script(
machine_type: str = "n1-standard-4",
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
accelerator_count: int = 0,
base_output_dir: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
Expand Down Expand Up @@ -1105,6 +1116,9 @@ def from_local_script(
NVIDIA_TESLA_T4
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
base_output_dir (str):
Optional. GCS output directory of job. If not provided, a
timestamped directory in the staging directory will be used.
project (str):
Optional. Project to run the custom job in. Overrides project set in aiplatform.init.
location (str):
Expand Down Expand Up @@ -1170,6 +1184,7 @@ def from_local_script(
return cls(
display_name=display_name,
worker_pool_specs=worker_pool_specs,
base_output_dir=base_output_dir,
project=project,
location=location,
credentials=credentials,
Expand Down
41 changes: 35 additions & 6 deletions tests/unit/aiplatform/test_custom_job.py
Expand Up @@ -71,6 +71,7 @@
]

_TEST_STAGING_BUCKET = "gs://test-staging-bucket"
_TEST_BASE_OUTPUT_DIR = f"{_TEST_STAGING_BUCKET}/{_TEST_DISPLAY_NAME}"

# CMEK encryption
_TEST_DEFAULT_ENCRYPTION_KEY_NAME = "key_default"
Expand All @@ -91,7 +92,7 @@
job_spec=gca_custom_job_compat.CustomJobSpec(
worker_pool_specs=_TEST_WORKER_POOL_SPEC,
base_output_directory=gca_io_compat.GcsDestination(
output_uri_prefix=_TEST_STAGING_BUCKET
output_uri_prefix=_TEST_BASE_OUTPUT_DIR
),
scheduling=gca_custom_job_compat.Scheduling(
timeout=duration_pb2.Duration(seconds=_TEST_TIMEOUT),
Expand Down Expand Up @@ -224,7 +225,9 @@ def test_create_custom_job(self, create_custom_job_mock, get_custom_job_mock, sy
)

job = aiplatform.CustomJob(
display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC
display_name=_TEST_DISPLAY_NAME,
worker_pool_specs=_TEST_WORKER_POOL_SPEC,
base_output_dir=_TEST_BASE_OUTPUT_DIR,
)

job.run(
Expand Down Expand Up @@ -265,7 +268,9 @@ def test_run_custom_job_with_fail_raises(
)

job = aiplatform.CustomJob(
display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC
display_name=_TEST_DISPLAY_NAME,
worker_pool_specs=_TEST_WORKER_POOL_SPEC,
base_output_dir=_TEST_BASE_OUTPUT_DIR,
)

with pytest.raises(RuntimeError) as e:
Expand Down Expand Up @@ -306,7 +311,9 @@ def test_run_custom_job_with_fail_at_creation(self):
)

job = aiplatform.CustomJob(
display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC
display_name=_TEST_DISPLAY_NAME,
worker_pool_specs=_TEST_WORKER_POOL_SPEC,
base_output_dir=_TEST_BASE_OUTPUT_DIR,
)

job.run(
Expand Down Expand Up @@ -342,7 +349,9 @@ def test_custom_job_get_state_raises_without_run(self):
)

job = aiplatform.CustomJob(
display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC
display_name=_TEST_DISPLAY_NAME,
worker_pool_specs=_TEST_WORKER_POOL_SPEC,
base_output_dir=_TEST_BASE_OUTPUT_DIR,
)

with pytest.raises(RuntimeError):
Expand Down Expand Up @@ -385,6 +394,7 @@ def test_create_from_local_script(
display_name=_TEST_DISPLAY_NAME,
script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME,
container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
base_output_dir=_TEST_BASE_OUTPUT_DIR,
)

job.run(sync=sync)
Expand Down Expand Up @@ -428,7 +438,9 @@ def test_create_custom_job_with_tensorboard(
)

job = aiplatform.CustomJob(
display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC
display_name=_TEST_DISPLAY_NAME,
worker_pool_specs=_TEST_WORKER_POOL_SPEC,
base_output_dir=_TEST_BASE_OUTPUT_DIR,
)

job.run(
Expand All @@ -454,3 +466,20 @@ def test_create_custom_job_with_tensorboard(
assert (
job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
)

def test_create_custom_job_without_base_output_dir(self,):

aiplatform.init(
project=_TEST_PROJECT,
location=_TEST_LOCATION,
staging_bucket=_TEST_STAGING_BUCKET,
encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
)

job = aiplatform.CustomJob(
display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC,
)

assert job.job_spec.base_output_directory.output_uri_prefix.startswith(
f"{_TEST_STAGING_BUCKET}/aiplatform-custom-job"
)
6 changes: 6 additions & 0 deletions tests/unit/aiplatform/test_hyperparameter_tuning_job.py
Expand Up @@ -49,6 +49,7 @@
_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}"

_TEST_STAGING_BUCKET = test_custom_job._TEST_STAGING_BUCKET
_TEST_BASE_OUTPUT_DIR = test_custom_job._TEST_BASE_OUTPUT_DIR

_TEST_HYPERPARAMETERTUNING_JOB_NAME = (
f"{_TEST_PARENT}/hyperparameterTuningJobs/{_TEST_ID}"
Expand Down Expand Up @@ -260,6 +261,7 @@ def test_create_hyperparameter_tuning_job(
custom_job = aiplatform.CustomJob(
display_name=test_custom_job._TEST_DISPLAY_NAME,
worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC,
base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR,
)

job = aiplatform.HyperparameterTuningJob(
Expand Down Expand Up @@ -321,6 +323,7 @@ def test_run_hyperparameter_tuning_job_with_fail_raises(
custom_job = aiplatform.CustomJob(
display_name=test_custom_job._TEST_DISPLAY_NAME,
worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC,
base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR,
)

job = aiplatform.HyperparameterTuningJob(
Expand Down Expand Up @@ -376,6 +379,7 @@ def test_run_hyperparameter_tuning_job_with_fail_at_creation(self):
custom_job = aiplatform.CustomJob(
display_name=test_custom_job._TEST_DISPLAY_NAME,
worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC,
base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR,
)

job = aiplatform.HyperparameterTuningJob(
Expand Down Expand Up @@ -440,6 +444,7 @@ def test_hyperparameter_tuning_job_get_state_raises_without_run(self):
custom_job = aiplatform.CustomJob(
display_name=test_custom_job._TEST_DISPLAY_NAME,
worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC,
base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR,
)

job = aiplatform.HyperparameterTuningJob(
Expand Down Expand Up @@ -497,6 +502,7 @@ def test_create_hyperparameter_tuning_job_with_tensorboard(
custom_job = aiplatform.CustomJob(
display_name=test_custom_job._TEST_DISPLAY_NAME,
worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC,
base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR,
)

job = aiplatform.HyperparameterTuningJob(
Expand Down