Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: define useful properties on google.auth.external_account.Credentials #770

Merged
merged 2 commits into from Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 76 additions & 8 deletions google/auth/external_account.py
Expand Up @@ -28,8 +28,10 @@
"""

import abc
import copy
import datetime
import json
import re

import six

Expand All @@ -40,6 +42,8 @@
from google.oauth2 import sts
from google.oauth2 import utils

# External account JSON type identifier.
_EXTERNAL_ACCOUNT_JSON_TYPE = "external_account"
# The token exchange grant_type used for exchanging credentials.
_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
# The token exchange requested_token_type. This is always an access_token.
Expand Down Expand Up @@ -117,6 +121,76 @@ def __init__(
self._impersonated_credentials = None
self._project_id = None

@property
def info(self):
"""Generates the dictionary representation of the current credentials.

Returns:
Mapping: The dictionary representation of the credentials. This is the
reverse of "from_info" defined on the subclasses of this class. It is
useful for serializing the current credentials so it can deserialized
later.
"""
config_info = {
"type": _EXTERNAL_ACCOUNT_JSON_TYPE,
"audience": self._audience,
"subject_token_type": self._subject_token_type,
"token_url": self._token_url,
"service_account_impersonation_url": self._service_account_impersonation_url,
"credential_source": copy.deepcopy(self._credential_source),
"quota_project_id": self._quota_project_id,
"client_id": self._client_id,
"client_secret": self._client_secret,
}
# Remove None fields in the info dictionary.
for k, v in dict(config_info).items():
if v is None:
del config_info[k]
Copy link
Contributor

@tseaver tseaver Jun 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, deleting an item from a dictionary while iterating it is unsafe. It would be better to create a new one, e.g.:

return {key: value for key, value in config_info.items() if value is not None}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I can update this in a follow up.


return config_info

@property
def service_account_email(self):
"""Returns the service account email if service account impersonation is used.

Returns:
Optional[str]: The service account email if impersonation is used. Otherwise
None is returned.
"""
if self._service_account_impersonation_url:
# Parse email from URL. The formal looks as follows:
# https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/name@project-id.iam.gserviceaccount.com:generateAccessToken
url = self._service_account_impersonation_url
start_index = url.rfind("/")
end_index = url.find(":generateAccessToken")
if start_index != -1 and end_index != -1 and start_index < end_index:
start_index = start_index + 1
return url[start_index:end_index]
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it is unusual that I would say so, this code would be clearer using a regex:

service_account_regex = re.compile(r".*/(?P<email>.*):generateAccessToken")
...
        if self._service_account_impersonation_url:
            match = service_account_regex.match(self._service_account_impersonation_url)
            if match is not None:
                return match.group("email")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would disagree on this one. It is better/more efficient to use string methods when possible over regex. It does require a couple more steps to do so here but I provided an example for the format used. It should make it easier to understand and follow the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that you used a regex in a very similar case below, except without pulling the value from the group. Efficiency here is likely not germane, and debatable: it would need measuring, if we thought it important; clarity (to the reader) is far more crucial.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate your input but I think this is a very subjective situation. I would change it but I think this is a coin toss.
The other pattern I am checking for below would be much harder to achieve without a regex. In this specific case, the implementation is simpler to read and follow, especially when paired with an example.


@property
def is_user(self):
"""Returns whether the credentials represent a user (True) or workload (False).
Workloads behave similarly to service accounts. Currently workloads will use
service account impersonation but will eventually not require impersonation.
As a result, this property is more reliable than the service account email
property in determining if the credentials represent a user or workload.

Returns:
bool: True if the credentials represent a user. False if they represent a
workload.
"""
# If service account impersonation is used, the credentials will always represent a
# service account.
if self._service_account_impersonation_url:
return False
# Workforce pools representing users have the following audience format:
# //iam.googleapis.com/locations/$location/workforcePools/$poolId/providers/$providerId
p = re.compile(r"//iam\.googleapis\.com/locations/[^/]+/workforcePools/")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This regex should likely be compiled at module scope, rather than on each call (yes, I know the re module interns some number of recent compilations).

if p.match(self._audience):
return True
return False

@property
def requires_scopes(self):
"""Checks if the credentials requires scopes.
Expand Down Expand Up @@ -282,14 +356,8 @@ def _initialize_impersonated_credentials(self):
)

