Skip to content

Commit

Permalink
feat: Add service account support to Custom Training and Model deploy…
Browse files Browse the repository at this point in the history
…ment (#342)
  • Loading branch information
vinnysenthil committed Apr 20, 2021
1 parent a5fa7a2 commit b4b1b12
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 15 deletions.
65 changes: 53 additions & 12 deletions google/cloud/aiplatform/models.py
Expand Up @@ -477,6 +477,7 @@ def deploy(
max_replica_count: int = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
Expand Down Expand Up @@ -531,6 +532,13 @@ def deploy(
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
Expand Down Expand Up @@ -569,6 +577,7 @@ def deploy(
max_replica_count=max_replica_count,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
service_account=service_account,
explanation_metadata=explanation_metadata,
explanation_parameters=explanation_parameters,
metadata=metadata,
Expand All @@ -587,6 +596,7 @@ def _deploy(
max_replica_count: Optional[int] = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
Expand Down Expand Up @@ -641,6 +651,13 @@ def _deploy(
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
Expand Down Expand Up @@ -677,6 +694,7 @@ def _deploy(
max_replica_count=max_replica_count,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
service_account=service_account,
explanation_metadata=explanation_metadata,
explanation_parameters=explanation_parameters,
metadata=metadata,
Expand All @@ -701,6 +719,7 @@ def _deploy_call(
max_replica_count: Optional[int] = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
Expand Down Expand Up @@ -753,6 +772,13 @@ def _deploy_call(
is not provided, the larger value of min_replica_count or 1 will
be used. If value provided is smaller than min_replica_count, it
will automatically be increased to be min_replica_count.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
Expand Down Expand Up @@ -788,6 +814,12 @@ def _deploy_call(
gca_endpoint = gca_endpoint_v1beta1
gca_machine_resources = gca_machine_resources_v1beta1

deployed_model = gca_endpoint.DeployedModel(
model=model_resource_name,
display_name=deployed_model_display_name,
service_account=service_account,
)

if machine_type:
machine_spec = gca_machine_resources.MachineSpec(machine_type=machine_type)

Expand All @@ -796,26 +828,17 @@ def _deploy_call(
machine_spec.accelerator_type = accelerator_type
machine_spec.accelerator_count = accelerator_count

dedicated_resources = gca_machine_resources.DedicatedResources(
deployed_model.dedicated_resources = gca_machine_resources.DedicatedResources(
machine_spec=machine_spec,
min_replica_count=min_replica_count,
max_replica_count=max_replica_count,
)
deployed_model = gca_endpoint.DeployedModel(
dedicated_resources=dedicated_resources,
model=model_resource_name,
display_name=deployed_model_display_name,
)

else:
automatic_resources = gca_machine_resources.AutomaticResources(
deployed_model.automatic_resources = gca_machine_resources.AutomaticResources(
min_replica_count=min_replica_count,
max_replica_count=max_replica_count,
)
deployed_model = gca_endpoint.DeployedModel(
automatic_resources=automatic_resources,
model=model_resource_name,
display_name=deployed_model_display_name,
)

# Service will throw error if both metadata and parameters are not provided
if explanation_metadata and explanation_parameters:
Expand Down Expand Up @@ -1493,6 +1516,7 @@ def deploy(
max_replica_count: Optional[int] = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
Expand Down Expand Up @@ -1548,6 +1572,13 @@ def deploy(
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
Expand Down Expand Up @@ -1601,6 +1632,7 @@ def deploy(
max_replica_count=max_replica_count,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
service_account=service_account,
explanation_metadata=explanation_metadata,
explanation_parameters=explanation_parameters,
metadata=metadata,
Expand All @@ -1621,6 +1653,7 @@ def _deploy(
max_replica_count: Optional[int] = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
Expand Down Expand Up @@ -1676,6 +1709,13 @@ def _deploy(
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
Expand Down Expand Up @@ -1732,6 +1772,7 @@ def _deploy(
max_replica_count=max_replica_count,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
service_account=service_account,
explanation_metadata=explanation_metadata,
explanation_parameters=explanation_parameters,
metadata=metadata,
Expand Down
40 changes: 37 additions & 3 deletions google/cloud/aiplatform/training_jobs.py
Expand Up @@ -1517,6 +1517,7 @@ def _prepare_training_task_inputs_and_output_dir(
self,
worker_pool_specs: _DistributedTrainingSpec,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
) -> Tuple[Dict, str]:
"""Prepares training task inputs and output directory for custom job.
Expand All @@ -1526,6 +1527,9 @@ def _prepare_training_task_inputs_and_output_dir(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
Returns:
Training task inputs and Output directory for custom job.
"""
Expand All @@ -1542,6 +1546,9 @@ def _prepare_training_task_inputs_and_output_dir(
"baseOutputDirectory": {"output_uri_prefix": base_output_dir},
}

if service_account:
training_task_inputs["serviceAccount"] = service_account

return training_task_inputs, base_output_dir

@property
Expand Down Expand Up @@ -1787,6 +1794,7 @@ def run(
annotation_schema_uri: Optional[str] = None,
model_display_name: Optional[str] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
args: Optional[List[Union[str, float, int]]] = None,
replica_count: int = 0,
Expand Down Expand Up @@ -1864,6 +1872,9 @@ def run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
Provide this field if `dataset` is a BiqQuery dataset.
The BigQuery project location where the training data is to
Expand Down Expand Up @@ -1942,6 +1953,7 @@ def run(
managed_model=managed_model,
args=args,
base_output_dir=base_output_dir,
service_account=service_account,
bigquery_destination=bigquery_destination,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
Expand All @@ -1967,6 +1979,7 @@ def _run(
managed_model: Optional[gca_model.Model] = None,
args: Optional[List[Union[str, float, int]]] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
Expand Down Expand Up @@ -2000,6 +2013,9 @@ def _run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
Provide this field if `dataset` is a BiqQuery dataset.
The BigQuery project location where the training data is to
Expand Down Expand Up @@ -2063,7 +2079,7 @@ def _run(
training_task_inputs,
base_output_dir,
) = self._prepare_training_task_inputs_and_output_dir(
worker_pool_specs, base_output_dir
worker_pool_specs, base_output_dir, service_account
)

model = self._run_job(
Expand Down Expand Up @@ -2306,6 +2322,7 @@ def run(
annotation_schema_uri: Optional[str] = None,
model_display_name: Optional[str] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
args: Optional[List[Union[str, float, int]]] = None,
replica_count: int = 0,
Expand Down Expand Up @@ -2383,6 +2400,9 @@ def run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
Provide this field if `dataset` is a BiqQuery dataset.
The BigQuery project location where the training data is to
Expand Down Expand Up @@ -2460,6 +2480,7 @@ def run(
managed_model=managed_model,
args=args,
base_output_dir=base_output_dir,
service_account=service_account,
bigquery_destination=bigquery_destination,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
Expand All @@ -2484,6 +2505,7 @@ def _run(
managed_model: Optional[gca_model.Model] = None,
args: Optional[List[Union[str, float, int]]] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
Expand Down Expand Up @@ -2514,6 +2536,9 @@ def _run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
The BigQuery project location where the training data is to
be written to. In the given project a new dataset is created
Expand Down Expand Up @@ -2570,7 +2595,7 @@ def _run(
training_task_inputs,
base_output_dir,
) = self._prepare_training_task_inputs_and_output_dir(
worker_pool_specs, base_output_dir
worker_pool_specs, base_output_dir, service_account
)

model = self._run_job(
Expand Down Expand Up @@ -3573,6 +3598,7 @@ def run(
annotation_schema_uri: Optional[str] = None,
model_display_name: Optional[str] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
args: Optional[List[Union[str, float, int]]] = None,
replica_count: int = 0,
Expand Down Expand Up @@ -3650,6 +3676,9 @@ def run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
Provide this field if `dataset` is a BiqQuery dataset.
The BigQuery project location where the training data is to
Expand Down Expand Up @@ -3722,6 +3751,7 @@ def run(
managed_model=managed_model,
args=args,
base_output_dir=base_output_dir,
service_account=service_account,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
test_fraction_split=test_fraction_split,
Expand All @@ -3746,6 +3776,7 @@ def _run(
managed_model: Optional[gca_model.Model] = None,
args: Optional[List[Union[str, float, int]]] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
Expand Down Expand Up @@ -3777,6 +3808,9 @@ def _run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
training_fraction_split (float):
The fraction of the input data that is to be
used to train the Model.
Expand Down Expand Up @@ -3819,7 +3853,7 @@ def _run(
training_task_inputs,
base_output_dir,
) = self._prepare_training_task_inputs_and_output_dir(
worker_pool_specs, base_output_dir
worker_pool_specs, base_output_dir, service_account
)

model = self._run_job(
Expand Down

0 comments on commit b4b1b12

Please sign in to comment.