From 7ae28b84b0b8dd7068f8c0d0303776098816fab0 Mon Sep 17 00:00:00 2001
From: Morgan Du
Date: Mon, 10 May 2021 07:31:39 -0700
Subject: [PATCH] feat: expose env var in cust training class run func args (#366)

---
 google/cloud/aiplatform/training_jobs.py    | 120 ++++++++++++++++++++
 tests/unit/aiplatform/test_training_jobs.py |  22 ++++
 2 files changed, 142 insertions(+)

diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py
index ccd0ca7be6..f80174efdc 100644
--- a/google/cloud/aiplatform/training_jobs.py
+++ b/google/cloud/aiplatform/training_jobs.py
@@ -1805,6 +1805,7 @@ def run(
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         replica_count: int = 0,
         machine_type: str = "n1-standard-4",
         accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
@@ -1880,6 +1881,13 @@ def run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -1900,6 +1908,16 @@ def run(
                 -  AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test"
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of each environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             replica_count (int):
                 The number of worker replicas. If replica count = 1 then one chief
                 replica will be provisioned. If replica_count > 1 the remainder will be
@@ -1960,6 +1978,7 @@ def run(
             worker_pool_specs=worker_pool_specs,
             managed_model=managed_model,
             args=args,
+            environment_variables=environment_variables,
             base_output_dir=base_output_dir,
             service_account=service_account,
             bigquery_destination=bigquery_destination,
@@ -1986,6 +2005,7 @@ def _run(
         worker_pool_specs: _DistributedTrainingSpec,
         managed_model: Optional[gca_model.Model] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         base_output_dir: Optional[str] = None,
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
@@ -2018,9 +2038,26 @@ def _run(
                 Model proto if this script produces a Managed Model.
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of each environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -2083,6 +2120,9 @@ def _run(
         if args:
             spec["pythonPackageSpec"]["args"] = args

+        if environment_variables:
+            spec["pythonPackageSpec"]["env"] = environment_variables
+
         (
             training_task_inputs,
             base_output_dir,
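[Reviewer note: a minimal usage sketch of the new parameter on the script-based CustomTrainingJob whose hunks appear above. The project, bucket, script path, and container image below are hypothetical placeholders, not values taken from this change.]

    from google.cloud import aiplatform

    # Hypothetical project/bucket/script/image values, for illustration only.
    aiplatform.init(project="my-project", staging_bucket="gs://my-staging-bucket")

    job = aiplatform.CustomTrainingJob(
        display_name="env-var-example",
        script_path="trainer/task.py",
        container_uri="gcr.io/cloud-aiplatform/training/tf-cpu.2-4:latest",
    )

    # New in this patch: a mapping of at most 10 uniquely named variables.
    job.run(
        args=["--epochs", "10"],
        environment_variables={"MY_KEY": "MY_VALUE"},
        replica_count=1,
    )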
@@ -2334,6 +2374,7 @@ def run(
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         replica_count: int = 0,
         machine_type: str = "n1-standard-4",
         accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
@@ -2402,6 +2443,13 @@ def run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -2422,6 +2470,16 @@ def run(
                 -  AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test"
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of each environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             replica_count (int):
                 The number of worker replicas. If replica count = 1 then one chief
                 replica will be provisioned. If replica_count > 1 the remainder will be
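[Reviewer note: inside the running container, the service-set AIP_* variables documented above and the caller-supplied mapping both surface as ordinary process environment variables. A minimal sketch of training code consuming them; MY_KEY is the hypothetical variable from the docstring example.]

    import os

    # Set by the training service, per the base_output_dir documentation above.
    model_dir = os.environ["AIP_MODEL_DIR"]

    # Set by the caller via `environment_variables`.
    my_value = os.environ.get("MY_KEY", "")

    print(f"writing model artifacts to {model_dir}; MY_KEY={my_value}")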
@@ -2481,6 +2539,7 @@ def run(
             worker_pool_specs=worker_pool_specs,
             managed_model=managed_model,
             args=args,
+            environment_variables=environment_variables,
             base_output_dir=base_output_dir,
             service_account=service_account,
             bigquery_destination=bigquery_destination,
@@ -2506,6 +2565,7 @@ def _run(
         worker_pool_specs: _DistributedTrainingSpec,
         managed_model: Optional[gca_model.Model] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         base_output_dir: Optional[str] = None,
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
@@ -2535,9 +2595,26 @@ def _run(
                 Model proto if this script produces a Managed Model.
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of each environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -2593,6 +2670,9 @@ def _run(
         if args:
             spec["containerSpec"]["args"] = args

+        if environment_variables:
+            spec["containerSpec"]["env"] = environment_variables
+
         (
             training_task_inputs,
             base_output_dir,
@@ -3625,6 +3705,7 @@ def run(
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         replica_count: int = 0,
         machine_type: str = "n1-standard-4",
         accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
@@ -3693,6 +3774,13 @@ def run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -3713,6 +3801,16 @@ def run(
                 -  AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test"
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of each environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             replica_count (int):
                 The number of worker replicas. If replica count = 1 then one chief
                 replica will be provisioned. If replica_count > 1 the remainder will be
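[Reviewer note: the docstrings cap the mapping at 10 uniquely named variables, but this change performs no client-side validation. A hypothetical caller-side guard, not part of the patch, could fail fast; dict keys are unique by construction, so only size and types need checking.]

    from typing import Dict

    def validate_environment_variables(environment_variables: Dict[str, str]) -> None:
        """Hypothetical guard mirroring the documented constraints."""
        if len(environment_variables) > 10:
            raise ValueError(
                "At most 10 environment variables can be specified; "
                f"got {len(environment_variables)}."
            )
        for name, value in environment_variables.items():
            if not isinstance(name, str) or not isinstance(value, str):
                raise TypeError("Environment variable names and values must be strings.")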
@@ -3767,6 +3865,7 @@ def run(
             worker_pool_specs=worker_pool_specs,
             managed_model=managed_model,
             args=args,
+            environment_variables=environment_variables,
             base_output_dir=base_output_dir,
             service_account=service_account,
             training_fraction_split=training_fraction_split,
@@ -3792,6 +3891,7 @@ def _run(
         worker_pool_specs: _DistributedTrainingSpec,
         managed_model: Optional[gca_model.Model] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         base_output_dir: Optional[str] = None,
         service_account: Optional[str] = None,
         training_fraction_split: float = 0.8,
@@ -3822,9 +3922,26 @@ def _run(
                 Model proto if this script produces a Managed Model.
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of each environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
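[Reviewer note: all three classes follow the same pattern: when a mapping is supplied it is attached verbatim to the worker pool spec under an "env" key, inside "pythonPackageSpec" for the script and Python-package classes and "containerSpec" for the container class. A sketch of the resulting fragment with placeholder values, matching the shape the unit tests below assert.]

    # Placeholder values; the shape mirrors the tests' expected worker pool spec.
    worker_pool_spec = {
        "replicaCount": 1,
        "machineSpec": {"machineType": "n1-standard-4"},
        "pythonPackageSpec": {
            "executorImageUri": "gcr.io/my-project/trainer:latest",  # hypothetical
            "pythonModule": "trainer.task",                          # hypothetical
            "packageUris": ["gs://my-bucket/trainer-0.1.tar.gz"],    # hypothetical
            "args": ["--epochs", "10"],
            "env": {"MY_KEY": "MY_VALUE"},  # the new mapping, passed through unchanged
        },
    }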
@@ -3866,6 +3983,9 @@ def _run(
         if args:
             spec["pythonPackageSpec"]["args"] = args

+        if environment_variables:
+            spec["pythonPackageSpec"]["env"] = environment_variables
+
         (
             training_task_inputs,
             base_output_dir,
diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py
index 1a61469444..44e662a36e 100644
--- a/tests/unit/aiplatform/test_training_jobs.py
+++ b/tests/unit/aiplatform/test_training_jobs.py
@@ -119,6 +119,9 @@
     "learning_rate": 0.01,
     "loss_fn": "mse",
 }
+_TEST_ENVIRONMENT_VARIABLES = {
+    "MY_PATH": "/path/to/my_path",
+}
 _TEST_MODEL_SERVING_CONTAINER_PORTS = [8888, 10000]
 _TEST_MODEL_DESCRIPTION = "test description"
@@ -596,6 +599,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             service_account=_TEST_SERVICE_ACCOUNT,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -618,6 +622,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
         )

         true_args = _TEST_RUN_ARGS
+        true_env = _TEST_ENVIRONMENT_VARIABLES

         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -631,6 +636,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
                 "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         }
@@ -754,6 +760,7 @@ def test_run_call_pipeline_service_create_with_bigquery_destination(
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             bigquery_destination=_TEST_BIGQUERY_DESTINATION,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -770,6 +777,7 @@ def test_run_call_pipeline_service_create_with_bigquery_destination(
         model_from_job.wait()

         true_args = _TEST_RUN_ARGS
+        true_env = _TEST_ENVIRONMENT_VARIABLES

         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -783,6 +791,7 @@ def test_run_call_pipeline_service_create_with_bigquery_destination(
                 "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         }
@@ -1018,6 +1027,7 @@ def test_run_call_pipeline_service_create_with_no_dataset(
         model_from_job = job.run(
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -1039,6 +1049,7 @@ def test_run_call_pipeline_service_create_with_no_dataset(
         )

         true_args = _TEST_RUN_ARGS
+        true_env = _TEST_ENVIRONMENT_VARIABLES

         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -1052,6 +1063,7 @@ def test_run_call_pipeline_service_create_with_no_dataset(
                 "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         }
@@ -1263,6 +1275,7 @@ def test_run_call_pipeline_service_create_distributed_training(
             dataset=mock_tabular_dataset,
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=10,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
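[Reviewer note: the distributed-training assertions that follow expect the same mapping on both pools; with replica_count=10 the job gets one chief replica plus nine workers. Schematically, the expectation is:]

    _TEST_ENVIRONMENT_VARIABLES = {"MY_PATH": "/path/to/my_path"}

    # One chief pool plus one worker pool of the remaining nine replicas.
    chief = {"replicaCount": 1, "pythonPackageSpec": {"env": _TEST_ENVIRONMENT_VARIABLES}}
    workers = {"replicaCount": 9, "pythonPackageSpec": {"env": _TEST_ENVIRONMENT_VARIABLES}}

    for spec in (chief, workers):
        assert spec["pythonPackageSpec"]["env"] == _TEST_ENVIRONMENT_VARIABLES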
@@ -1284,6 +1297,7 @@ def test_run_call_pipeline_service_create_distributed_training(
         )

         true_args = _TEST_RUN_ARGS
+        true_env = _TEST_ENVIRONMENT_VARIABLES

         true_worker_pool_spec = [
             {
@@ -1298,6 +1312,7 @@ def test_run_call_pipeline_service_create_distributed_training(
                 "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         },
         {
@@ -1312,6 +1327,7 @@ def test_run_call_pipeline_service_create_distributed_training(
                 "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         },
     ]
@@ -1730,6 +1746,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
             dataset=mock_tabular_dataset,
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -1746,6 +1763,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
         model_from_job.wait()

         true_args = _TEST_RUN_ARGS
+        true_env = _TEST_ENVIRONMENT_VARIABLES

         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -1758,6 +1776,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
                 "imageUri": _TEST_TRAINING_CONTAINER_IMAGE,
                 "command": _TEST_TRAINING_CONTAINER_CMD,
                 "args": true_args,
+                "env": true_env,
             },
         }
@@ -2937,6 +2956,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             service_account=_TEST_SERVICE_ACCOUNT,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -2952,6 +2972,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
         model_from_job.wait()

         true_args = _TEST_RUN_ARGS
+        true_env = _TEST_ENVIRONMENT_VARIABLES

         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -2965,6 +2986,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
                 "pythonModule": _TEST_PYTHON_MODULE_NAME,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         }