diff --git a/google/auth/external_account.py b/google/auth/external_account.py index 0429ee08f..e40c6528b 100644 --- a/google/auth/external_account.py +++ b/google/auth/external_account.py @@ -28,8 +28,10 @@ """ import abc +import copy import datetime import json +import re import six @@ -40,6 +42,8 @@ from google.oauth2 import sts from google.oauth2 import utils +# External account JSON type identifier. +_EXTERNAL_ACCOUNT_JSON_TYPE = "external_account" # The token exchange grant_type used for exchanging credentials. _STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange" # The token exchange requested_token_type. This is always an access_token. @@ -117,6 +121,76 @@ def __init__( self._impersonated_credentials = None self._project_id = None + @property + def info(self): + """Generates the dictionary representation of the current credentials. + + Returns: + Mapping: The dictionary representation of the credentials. This is the + reverse of "from_info" defined on the subclasses of this class. It is + useful for serializing the current credentials so it can deserialized + later. + """ + config_info = { + "type": _EXTERNAL_ACCOUNT_JSON_TYPE, + "audience": self._audience, + "subject_token_type": self._subject_token_type, + "token_url": self._token_url, + "service_account_impersonation_url": self._service_account_impersonation_url, + "credential_source": copy.deepcopy(self._credential_source), + "quota_project_id": self._quota_project_id, + "client_id": self._client_id, + "client_secret": self._client_secret, + } + # Remove None fields in the info dictionary. + for k, v in dict(config_info).items(): + if v is None: + del config_info[k] + + return config_info + + @property + def service_account_email(self): + """Returns the service account email if service account impersonation is used. + + Returns: + Optional[str]: The service account email if impersonation is used. Otherwise + None is returned. + """ + if self._service_account_impersonation_url: + # Parse email from URL. The formal looks as follows: + # https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/name@project-id.iam.gserviceaccount.com:generateAccessToken + url = self._service_account_impersonation_url + start_index = url.rfind("/") + end_index = url.find(":generateAccessToken") + if start_index != -1 and end_index != -1 and start_index < end_index: + start_index = start_index + 1 + return url[start_index:end_index] + return None + + @property + def is_user(self): + """Returns whether the credentials represent a user (True) or workload (False). + Workloads behave similarly to service accounts. Currently workloads will use + service account impersonation but will eventually not require impersonation. + As a result, this property is more reliable than the service account email + property in determining if the credentials represent a user or workload. + + Returns: + bool: True if the credentials represent a user. False if they represent a + workload. + """ + # If service account impersonation is used, the credentials will always represent a + # service account. + if self._service_account_impersonation_url: + return False + # Workforce pools representing users have the following audience format: + # //iam.googleapis.com/locations/$location/workforcePools/$poolId/providers/$providerId + p = re.compile(r"//iam\.googleapis\.com/locations/[^/]+/workforcePools/") + if p.match(self._audience): + return True + return False + @property def requires_scopes(self): """Checks if the credentials requires scopes. @@ -282,14 +356,8 @@ def _initialize_impersonated_credentials(self): ) # Determine target_principal. - start_index = self._service_account_impersonation_url.rfind("/") - end_index = self._service_account_impersonation_url.find(":generateAccessToken") - if start_index != -1 and end_index != -1 and start_index < end_index: - start_index = start_index + 1 - target_principal = self._service_account_impersonation_url[ - start_index:end_index - ] - else: + target_principal = self.service_account_email + if not target_principal: raise exceptions.RefreshError( "Unable to determine target principal from service account impersonation URL." ) diff --git a/tests/test_aws.py b/tests/test_aws.py index 7c7ee36be..9ca08d5b2 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -919,6 +919,19 @@ def test_constructor_invalid_environment_id_version(self): assert excinfo.match(r"aws version '3' is not supported in the current build.") + def test_info(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE.copy() + ) + + assert credentials.info == { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE, + } + def test_retrieve_subject_token_missing_region_url(self): # When AWS_REGION envvar is not available, region_url is required for # determining the current AWS region. diff --git a/tests/test_external_account.py b/tests/test_external_account.py index 8f8d98009..7390fb980 100644 --- a/tests/test_external_account.py +++ b/tests/test_external_account.py @@ -31,6 +31,12 @@ # Base64 encoding of "username:password" BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" +# List of valid workforce pool audiences. +TEST_USER_AUDIENCES = [ + "//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id", + "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id", + "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id", +] class CredentialsImpl(external_account.Credentials): @@ -342,6 +348,116 @@ def test_with_invalid_impersonation_target_principal(self): r"Unable to determine target principal from service account impersonation URL." ) + def test_info(self): + credentials = self.make_credentials() + + assert credentials.info == { + "type": "external_account", + "audience": self.AUDIENCE, + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + "token_url": self.TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE.copy(), + } + + def test_info_with_full_options(self): + credentials = self.make_credentials( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + quota_project_id=self.QUOTA_PROJECT_ID, + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, + ) + + assert credentials.info == { + "type": "external_account", + "audience": self.AUDIENCE, + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + "token_url": self.TOKEN_URL, + "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL, + "credential_source": self.CREDENTIAL_SOURCE.copy(), + "quota_project_id": self.QUOTA_PROJECT_ID, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + } + + def test_service_account_email_without_impersonation(self): + credentials = self.make_credentials() + + assert credentials.service_account_email is None + + def test_service_account_email_with_impersonation(self): + credentials = self.make_credentials( + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL + ) + + assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL + + @pytest.mark.parametrize( + "audience", + # Workload identity pool audiences or invalid workforce pool audiences. + [ + # Legacy K8s audience format. + "identitynamespace:1f12345:my_provider", + ( + "//iam.googleapis.com/projects/123456/locations/" + "global/workloadIdentityPools/pool-id/providers/" + "provider-id" + ), + ( + "//iam.googleapis.com/projects/123456/locations/" + "eu/workloadIdentityPools/pool-id/providers/" + "provider-id" + ), + # Pool ID with workforcePools string. + ( + "//iam.googleapis.com/projects/123456/locations/" + "global/workloadIdentityPools/workforcePools/providers/" + "provider-id" + ), + # Unrealistic / incorrect workforce pool audiences. + "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id", + "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id", + "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id", + "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id", + "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id", + ], + ) + def test_is_user_with_non_users(self, audience): + credentials = CredentialsImpl( + audience=audience, + subject_token_type=self.SUBJECT_TOKEN_TYPE, + token_url=self.TOKEN_URL, + credential_source=self.CREDENTIAL_SOURCE, + ) + + assert credentials.is_user is False + + @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES) + def test_is_user_with_users(self, audience): + credentials = CredentialsImpl( + audience=audience, + subject_token_type=self.SUBJECT_TOKEN_TYPE, + token_url=self.TOKEN_URL, + credential_source=self.CREDENTIAL_SOURCE, + ) + + assert credentials.is_user is True + + @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES) + def test_is_user_with_users_and_impersonation(self, audience): + # Initialize the credentials with service account impersonation. + credentials = CredentialsImpl( + audience=audience, + subject_token_type=self.SUBJECT_TOKEN_TYPE, + token_url=self.TOKEN_URL, + credential_source=self.CREDENTIAL_SOURCE, + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, + ) + + # Even though the audience is for a workforce pool, since service account + # impersonation is used, the credentials will represent a service account and + # not a user. + assert credentials.is_user is False + @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) def test_refresh_without_client_auth_success(self, unused_utcnow): response = self.SUCCESS_RESPONSE.copy() diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py index 90a0e2549..b529268fb 100644 --- a/tests/test_identity_pool.py +++ b/tests/test_identity_pool.py @@ -430,6 +430,32 @@ def test_constructor_missing_subject_token_field_name(self): r"Missing subject_token_field_name for JSON credential_source format" ) + def test_info_with_file_credential_source(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy() + ) + + assert credentials.info == { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, + } + + def test_info_with_url_credential_source(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy() + ) + + assert credentials.info == { + "type": "external_account", + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_JSON_URL, + } + def test_retrieve_subject_token_missing_subject_token(self, tmpdir): # Provide empty text file. empty_file = tmpdir.join("empty.txt")