# Determine target_principal.
start_index = self._service_account_impersonation_url.rfind("/")
end_index = self._service_account_impersonation_url.find(":generateAccessToken")
if start_index != -1 and end_index != -1 and start_index < end_index:
start_index = start_index + 1
target_principal = self._service_account_impersonation_url[
start_index:end_index
]
else:
target_principal = self.service_account_email
if not target_principal:
raise exceptions.RefreshError(
"Unable to determine target principal from service account impersonation URL."
)
Expand Down
13 changes: 13 additions & 0 deletions tests/test_aws.py
Expand Up @@ -919,6 +919,19 @@ def test_constructor_invalid_environment_id_version(self):

assert excinfo.match(r"aws version '3' is not supported in the current build.")

def test_info(self):
credentials = self.make_credentials(
credential_source=self.CREDENTIAL_SOURCE.copy()
)

assert credentials.info == {
"type": "external_account",
"audience": AUDIENCE,
"subject_token_type": SUBJECT_TOKEN_TYPE,
"token_url": TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE,
}

def test_retrieve_subject_token_missing_region_url(self):
# When AWS_REGION envvar is not available, region_url is required for
# determining the current AWS region.
Expand Down
116 changes: 116 additions & 0 deletions tests/test_external_account.py
Expand Up @@ -31,6 +31,12 @@
# Base64 encoding of "username:password"
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
# List of valid workforce pool audiences.
TEST_USER_AUDIENCES = [
"//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
]


class CredentialsImpl(external_account.Credentials):
Expand Down Expand Up @@ -342,6 +348,116 @@ def test_with_invalid_impersonation_target_principal(self):
r"Unable to determine target principal from service account impersonation URL."
)

def test_info(self):
credentials = self.make_credentials()

assert credentials.info == {
"type": "external_account",
"audience": self.AUDIENCE,
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"token_url": self.TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE.copy(),
}

def test_info_with_full_options(self):
credentials = self.make_credentials(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
quota_project_id=self.QUOTA_PROJECT_ID,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
)

assert credentials.info == {
"type": "external_account",
"audience": self.AUDIENCE,
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"token_url": self.TOKEN_URL,
"service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
"credential_source": self.CREDENTIAL_SOURCE.copy(),
"quota_project_id": self.QUOTA_PROJECT_ID,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}

def test_service_account_email_without_impersonation(self):
credentials = self.make_credentials()

assert credentials.service_account_email is None

def test_service_account_email_with_impersonation(self):
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
)

assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL

@pytest.mark.parametrize(
"audience",
# Workload identity pool audiences or invalid workforce pool audiences.
[
# Legacy K8s audience format.
"identitynamespace:1f12345:my_provider",
(
"//iam.googleapis.com/projects/123456/locations/"
"global/workloadIdentityPools/pool-id/providers/"
"provider-id"
),
(
"//iam.googleapis.com/projects/123456/locations/"
"eu/workloadIdentityPools/pool-id/providers/"
"provider-id"
),
# Pool ID with workforcePools string.
(
"//iam.googleapis.com/projects/123456/locations/"
"global/workloadIdentityPools/workforcePools/providers/"
"provider-id"
),
# Unrealistic / incorrect workforce pool audiences.
"//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
"//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
"//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
],
)
def test_is_user_with_non_users(self, audience):
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)

assert credentials.is_user is False

@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
def test_is_user_with_users(self, audience):
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)

assert credentials.is_user is True

@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
def test_is_user_with_users_and_impersonation(self, audience):
# Initialize the credentials with service account impersonation.
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
)

# Even though the audience is for a workforce pool, since service account
# impersonation is used, the credentials will represent a service account and
# not a user.
assert credentials.is_user is False

@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_without_client_auth_success(self, unused_utcnow):
response = self.SUCCESS_RESPONSE.copy()
Expand Down
26 changes: 26 additions & 0 deletions tests/test_identity_pool.py
Expand Up @@ -430,6 +430,32 @@ def test_constructor_missing_subject_token_field_name(self):
r"Missing subject_token_field_name for JSON credential_source format"
)

def test_info_with_file_credential_source(self):
credentials = self.make_credentials(
credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy()
)

assert credentials.info == {
"type": "external_account",
"audience": AUDIENCE,
"subject_token_type": SUBJECT_TOKEN_TYPE,
"token_url": TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
}

def test_info_with_url_credential_source(self):
credentials = self.make_credentials(
credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy()
)

assert credentials.info == {
"type": "external_account",
"audience": AUDIENCE,
"subject_token_type": SUBJECT_TOKEN_TYPE,
"token_url": TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
}

def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
# Provide empty text file.
empty_file = tmpdir.join("empty.txt")
Expand Down