Skip to content

Commit

Permalink
feat: define useful properties on `google.auth.external_account.Crede…
Browse files Browse the repository at this point in the history
…ntials` (#770)

This includes the following properties:

- `info`: This is the reverse of `from_info` defined on subclasses and useful to
  serialize external account credentials.
- `service_account_email`: This is the corresponding service account email if impersonation is used.
- `is_user`: This is `False` for workload identity pools and `True` for workforce pools (not yet supported).
  This can be mainly determined from the STS audience.

While the properties will primarily facilitate integration with gcloud, they are publicly useful for other contexts.
  • Loading branch information
bojeil-google committed Jun 9, 2021
1 parent 458f40b commit f97499c
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 8 deletions.
84 changes: 76 additions & 8 deletions google/auth/external_account.py
Expand Up @@ -28,8 +28,10 @@
"""

import abc
import copy
import datetime
import json
import re

import six

Expand All @@ -40,6 +42,8 @@
from google.oauth2 import sts
from google.oauth2 import utils

# External account JSON type identifier.
_EXTERNAL_ACCOUNT_JSON_TYPE = "external_account"
# The token exchange grant_type used for exchanging credentials.
_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
# The token exchange requested_token_type. This is always an access_token.
Expand Down Expand Up @@ -117,6 +121,76 @@ def __init__(
self._impersonated_credentials = None
self._project_id = None

@property
def info(self):
"""Generates the dictionary representation of the current credentials.
Returns:
Mapping: The dictionary representation of the credentials. This is the
reverse of "from_info" defined on the subclasses of this class. It is
useful for serializing the current credentials so it can deserialized
later.
"""
config_info = {
"type": _EXTERNAL_ACCOUNT_JSON_TYPE,
"audience": self._audience,
"subject_token_type": self._subject_token_type,
"token_url": self._token_url,
"service_account_impersonation_url": self._service_account_impersonation_url,
"credential_source": copy.deepcopy(self._credential_source),
"quota_project_id": self._quota_project_id,
"client_id": self._client_id,
"client_secret": self._client_secret,
}
# Remove None fields in the info dictionary.
for k, v in dict(config_info).items():
if v is None:
del config_info[k]

return config_info

@property
def service_account_email(self):
"""Returns the service account email if service account impersonation is used.
Returns:
Optional[str]: The service account email if impersonation is used. Otherwise
None is returned.
"""
if self._service_account_impersonation_url:
# Parse email from URL. The formal looks as follows:
# https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/name@project-id.iam.gserviceaccount.com:generateAccessToken
url = self._service_account_impersonation_url
start_index = url.rfind("/")
end_index = url.find(":generateAccessToken")
if start_index != -1 and end_index != -1 and start_index < end_index:
start_index = start_index + 1
return url[start_index:end_index]
return None

@property
def is_user(self):
"""Returns whether the credentials represent a user (True) or workload (False).
Workloads behave similarly to service accounts. Currently workloads will use
service account impersonation but will eventually not require impersonation.
As a result, this property is more reliable than the service account email
property in determining if the credentials represent a user or workload.
Returns:
bool: True if the credentials represent a user. False if they represent a
workload.
"""
# If service account impersonation is used, the credentials will always represent a
# service account.
if self._service_account_impersonation_url:
return False
# Workforce pools representing users have the following audience format:
# //iam.googleapis.com/locations/$location/workforcePools/$poolId/providers/$providerId
p = re.compile(r"//iam\.googleapis\.com/locations/[^/]+/workforcePools/")
if p.match(self._audience):
return True
return False

@property
def requires_scopes(self):
"""Checks if the credentials requires scopes.
Expand Down Expand Up @@ -282,14 +356,8 @@ def _initialize_impersonated_credentials(self):
)

# Determine target_principal.
start_index = self._service_account_impersonation_url.rfind("/")
end_index = self._service_account_impersonation_url.find(":generateAccessToken")
if start_index != -1 and end_index != -1 and start_index < end_index:
start_index = start_index + 1
target_principal = self._service_account_impersonation_url[
start_index:end_index
]
else:
target_principal = self.service_account_email
if not target_principal:
raise exceptions.RefreshError(
"Unable to determine target principal from service account impersonation URL."
)
Expand Down
13 changes: 13 additions & 0 deletions tests/test_aws.py
Expand Up @@ -919,6 +919,19 @@ def test_constructor_invalid_environment_id_version(self):

