From b4b1b12d735e7c40717bd9ff8f8fd330d5e83738 Mon Sep 17 00:00:00 2001
From: Vinny Senthil
Date: Tue, 20 Apr 2021 18:08:53 -0500
Subject: [PATCH] feat: Add service account support to Custom Training and
 Model deployment (#342)

---
 google/cloud/aiplatform/models.py           | 65 +++++++++++++++++----
 google/cloud/aiplatform/training_jobs.py    | 40 ++++++++++++-
 tests/unit/aiplatform/test_endpoints.py     |  3 +
 tests/unit/aiplatform/test_models.py        |  3 +
 tests/unit/aiplatform/test_training_jobs.py |  7 +++
 5 files changed, 103 insertions(+), 15 deletions(-)

diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py
index d96b681695..9aabc3ff17 100644
--- a/google/cloud/aiplatform/models.py
+++ b/google/cloud/aiplatform/models.py
@@ -477,6 +477,7 @@ def deploy(
         max_replica_count: int = 1,
         accelerator_type: Optional[str] = None,
         accelerator_count: Optional[int] = None,
+        service_account: Optional[str] = None,
         explanation_metadata: Optional[explain.ExplanationMetadata] = None,
         explanation_parameters: Optional[explain.ExplanationParameters] = None,
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -531,6 +532,13 @@ def deploy(
                 NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
             accelerator_count (int):
                 Optional. The number of accelerators to attach to a worker replica.
+            service_account (str):
+                The service account that the DeployedModel's container runs as. Specify the
+                email address of the service account. If this service account is not
+                specified, the container runs as a service account that doesn't have access
+                to the resource project.
+                Users deploying the Model must have the `iam.serviceAccounts.actAs`
+                permission on this service account.
             explanation_metadata (explain.ExplanationMetadata):
                 Optional. Metadata describing the Model's input and output for explanation.
                 Both `explanation_metadata` and `explanation_parameters` must be
@@ -569,6 +577,7 @@ def deploy(
             max_replica_count=max_replica_count,
             accelerator_type=accelerator_type,
             accelerator_count=accelerator_count,
+            service_account=service_account,
             explanation_metadata=explanation_metadata,
             explanation_parameters=explanation_parameters,
             metadata=metadata,
@@ -587,6 +596,7 @@ def _deploy(
         max_replica_count: Optional[int] = 1,
         accelerator_type: Optional[str] = None,
         accelerator_count: Optional[int] = None,
+        service_account: Optional[str] = None,
         explanation_metadata: Optional[explain.ExplanationMetadata] = None,
         explanation_parameters: Optional[explain.ExplanationParameters] = None,
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -641,6 +651,13 @@ def _deploy(
                 NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
             accelerator_count (int):
                 Optional. The number of accelerators to attach to a worker replica.
+            service_account (str):
+                The service account that the DeployedModel's container runs as. Specify the
+                email address of the service account. If this service account is not
+                specified, the container runs as a service account that doesn't have access
+                to the resource project.
+                Users deploying the Model must have the `iam.serviceAccounts.actAs`
+                permission on this service account.
             explanation_metadata (explain.ExplanationMetadata):
                 Optional. Metadata describing the Model's input and output for explanation.
                 Both `explanation_metadata` and `explanation_parameters` must be
@@ -677,6 +694,7 @@ def _deploy(
             max_replica_count=max_replica_count,
             accelerator_type=accelerator_type,
             accelerator_count=accelerator_count,
+            service_account=service_account,
             explanation_metadata=explanation_metadata,
             explanation_parameters=explanation_parameters,
             metadata=metadata,
@@ -701,6 +719,7 @@ def _deploy_call(
         max_replica_count: Optional[int] = 1,
         accelerator_type: Optional[str] = None,
         accelerator_count: Optional[int] = None,
+        service_account: Optional[str] = None,
         explanation_metadata: Optional[explain.ExplanationMetadata] = None,
         explanation_parameters: Optional[explain.ExplanationParameters] = None,
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -753,6 +772,13 @@ def _deploy_call(
                 is not provided, the larger value of min_replica_count or 1 will
                 be used. If value provided is smaller than min_replica_count, it
                 will automatically be increased to be min_replica_count.
+            service_account (str):
+                The service account that the DeployedModel's container runs as. Specify the
+                email address of the service account. If this service account is not
+                specified, the container runs as a service account that doesn't have access
+                to the resource project.
+                Users deploying the Model must have the `iam.serviceAccounts.actAs`
+                permission on this service account.
             explanation_metadata (explain.ExplanationMetadata):
                 Optional. Metadata describing the Model's input and output for explanation.
                 Both `explanation_metadata` and `explanation_parameters` must be
@@ -788,6 +814,12 @@ def _deploy_call(
             gca_endpoint = gca_endpoint_v1beta1
             gca_machine_resources = gca_machine_resources_v1beta1
 
+        deployed_model = gca_endpoint.DeployedModel(
+            model=model_resource_name,
+            display_name=deployed_model_display_name,
+            service_account=service_account,
+        )
+
         if machine_type:
             machine_spec = gca_machine_resources.MachineSpec(machine_type=machine_type)
 
@@ -796,26 +828,17 @@ def _deploy_call(
                 machine_spec.accelerator_type = accelerator_type
                 machine_spec.accelerator_count = accelerator_count
 
-            dedicated_resources = gca_machine_resources.DedicatedResources(
+            deployed_model.dedicated_resources = gca_machine_resources.DedicatedResources(
                 machine_spec=machine_spec,
                 min_replica_count=min_replica_count,
                 max_replica_count=max_replica_count,
             )
-            deployed_model = gca_endpoint.DeployedModel(
-                dedicated_resources=dedicated_resources,
-                model=model_resource_name,
-                display_name=deployed_model_display_name,
-            )
+
         else:
-            automatic_resources = gca_machine_resources.AutomaticResources(
+            deployed_model.automatic_resources = gca_machine_resources.AutomaticResources(
                 min_replica_count=min_replica_count,
                 max_replica_count=max_replica_count,
             )
-            deployed_model = gca_endpoint.DeployedModel(
-                automatic_resources=automatic_resources,
-                model=model_resource_name,
-                display_name=deployed_model_display_name,
-            )
 
         # Service will throw error if both metadata and parameters are not provided
         if explanation_metadata and explanation_parameters:
@@ -1493,6 +1516,7 @@ def deploy(
         max_replica_count: Optional[int] = 1,
         accelerator_type: Optional[str] = None,
         accelerator_count: Optional[int] = None,
+        service_account: Optional[str] = None,
         explanation_metadata: Optional[explain.ExplanationMetadata] = None,
         explanation_parameters: Optional[explain.ExplanationParameters] = None,
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -1548,6 +1572,13 @@ def deploy(
                 NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
             accelerator_count (int):
                 Optional. The number of accelerators to attach to a worker replica.
+            service_account (str):
+                The service account that the DeployedModel's container runs as. Specify the
+                email address of the service account. If this service account is not
+                specified, the container runs as a service account that doesn't have access
+                to the resource project.
+                Users deploying the Model must have the `iam.serviceAccounts.actAs`
+                permission on this service account.
             explanation_metadata (explain.ExplanationMetadata):
                 Optional. Metadata describing the Model's input and output for explanation.
                 Both `explanation_metadata` and `explanation_parameters` must be
@@ -1601,6 +1632,7 @@ def deploy(
             max_replica_count=max_replica_count,
             accelerator_type=accelerator_type,
             accelerator_count=accelerator_count,
+            service_account=service_account,
             explanation_metadata=explanation_metadata,
             explanation_parameters=explanation_parameters,
             metadata=metadata,
@@ -1621,6 +1653,7 @@ def _deploy(
         max_replica_count: Optional[int] = 1,
         accelerator_type: Optional[str] = None,
         accelerator_count: Optional[int] = None,
+        service_account: Optional[str] = None,
         explanation_metadata: Optional[explain.ExplanationMetadata] = None,
         explanation_parameters: Optional[explain.ExplanationParameters] = None,
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -1676,6 +1709,13 @@ def _deploy(
                 NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
             accelerator_count (int):
                 Optional. The number of accelerators to attach to a worker replica.
+            service_account (str):
+                The service account that the DeployedModel's container runs as. Specify the
+                email address of the service account. If this service account is not
+                specified, the container runs as a service account that doesn't have access
+                to the resource project.
+                Users deploying the Model must have the `iam.serviceAccounts.actAs`
+                permission on this service account.
             explanation_metadata (explain.ExplanationMetadata):
                 Optional. Metadata describing the Model's input and output for explanation.
                 Both `explanation_metadata` and `explanation_parameters` must be
@@ -1732,6 +1772,7 @@ def _deploy(
             max_replica_count=max_replica_count,
             accelerator_type=accelerator_type,
             accelerator_count=accelerator_count,
+            service_account=service_account,
             explanation_metadata=explanation_metadata,
             explanation_parameters=explanation_parameters,
             metadata=metadata,
diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py
index 5a12c3286b..a0090d1ee3 100644
--- a/google/cloud/aiplatform/training_jobs.py
+++ b/google/cloud/aiplatform/training_jobs.py
@@ -1517,6 +1517,7 @@ def _prepare_training_task_inputs_and_output_dir(
         self,
         worker_pool_specs: _DistributedTrainingSpec,
         base_output_dir: Optional[str] = None,
+        service_account: Optional[str] = None,
     ) -> Tuple[Dict, str]:
         """Prepares training task inputs and output directory for custom job.
 
@@ -1526,6 +1527,9 @@ def _prepare_training_task_inputs_and_output_dir(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+            service_account (str):
+                Specifies the service account for workload run-as account.
+                Users submitting jobs must have act-as permission on this run-as account.
         Returns:
             Training task inputs and Output directory for custom job.
         """
@@ -1542,6 +1546,9 @@ def _prepare_training_task_inputs_and_output_dir(
             "baseOutputDirectory": {"output_uri_prefix": base_output_dir},
         }
 
+        if service_account:
+            training_task_inputs["serviceAccount"] = service_account
+
         return training_task_inputs, base_output_dir
 
     @property
@@ -1787,6 +1794,7 @@ def run(
         annotation_schema_uri: Optional[str] = None,
         model_display_name: Optional[str] = None,
         base_output_dir: Optional[str] = None,
+        service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         args: Optional[List[Union[str, float, int]]] = None,
         replica_count: int = 0,
@@ -1864,6 +1872,9 @@ def run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+            service_account (str):
+                Specifies the service account for workload run-as account.
+                Users submitting jobs must have act-as permission on this run-as account.
             bigquery_destination (str):
                 Provide this field if `dataset` is a BiqQuery dataset.
                 The BigQuery project location where the training data is to
@@ -1942,6 +1953,7 @@ def run(
             managed_model=managed_model,
             args=args,
             base_output_dir=base_output_dir,
+            service_account=service_account,
             bigquery_destination=bigquery_destination,
             training_fraction_split=training_fraction_split,
             validation_fraction_split=validation_fraction_split,
@@ -1967,6 +1979,7 @@ def _run(
         managed_model: Optional[gca_model.Model] = None,
         args: Optional[List[Union[str, float, int]]] = None,
         base_output_dir: Optional[str] = None,
+        service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         training_fraction_split: float = 0.8,
         validation_fraction_split: float = 0.1,
@@ -2000,6 +2013,9 @@ def _run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+            service_account (str):
+                Specifies the service account for workload run-as account.
+                Users submitting jobs must have act-as permission on this run-as account.
             bigquery_destination (str):
                 Provide this field if `dataset` is a BiqQuery dataset.
                 The BigQuery project location where the training data is to
@@ -2063,7 +2079,7 @@ def _run(
             training_task_inputs,
             base_output_dir,
         ) = self._prepare_training_task_inputs_and_output_dir(
-            worker_pool_specs, base_output_dir
+            worker_pool_specs, base_output_dir, service_account
         )
 
         model = self._run_job(
@@ -2306,6 +2322,7 @@ def run(
         annotation_schema_uri: Optional[str] = None,
         model_display_name: Optional[str] = None,
         base_output_dir: Optional[str] = None,
+        service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         args: Optional[List[Union[str, float, int]]] = None,
         replica_count: int = 0,
@@ -2383,6 +2400,9 @@ def run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+            service_account (str):
+                Specifies the service account for workload run-as account.
+                Users submitting jobs must have act-as permission on this run-as account.
             bigquery_destination (str):
                 Provide this field if `dataset` is a BiqQuery dataset.
                 The BigQuery project location where the training data is to
@@ -2460,6 +2480,7 @@ def run(
             managed_model=managed_model,
             args=args,
             base_output_dir=base_output_dir,
+            service_account=service_account,
             bigquery_destination=bigquery_destination,
             training_fraction_split=training_fraction_split,
             validation_fraction_split=validation_fraction_split,
@@ -2484,6 +2505,7 @@ def _run(
         managed_model: Optional[gca_model.Model] = None,
         args: Optional[List[Union[str, float, int]]] = None,
         base_output_dir: Optional[str] = None,
+        service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         training_fraction_split: float = 0.8,
         validation_fraction_split: float = 0.1,
@@ -2514,6 +2536,9 @@ def _run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+            service_account (str):
+                Specifies the service account for workload run-as account.
+                Users submitting jobs must have act-as permission on this run-as account.
             bigquery_destination (str):
                 The BigQuery project location where the training data is to
                 be written to. In the given project a new dataset is created
@@ -2570,7 +2595,7 @@ def _run(
             training_task_inputs,
             base_output_dir,
         ) = self._prepare_training_task_inputs_and_output_dir(
-            worker_pool_specs, base_output_dir
+            worker_pool_specs, base_output_dir, service_account
         )
 
         model = self._run_job(
@@ -3573,6 +3598,7 @@ def run(
         annotation_schema_uri: Optional[str] = None,
         model_display_name: Optional[str] = None,
         base_output_dir: Optional[str] = None,
+        service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         args: Optional[List[Union[str, float, int]]] = None,
         replica_count: int = 0,
@@ -3650,6 +3676,9 @@ def run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+            service_account (str):
+                Specifies the service account for workload run-as account.
+                Users submitting jobs must have act-as permission on this run-as account.
             bigquery_destination (str):
                 Provide this field if `dataset` is a BiqQuery dataset.
                 The BigQuery project location where the training data is to
@@ -3722,6 +3751,7 @@ def run(
             managed_model=managed_model,
             args=args,
             base_output_dir=base_output_dir,
+            service_account=service_account,
             training_fraction_split=training_fraction_split,
             validation_fraction_split=validation_fraction_split,
             test_fraction_split=test_fraction_split,
@@ -3746,6 +3776,7 @@ def _run(
         managed_model: Optional[gca_model.Model] = None,
         args: Optional[List[Union[str, float, int]]] = None,
         base_output_dir: Optional[str] = None,
+        service_account: Optional[str] = None,
         training_fraction_split: float = 0.8,
         validation_fraction_split: float = 0.1,
         test_fraction_split: float = 0.1,
@@ -3777,6 +3808,9 @@ def _run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+            service_account (str):
+                Specifies the service account for workload run-as account.
+                Users submitting jobs must have act-as permission on this run-as account.
             training_fraction_split (float):
                 The fraction of the input data that is to be used to train
                 the Model.
@@ -3819,7 +3853,7 @@ def _run(
             training_task_inputs,
             base_output_dir,
         ) = self._prepare_training_task_inputs_and_output_dir(
-            worker_pool_specs, base_output_dir
+            worker_pool_specs, base_output_dir, service_account
         )
 
         model = self._run_job(
diff --git a/tests/unit/aiplatform/test_endpoints.py b/tests/unit/aiplatform/test_endpoints.py
index ea74c89e5e..03c3f38667 100644
--- a/tests/unit/aiplatform/test_endpoints.py
+++ b/tests/unit/aiplatform/test_endpoints.py
@@ -85,6 +85,7 @@
 _TEST_PREDICTION = [[1.0, 2.0, 3.0], [3.0, 3.0, 1.0]]
 _TEST_INSTANCES = [[1.0, 2.0, 3.0], [1.0, 3.0, 4.0]]
 _TEST_CREDENTIALS = mock.Mock(spec=auth_credentials.AnonymousCredentials())
+_TEST_SERVICE_ACCOUNT = "vinnys@my-project.iam.gserviceaccount.com"
 
 _TEST_DEPLOYED_MODELS = [
     gca_endpoint.DeployedModel(id=_TEST_ID, display_name=_TEST_DISPLAY_NAME),
@@ -667,6 +668,7 @@ def test_deploy_with_dedicated_resources(self, deploy_model_mock, sync):
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
             accelerator_count=_TEST_ACCELERATOR_COUNT,
+            service_account=_TEST_SERVICE_ACCOUNT,
             sync=sync,
         )
 
@@ -687,6 +689,7 @@ def test_deploy_with_dedicated_resources(self, deploy_model_mock, sync):
             dedicated_resources=expected_dedicated_resources,
             model=test_model.resource_name,
             display_name=None,
+            service_account=_TEST_SERVICE_ACCOUNT,
         )
         deploy_model_mock.assert_called_once_with(
             endpoint=test_endpoint.resource_name,
diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py
index 47b000d189..e76c1451f7 100644
--- a/tests/unit/aiplatform/test_models.py
+++ b/tests/unit/aiplatform/test_models.py
@@ -120,6 +120,7 @@
 _TEST_PREDICTION_SCHEMA_URI = "gs://test/schema/predictions.yaml"
 
 _TEST_CREDENTIALS = mock.Mock(spec=auth_credentials.AnonymousCredentials())
+_TEST_SERVICE_ACCOUNT = "vinnys@my-project.iam.gserviceaccount.com"
 
 _TEST_EXPLANATION_METADATA = aiplatform.explain.ExplanationMetadata(
     inputs={
@@ -715,6 +716,7 @@ def test_deploy_no_endpoint_dedicated_resources(self, deploy_model_mock, sync):
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
             accelerator_count=_TEST_ACCELERATOR_COUNT,
+            service_account=_TEST_SERVICE_ACCOUNT,
             sync=sync,
         )
 
@@ -733,6 +735,7 @@ def test_deploy_no_endpoint_dedicated_resources(self, deploy_model_mock, sync):
             dedicated_resources=expected_dedicated_resources,
             model=test_model.resource_name,
             display_name=None,
+            service_account=_TEST_SERVICE_ACCOUNT,
         )
         deploy_model_mock.assert_called_once_with(
             endpoint=test_endpoint.resource_name,
diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py
index d63b028445..1a61469444 100644
--- a/tests/unit/aiplatform/test_training_jobs.py
+++ b/tests/unit/aiplatform/test_training_jobs.py
@@ -84,6 +84,7 @@
 _TEST_ANNOTATION_SCHEMA_URI = schema.dataset.annotation.image.classification
 
 _TEST_BASE_OUTPUT_DIR = "gs://test-base-output-dir"
+_TEST_SERVICE_ACCOUNT = "vinnys@my-project.iam.gserviceaccount.com"
 _TEST_BIGQUERY_DESTINATION = "bq://test-project"
 _TEST_RUN_ARGS = ["-v", 0.1, "--test=arg"]
 _TEST_REPLICA_COUNT = 1
@@ -593,6 +594,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
         model_from_job = job.run(
             dataset=mock_tabular_dataset,
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
+            service_account=_TEST_SERVICE_ACCOUNT,
             args=_TEST_RUN_ARGS,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
@@ -688,6 +690,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
             {
                 "workerPoolSpecs": [true_worker_pool_spec],
                 "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR},
+                "serviceAccount": _TEST_SERVICE_ACCOUNT,
             },
             struct_pb2.Value(),
        ),
@@ -2501,6 +2504,7 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset(
             dataset=mock_nontabular_dataset,
             annotation_schema_uri=_TEST_ANNOTATION_SCHEMA_URI,
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
+            service_account=_TEST_SERVICE_ACCOUNT,
             args=_TEST_RUN_ARGS,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
@@ -2582,6 +2586,7 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset(
             {
                 "workerPoolSpecs": [true_worker_pool_spec],
                 "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR},
+                "serviceAccount": _TEST_SERVICE_ACCOUNT,
             },
             struct_pb2.Value(),
         ),
@@ -2930,6 +2935,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
             dataset=mock_tabular_dataset,
             model_display_name=_TEST_MODEL_DISPLAY_NAME,
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
+            service_account=_TEST_SERVICE_ACCOUNT,
             args=_TEST_RUN_ARGS,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
@@ -3018,6 +3024,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
             {
                 "workerPoolSpecs": [true_worker_pool_spec],
                 "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR},
+                "serviceAccount": _TEST_SERVICE_ACCOUNT,
             },
             struct_pb2.Value(),
         ),