diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index 6cc549027b..66b0479ced 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -59,6 +59,7 @@ study as gca_study_compat, ) + _LOGGER = base.Logger(__name__) _JOB_COMPLETE_STATES = ( @@ -930,6 +931,7 @@ def __init__( self, display_name: str, worker_pool_specs: Union[List[Dict], List[aiplatform.gapic.WorkerPoolSpec]], + base_output_dir: Optional[str] = None, project: Optional[str] = None, location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, @@ -977,6 +979,9 @@ def __init__( worker_pool_specs (Union[List[Dict], List[aiplatform.gapic.WorkerPoolSpec]]): Required. The spec of the worker pools including machine type and Docker image. Can provided as a list of dictionaries or list of WorkerPoolSpec proto messages. + base_output_dir (str): + Optional. GCS output directory of job. If not provided a + timestamped directory in the staging directory will be used. project (str): Optional.Project to run the custom job in. Overrides project set in aiplatform.init. location (str): @@ -1008,12 +1013,17 @@ def __init__( "should be set using aiplatform.init(staging_bucket='gs://my-bucket')" ) + # default directory if not given + base_output_dir = base_output_dir or utils._timestamped_gcs_dir( + staging_bucket, "aiplatform-custom-job" + ) + self._gca_resource = gca_custom_job_compat.CustomJob( display_name=display_name, job_spec=gca_custom_job_compat.CustomJobSpec( worker_pool_specs=worker_pool_specs, base_output_directory=gca_io_compat.GcsDestination( - output_uri_prefix=staging_bucket + output_uri_prefix=base_output_dir ), ), encryption_spec=initializer.global_config.get_encryption_spec( @@ -1049,6 +1059,7 @@ def from_local_script( machine_type: str = "n1-standard-4", accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED", accelerator_count: int = 0, + base_output_dir: Optional[str] = None, project: Optional[str] = None, location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, @@ -1105,6 +1116,9 @@ def from_local_script( NVIDIA_TESLA_T4 accelerator_count (int): Optional. The number of accelerators to attach to a worker replica. + base_output_dir (str): + Optional. GCS output directory of job. If not provided a + timestamped directory in the staging directory will be used. project (str): Optional. Project to run the custom job in. Overrides project set in aiplatform.init. location (str): @@ -1170,6 +1184,7 @@ def from_local_script( return cls( display_name=display_name, worker_pool_specs=worker_pool_specs, + base_output_dir=base_output_dir, project=project, location=location, credentials=credentials, diff --git a/tests/unit/aiplatform/test_custom_job.py b/tests/unit/aiplatform/test_custom_job.py index de144d5241..363ad18048 100644 --- a/tests/unit/aiplatform/test_custom_job.py +++ b/tests/unit/aiplatform/test_custom_job.py @@ -71,6 +71,7 @@ ] _TEST_STAGING_BUCKET = "gs://test-staging-bucket" +_TEST_BASE_OUTPUT_DIR = f"{_TEST_STAGING_BUCKET}/{_TEST_DISPLAY_NAME}" # CMEK encryption _TEST_DEFAULT_ENCRYPTION_KEY_NAME = "key_default" @@ -91,7 +92,7 @@ job_spec=gca_custom_job_compat.CustomJobSpec( worker_pool_specs=_TEST_WORKER_POOL_SPEC, base_output_directory=gca_io_compat.GcsDestination( - output_uri_prefix=_TEST_STAGING_BUCKET + output_uri_prefix=_TEST_BASE_OUTPUT_DIR ), scheduling=gca_custom_job_compat.Scheduling( timeout=duration_pb2.Duration(seconds=_TEST_TIMEOUT), @@ -224,7 +225,9 @@ def test_create_custom_job(self, create_custom_job_mock, get_custom_job_mock, sy ) job = aiplatform.CustomJob( - display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC + display_name=_TEST_DISPLAY_NAME, + worker_pool_specs=_TEST_WORKER_POOL_SPEC, + base_output_dir=_TEST_BASE_OUTPUT_DIR, ) job.run( @@ -265,7 +268,9 @@ def test_run_custom_job_with_fail_raises( ) job = aiplatform.CustomJob( - display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC + display_name=_TEST_DISPLAY_NAME, + worker_pool_specs=_TEST_WORKER_POOL_SPEC, + base_output_dir=_TEST_BASE_OUTPUT_DIR, ) with pytest.raises(RuntimeError) as e: @@ -306,7 +311,9 @@ def test_run_custom_job_with_fail_at_creation(self): ) job = aiplatform.CustomJob( - display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC + display_name=_TEST_DISPLAY_NAME, + worker_pool_specs=_TEST_WORKER_POOL_SPEC, + base_output_dir=_TEST_BASE_OUTPUT_DIR, ) job.run( @@ -342,7 +349,9 @@ def test_custom_job_get_state_raises_without_run(self): ) job = aiplatform.CustomJob( - display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC + display_name=_TEST_DISPLAY_NAME, + worker_pool_specs=_TEST_WORKER_POOL_SPEC, + base_output_dir=_TEST_BASE_OUTPUT_DIR, ) with pytest.raises(RuntimeError): @@ -385,6 +394,7 @@ def test_create_from_local_script( display_name=_TEST_DISPLAY_NAME, script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME, container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + base_output_dir=_TEST_BASE_OUTPUT_DIR, ) job.run(sync=sync) @@ -428,7 +438,9 @@ def test_create_custom_job_with_tensorboard( ) job = aiplatform.CustomJob( - display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC + display_name=_TEST_DISPLAY_NAME, + worker_pool_specs=_TEST_WORKER_POOL_SPEC, + base_output_dir=_TEST_BASE_OUTPUT_DIR, ) job.run( @@ -454,3 +466,20 @@ def test_create_custom_job_with_tensorboard( assert ( job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED ) + + def test_create_custom_job_without_base_output_dir(self,): + + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + staging_bucket=_TEST_STAGING_BUCKET, + encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME, + ) + + job = aiplatform.CustomJob( + display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC, + ) + + assert job.job_spec.base_output_directory.output_uri_prefix.startswith( + f"{_TEST_STAGING_BUCKET}/aiplatform-custom-job" + ) diff --git a/tests/unit/aiplatform/test_hyperparameter_tuning_job.py b/tests/unit/aiplatform/test_hyperparameter_tuning_job.py index e2d716e729..752d39a93c 100644 --- a/tests/unit/aiplatform/test_hyperparameter_tuning_job.py +++ b/tests/unit/aiplatform/test_hyperparameter_tuning_job.py @@ -49,6 +49,7 @@ _TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}" _TEST_STAGING_BUCKET = test_custom_job._TEST_STAGING_BUCKET +_TEST_BASE_OUTPUT_DIR = test_custom_job._TEST_BASE_OUTPUT_DIR _TEST_HYPERPARAMETERTUNING_JOB_NAME = ( f"{_TEST_PARENT}/hyperparameterTuningJobs/{_TEST_ID}" @@ -260,6 +261,7 @@ def test_create_hyperparameter_tuning_job( custom_job = aiplatform.CustomJob( display_name=test_custom_job._TEST_DISPLAY_NAME, worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC, + base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR, ) job = aiplatform.HyperparameterTuningJob( @@ -321,6 +323,7 @@ def test_run_hyperparameter_tuning_job_with_fail_raises( custom_job = aiplatform.CustomJob( display_name=test_custom_job._TEST_DISPLAY_NAME, worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC, + base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR, ) job = aiplatform.HyperparameterTuningJob( @@ -376,6 +379,7 @@ def test_run_hyperparameter_tuning_job_with_fail_at_creation(self): custom_job = aiplatform.CustomJob( display_name=test_custom_job._TEST_DISPLAY_NAME, worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC, + base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR, ) job = aiplatform.HyperparameterTuningJob( @@ -440,6 +444,7 @@ def test_hyperparameter_tuning_job_get_state_raises_without_run(self): custom_job = aiplatform.CustomJob( display_name=test_custom_job._TEST_DISPLAY_NAME, worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC, + base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR, ) job = aiplatform.HyperparameterTuningJob( @@ -497,6 +502,7 @@ def test_create_hyperparameter_tuning_job_with_tensorboard( custom_job = aiplatform.CustomJob( display_name=test_custom_job._TEST_DISPLAY_NAME, worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC, + base_output_dir=test_custom_job._TEST_BASE_OUTPUT_DIR, ) job = aiplatform.HyperparameterTuningJob(