assert excinfo.match(r"aws version '3' is not supported in the current build.")

def test_info(self):
credentials = self.make_credentials(
credential_source=self.CREDENTIAL_SOURCE.copy()
)

assert credentials.info == {
"type": "external_account",
"audience": AUDIENCE,
"subject_token_type": SUBJECT_TOKEN_TYPE,
"token_url": TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE,
}

def test_retrieve_subject_token_missing_region_url(self):
# When AWS_REGION envvar is not available, region_url is required for
# determining the current AWS region.
Expand Down
116 changes: 116 additions & 0 deletions tests/test_external_account.py
Expand Up @@ -31,6 +31,12 @@
# Base64 encoding of "username:password"
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
# List of valid workforce pool audiences.
TEST_USER_AUDIENCES = [
"//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
]


class CredentialsImpl(external_account.Credentials):
Expand Down Expand Up @@ -342,6 +348,116 @@ def test_with_invalid_impersonation_target_principal(self):
r"Unable to determine target principal from service account impersonation URL."
)

def test_info(self):
credentials = self.make_credentials()

assert credentials.info == {
"type": "external_account",
"audience": self.AUDIENCE,
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"token_url": self.TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE.copy(),
}

def test_info_with_full_options(self):
credentials = self.make_credentials(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
quota_project_id=self.QUOTA_PROJECT_ID,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
)

assert credentials.info == {
"type": "external_account",
"audience": self.AUDIENCE,
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"token_url": self.TOKEN_URL,
"service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
"credential_source": self.CREDENTIAL_SOURCE.copy(),
"quota_project_id": self.QUOTA_PROJECT_ID,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}

def test_service_account_email_without_impersonation(self):
credentials = self.make_credentials()

assert credentials.service_account_email is None

def test_service_account_email_with_impersonation(self):
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
)

assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL

@pytest.mark.parametrize(
"audience",
# Workload identity pool audiences or invalid workforce pool audiences.
[
# Legacy K8s audience format.
"identitynamespace:1f12345:my_provider",
(
"//iam.googleapis.com/projects/123456/locations/"
"global/workloadIdentityPools/pool-id/providers/"
"provider-id"
),
(
"//iam.googleapis.com/projects/123456/locations/"
"eu/workloadIdentityPools/pool-id/providers/"
"provider-id"
),
# Pool ID with workforcePools string.
(
"//iam.googleapis.com/projects/123456/locations/"
"global/workloadIdentityPools/workforcePools/providers/"
"provider-id"
),
# Unrealistic / incorrect workforce pool audiences.
"//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
"//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
"//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
],
)
def test_is_user_with_non_users(self, audience):
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)

assert credentials.is_user is False

@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
def test_is_user_with_users(self, audience):
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)

assert credentials.is_user is True

@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
def test_is_user_with_users_and_impersonation(self, audience):
# Initialize the credentials with service account impersonation.
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
)

# Even though the audience is for a workforce pool, since service account
# impersonation is used, the credentials will represent a service account and
# not a user.
assert credentials.is_user is False

@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_without_client_auth_success(self, unused_utcnow):
response = self.SUCCESS_RESPONSE.copy()
Expand Down
26 changes: 26 additions & 0 deletions tests/test_identity_pool.py
Expand Up @@ -430,6 +430,32 @@ def test_constructor_missing_subject_token_field_name(self):
r"Missing subject_token_field_name for JSON credential_source format"
)

def test_info_with_file_credential_source(self):
credentials = self.make_credentials(
credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy()
)

assert credentials.info == {
"type": "external_account",
"audience": AUDIENCE,
"subject_token_type": SUBJECT_TOKEN_TYPE,
"token_url": TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
}

def test_info_with_url_credential_source(self):
credentials = self.make_credentials(
credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy()
)

assert credentials.info == {
"type": "external_account",
"audience": AUDIENCE,
"subject_token_type": SUBJECT_TOKEN_TYPE,
"token_url": TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
}

def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
# Provide empty text file.
empty_file = tmpdir.join("empty.txt")
Expand Down

0 comments on commit f97499c

Please sign in to comment.