feat: Add service account support to Custom Training and Model deployment #342

Merged 4 commits on Apr 20, 2021
65 changes: 53 additions & 12 deletions google/cloud/aiplatform/models.py
@@ -477,6 +477,7 @@ def deploy(
max_replica_count: int = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -531,6 +532,13 @@ def deploy(
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
@@ -569,6 +577,7 @@ def deploy(
max_replica_count=max_replica_count,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
service_account=service_account,
explanation_metadata=explanation_metadata,
explanation_parameters=explanation_parameters,
metadata=metadata,
@@ -587,6 +596,7 @@ def _deploy(
max_replica_count: Optional[int] = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -641,6 +651,13 @@ def _deploy(
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
@@ -677,6 +694,7 @@ def _deploy(
max_replica_count=max_replica_count,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
service_account=service_account,
explanation_metadata=explanation_metadata,
explanation_parameters=explanation_parameters,
metadata=metadata,
@@ -701,6 +719,7 @@ def _deploy_call(
max_replica_count: Optional[int] = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -753,6 +772,13 @@ def _deploy_call(
is not provided, the larger value of min_replica_count or 1 will
be used. If value provided is smaller than min_replica_count, it
will automatically be increased to be min_replica_count.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
@@ -788,6 +814,12 @@ def _deploy_call(
gca_endpoint = gca_endpoint_v1beta1
gca_machine_resources = gca_machine_resources_v1beta1

deployed_model = gca_endpoint.DeployedModel(
model=model_resource_name,
display_name=deployed_model_display_name,
service_account=service_account,
)

if machine_type:
machine_spec = gca_machine_resources.MachineSpec(machine_type=machine_type)

@@ -796,26 +828,17 @@ def _deploy_call(
machine_spec.accelerator_type = accelerator_type
machine_spec.accelerator_count = accelerator_count

- dedicated_resources = gca_machine_resources.DedicatedResources(
+ deployed_model.dedicated_resources = gca_machine_resources.DedicatedResources(
machine_spec=machine_spec,
min_replica_count=min_replica_count,
max_replica_count=max_replica_count,
)
- deployed_model = gca_endpoint.DeployedModel(
- dedicated_resources=dedicated_resources,
- model=model_resource_name,
- display_name=deployed_model_display_name,
- )

else:
- automatic_resources = gca_machine_resources.AutomaticResources(
+ deployed_model.automatic_resources = gca_machine_resources.AutomaticResources(
min_replica_count=min_replica_count,
max_replica_count=max_replica_count,
)
- deployed_model = gca_endpoint.DeployedModel(
- automatic_resources=automatic_resources,
- model=model_resource_name,
- display_name=deployed_model_display_name,
- )

# Service will throw error if both metadata and parameters are not provided
if explanation_metadata and explanation_parameters:
@@ -1493,6 +1516,7 @@ def deploy(
max_replica_count: Optional[int] = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -1548,6 +1572,13 @@ def deploy(
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
@@ -1601,6 +1632,7 @@ def deploy(
max_replica_count=max_replica_count,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
service_account=service_account,
explanation_metadata=explanation_metadata,
explanation_parameters=explanation_parameters,
metadata=metadata,
@@ -1621,6 +1653,7 @@ def _deploy(
max_replica_count: Optional[int] = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
service_account: Optional[str] = None,
explanation_metadata: Optional[explain.ExplanationMetadata] = None,
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
@@ -1676,6 +1709,13 @@ def _deploy(
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
accelerator_count (int):
Optional. The number of accelerators to attach to a worker replica.
service_account (str):
The service account that the DeployedModel's container runs as. Specify the
email address of the service account. If this service account is not
specified, the container runs as a service account that doesn't have access
to the resource project.
Users deploying the Model must have the `iam.serviceAccounts.actAs`
permission on this service account.
explanation_metadata (explain.ExplanationMetadata):
Optional. Metadata describing the Model's input and output for explanation.
Both `explanation_metadata` and `explanation_parameters` must be
@@ -1732,6 +1772,7 @@ def _deploy(
max_replica_count=max_replica_count,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
service_account=service_account,
explanation_metadata=explanation_metadata,
explanation_parameters=explanation_parameters,
metadata=metadata,
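For context on the models.py changes above, here is a minimal usage sketch of the new `service_account` argument on `Model.deploy` (illustrative only, not part of this PR; the project, region, model ID, and service account email are placeholders):

from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")  # placeholder project and region

# Placeholder model resource name; use an existing uploaded Model.
model = aiplatform.Model("projects/my-project/locations/us-central1/models/1234567890")

# The deployed container runs as this service account; the deploying user
# needs iam.serviceAccounts.actAs permission on it.
endpoint = model.deploy(
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=1,
    service_account="deploy-runner@my-project.iam.gserviceaccount.com",  # placeholder service account
)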
40 changes: 37 additions & 3 deletions google/cloud/aiplatform/training_jobs.py
@@ -1517,6 +1517,7 @@ def _prepare_training_task_inputs_and_output_dir(
self,
worker_pool_specs: _DistributedTrainingSpec,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
) -> Tuple[Dict, str]:
"""Prepares training task inputs and output directory for custom job.

@@ -1526,6 +1527,9 @@ def _prepare_training_task_inputs_and_output_dir(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for the workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
Returns:
Training task inputs and Output directory for custom job.
"""
@@ -1542,6 +1546,9 @@
"baseOutputDirectory": {"output_uri_prefix": base_output_dir},
}

if service_account:
training_task_inputs["serviceAccount"] = service_account

return training_task_inputs, base_output_dir

@property
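With `service_account` supplied, the training task inputs assembled by this helper would look roughly as follows (an illustrative sketch; the `workerPoolSpecs` key and its contents are assumed from the surrounding code, and the bucket and service account values are placeholders):

# Rough shape of the dict returned as training_task_inputs when service_account is set.
training_task_inputs = {
    "workerPoolSpecs": [...],  # assumed key: serialized worker pool specs
    "baseOutputDirectory": {"output_uri_prefix": "gs://my-bucket/training-output"},  # placeholder bucket
    "serviceAccount": "trainer@my-project.iam.gserviceaccount.com",  # placeholder service account
}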
@@ -1787,6 +1794,7 @@ def run(
annotation_schema_uri: Optional[str] = None,
model_display_name: Optional[str] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
args: Optional[List[Union[str, float, int]]] = None,
replica_count: int = 0,
@@ -1864,6 +1872,9 @@ def run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for the workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
Provide this field if `dataset` is a BigQuery dataset.
The BigQuery project location where the training data is to
@@ -1942,6 +1953,7 @@ def run(
managed_model=managed_model,
args=args,
base_output_dir=base_output_dir,
service_account=service_account,
bigquery_destination=bigquery_destination,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
@@ -1967,6 +1979,7 @@ def _run(
managed_model: Optional[gca_model.Model] = None,
args: Optional[List[Union[str, float, int]]] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
@@ -2000,6 +2013,9 @@ def _run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for the workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
Provide this field if `dataset` is a BigQuery dataset.
The BigQuery project location where the training data is to
@@ -2063,7 +2079,7 @@ def _run(
training_task_inputs,
base_output_dir,
) = self._prepare_training_task_inputs_and_output_dir(
worker_pool_specs, base_output_dir
worker_pool_specs, base_output_dir, service_account
)

model = self._run_job(
@@ -2306,6 +2322,7 @@ def run(
annotation_schema_uri: Optional[str] = None,
model_display_name: Optional[str] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
args: Optional[List[Union[str, float, int]]] = None,
replica_count: int = 0,
@@ -2383,6 +2400,9 @@ def run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for the workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
Provide this field if `dataset` is a BigQuery dataset.
The BigQuery project location where the training data is to
@@ -2460,6 +2480,7 @@ def run(
managed_model=managed_model,
args=args,
base_output_dir=base_output_dir,
service_account=service_account,
bigquery_destination=bigquery_destination,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
@@ -2484,6 +2505,7 @@ def _run(
managed_model: Optional[gca_model.Model] = None,
args: Optional[List[Union[str, float, int]]] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
@@ -2514,6 +2536,9 @@ def _run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for the workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
The BigQuery project location where the training data is to
be written to. In the given project a new dataset is created
@@ -2570,7 +2595,7 @@ def _run(
training_task_inputs,
base_output_dir,
) = self._prepare_training_task_inputs_and_output_dir(
worker_pool_specs, base_output_dir
worker_pool_specs, base_output_dir, service_account
)

model = self._run_job(
@@ -3573,6 +3598,7 @@ def run(
annotation_schema_uri: Optional[str] = None,
model_display_name: Optional[str] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
bigquery_destination: Optional[str] = None,
args: Optional[List[Union[str, float, int]]] = None,
replica_count: int = 0,
@@ -3650,6 +3676,9 @@ def run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for the workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
bigquery_destination (str):
Provide this field if `dataset` is a BigQuery dataset.
The BigQuery project location where the training data is to
@@ -3722,6 +3751,7 @@ def run(
managed_model=managed_model,
args=args,
base_output_dir=base_output_dir,
service_account=service_account,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
test_fraction_split=test_fraction_split,
@@ -3746,6 +3776,7 @@ def _run(
managed_model: Optional[gca_model.Model] = None,
args: Optional[List[Union[str, float, int]]] = None,
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
@@ -3777,6 +3808,9 @@ def _run(
base_output_dir (str):
GCS output directory of job. If not provided a
timestamped directory in the staging directory will be used.
service_account (str):
Specifies the service account for the workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
training_fraction_split (float):
The fraction of the input data that is to be
used to train the Model.
@@ -3819,7 +3853,7 @@ def _run(
training_task_inputs,
base_output_dir,
) = self._prepare_training_task_inputs_and_output_dir(
worker_pool_specs, base_output_dir
worker_pool_specs, base_output_dir, service_account
)

model = self._run_job(
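To round out the training_jobs.py changes, a minimal sketch of running a custom training job with the new `service_account` argument (illustrative only, not part of this PR; the project, region, bucket, script path, container images, and service account email are placeholders):

from google.cloud import aiplatform

aiplatform.init(
    project="my-project",                     # placeholder project
    location="us-central1",                   # placeholder region
    staging_bucket="gs://my-staging-bucket",  # placeholder staging bucket
)

job = aiplatform.CustomTrainingJob(
    display_name="train-with-service-account",
    script_path="task.py",  # placeholder training script
    container_uri="gcr.io/cloud-aiplatform/training/tf-cpu.2-4:latest",  # example prebuilt training image
    model_serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-4:latest",  # example serving image
)

# The training workload runs as this service account; the submitting user
# must have act-as (iam.serviceAccounts.actAs) permission on it.
model = job.run(
    replica_count=1,
    machine_type="n1-standard-4",
    service_account="trainer@my-project.iam.gserviceaccount.com",  # placeholder service account
)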