New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: define useful properties on google.auth.external_account.Credentials
#770
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,8 +28,10 @@ | |
""" | ||
|
||
import abc | ||
import copy | ||
import datetime | ||
import json | ||
import re | ||
|
||
import six | ||
|
||
|
@@ -40,6 +42,8 @@ | |
from google.oauth2 import sts | ||
from google.oauth2 import utils | ||
|
||
# External account JSON type identifier. | ||
_EXTERNAL_ACCOUNT_JSON_TYPE = "external_account" | ||
# The token exchange grant_type used for exchanging credentials. | ||
_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange" | ||
# The token exchange requested_token_type. This is always an access_token. | ||
|
@@ -117,6 +121,76 @@ def __init__( | |
self._impersonated_credentials = None | ||
self._project_id = None | ||
|
||
@property | ||
def info(self): | ||
"""Generates the dictionary representation of the current credentials. | ||
|
||
Returns: | ||
Mapping: The dictionary representation of the credentials. This is the | ||
reverse of "from_info" defined on the subclasses of this class. It is | ||
useful for serializing the current credentials so it can deserialized | ||
later. | ||
""" | ||
config_info = { | ||
"type": _EXTERNAL_ACCOUNT_JSON_TYPE, | ||
"audience": self._audience, | ||
"subject_token_type": self._subject_token_type, | ||
"token_url": self._token_url, | ||
"service_account_impersonation_url": self._service_account_impersonation_url, | ||
"credential_source": copy.deepcopy(self._credential_source), | ||
"quota_project_id": self._quota_project_id, | ||
"client_id": self._client_id, | ||
"client_secret": self._client_secret, | ||
} | ||
# Remove None fields in the info dictionary. | ||
for k, v in dict(config_info).items(): | ||
if v is None: | ||
del config_info[k] | ||
|
||
return config_info | ||
|
||
@property | ||
def service_account_email(self): | ||
"""Returns the service account email if service account impersonation is used. | ||
|
||
Returns: | ||
Optional[str]: The service account email if impersonation is used. Otherwise | ||
None is returned. | ||
""" | ||
if self._service_account_impersonation_url: | ||
# Parse email from URL. The formal looks as follows: | ||
# https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/name@project-id.iam.gserviceaccount.com:generateAccessToken | ||
url = self._service_account_impersonation_url | ||
start_index = url.rfind("/") | ||
end_index = url.find(":generateAccessToken") | ||
if start_index != -1 and end_index != -1 and start_index < end_index: | ||
start_index = start_index + 1 | ||
return url[start_index:end_index] | ||
return None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although it is unusual that I would say so, this code would be clearer using a regex: service_account_regex = re.compile(r".*/(?P<email>.*):generateAccessToken")
...
if self._service_account_impersonation_url:
match = service_account_regex.match(self._service_account_impersonation_url)
if match is not None:
return match.group("email") There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would disagree on this one. It is better/more efficient to use string methods when possible over regex. It does require a couple more steps to do so here but I provided an example for the format used. It should make it easier to understand and follow the code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that you used a regex in a very similar case below, except without pulling the value from the group. Efficiency here is likely not germane, and debatable: it would need measuring, if we thought it important; clarity (to the reader) is far more crucial. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I appreciate your input but I think this is a very subjective situation. I would change it but I think this is a coin toss. |
||
|
||
@property | ||
def is_user(self): | ||
"""Returns whether the credentials represent a user (True) or workload (False). | ||
Workloads behave similarly to service accounts. Currently workloads will use | ||
service account impersonation but will eventually not require impersonation. | ||
As a result, this property is more reliable than the service account email | ||
property in determining if the credentials represent a user or workload. | ||
|
||
Returns: | ||
bool: True if the credentials represent a user. False if they represent a | ||
workload. | ||
""" | ||
# If service account impersonation is used, the credentials will always represent a | ||
# service account. | ||
if self._service_account_impersonation_url: | ||
return False | ||
# Workforce pools representing users have the following audience format: | ||
# //iam.googleapis.com/locations/$location/workforcePools/$poolId/providers/$providerId | ||
p = re.compile(r"//iam\.googleapis\.com/locations/[^/]+/workforcePools/") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This regex should likely be compiled at module scope, rather than on each call (yes, I know the |
||
if p.match(self._audience): | ||
return True | ||
return False | ||
|
||
@property | ||
def requires_scopes(self): | ||
"""Checks if the credentials requires scopes. | ||
|
@@ -282,14 +356,8 @@ def _initialize_impersonated_credentials(self): | |
) | ||
|
||
# Determine target_principal. | ||
start_index = self._service_account_impersonation_url.rfind("/") | ||
end_index = self._service_account_impersonation_url.find(":generateAccessToken") | ||
if start_index != -1 and end_index != -1 and start_index < end_index: | ||
start_index = start_index + 1 | ||
target_principal = self._service_account_impersonation_url[ | ||
start_index:end_index | ||
] | ||
else: | ||
target_principal = self.service_account_email | ||
if not target_principal: | ||
raise exceptions.RefreshError( | ||
"Unable to determine target principal from service account impersonation URL." | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, deleting an item from a dictionary while iterating it is unsafe. It would be better to create a new one, e.g.:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I can update this in a follow up.