Skip to content

Commit

Permalink
feat: define google.auth.downscoped.Credentials class (#801)
Browse files Browse the repository at this point in the history
* feat: define `google.auth.downscoped.Credentials` class

This is based on [Downscoping with Credential Access Boundaries](https://cloud.google.com/iam/docs/downscoping-short-lived-credentials).
The new credentials are initialized mainly using elevated source
credentials and a `google.auth.downscoped.CredentialAccessBoundary`
instance.
The credentials will then get access tokens from the source
credentials and exchange them via the GCP STS token exchange
endpoint using the provided credentials access boundary rules
for downscoped access tokens.

The new credentials will inherit the source credentials' scopes
but the scopes are not exposed as we cannot always determine the
scopes form the source credentials.

* Fixes typos in comments.

* Addresses review comments.

* Moves all constants in the test file to module scope.
  • Loading branch information
bojeil-google committed Jul 9, 2021
1 parent d3944af commit 2f5c3a6
Show file tree
Hide file tree
Showing 2 changed files with 353 additions and 0 deletions.
86 changes: 86 additions & 0 deletions google/auth/downscoped.py
Expand Up @@ -48,9 +48,24 @@
.. _Downscoping with Credential Access Boundaries: https://cloud.google.com/iam/docs/downscoping-short-lived-credentials
"""

import datetime

from google.auth import _helpers
from google.auth import credentials
from google.oauth2 import sts

# The maximum number of access boundary rules a Credential Access Boundary can
# contain.
_MAX_ACCESS_BOUNDARY_RULES_COUNT = 10
# The token exchange grant_type used for exchanging credentials.
_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
# The token exchange requested_token_type. This is always an access_token.
_STS_REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
# The STS token URL used to exchanged a short lived access token for a downscoped one.
_STS_TOKEN_URL = "https://sts.googleapis.com/v1/token"
# The subject token type to use when exchanging a short lived access token for a
# downscoped token.
_STS_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"


class CredentialAccessBoundary(object):
Expand Down Expand Up @@ -403,3 +418,74 @@ def to_json(self):
if self.description:
json["description"] = self.description
return json


class Credentials(credentials.CredentialsWithQuotaProject):
"""Defines a set of Google credentials that are downscoped from an existing set
of Google OAuth2 credentials. This is useful to restrict the Identity and Access
Management (IAM) permissions that a short-lived credential can use.
The common pattern of usage is to have a token broker with elevated access
generate these downscoped credentials from higher access source credentials and
pass the downscoped short-lived access tokens to a token consumer via some
secure authenticated channel for limited access to Google Cloud Storage
resources.
"""

def __init__(
self, source_credentials, credential_access_boundary, quota_project_id=None
):
"""Instantiates a downscoped credentials object using the provided source
credentials and credential access boundary rules.
To downscope permissions of a source credential, a Credential Access Boundary
that specifies which resources the new credential can access, as well as an
upper bound on the permissions that are available on each resource, has to be
defined. A downscoped credential can then be instantiated using the source
credential and the Credential Access Boundary.
Args:
source_credentials (google.auth.credentials.Credentials): The source credentials
to be downscoped based on the provided Credential Access Boundary rules.
credential_access_boundary (google.auth.downscoped.CredentialAccessBoundary):
The Credential Access Boundary which contains a list of access boundary
rules. Each rule contains information on the resource that the rule applies to,
the upper bound of the permissions that are available on that resource and an
optional condition to further restrict permissions.
quota_project_id (Optional[str]): The optional quota project ID.
Raises:
google.auth.exceptions.RefreshError: If the source credentials
return an error on token refresh.
google.auth.exceptions.OAuthError: If the STS token exchange
endpoint returned an error during downscoped token generation.
"""

super(Credentials, self).__init__()
self._source_credentials = source_credentials
self._credential_access_boundary = credential_access_boundary
self._quota_project_id = quota_project_id
self._sts_client = sts.Client(_STS_TOKEN_URL)

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
# Generate an access token from the source credentials.
self._source_credentials.refresh(request)
now = _helpers.utcnow()
# Exchange the access token for a downscoped access token.
response_data = self._sts_client.exchange_token(
request=request,
grant_type=_STS_GRANT_TYPE,
subject_token=self._source_credentials.token,
subject_token_type=_STS_SUBJECT_TOKEN_TYPE,
requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
additional_options=self._credential_access_boundary.to_json(),
)
self.token = response_data.get("access_token")
lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
self.expiry = now + lifetime

@_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
def with_quota_project(self, quota_project_id):
return self.__class__(
self._source_credentials,
self._credential_access_boundary,
quota_project_id=quota_project_id,
)
267 changes: 267 additions & 0 deletions tests/test_downscoped.py
Expand Up @@ -12,9 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json

import mock
import pytest
from six.moves import http_client
from six.moves import urllib

from google.auth import _helpers
from google.auth import credentials
from google.auth import downscoped
from google.auth import exceptions
from google.auth import transport


EXPRESSION = (
Expand All @@ -36,6 +46,54 @@
)
OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]
QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
SUCCESS_RESPONSE = {
"access_token": "ACCESS_TOKEN",
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
"token_type": "Bearer",
"expires_in": 3600,
}
ERROR_RESPONSE = {
"error": "invalid_grant",
"error_description": "Subject token is invalid.",
"error_uri": "https://tools.ietf.org/html/rfc6749",
}
CREDENTIAL_ACCESS_BOUNDARY_JSON = {
"accessBoundary": {
"accessBoundaryRules": [
{
"availablePermissions": AVAILABLE_PERMISSIONS,
"availableResource": AVAILABLE_RESOURCE,
"availabilityCondition": {
"expression": EXPRESSION,
"title": TITLE,
"description": DESCRIPTION,
},
}
]
}
}


class SourceCredentials(credentials.Credentials):
def __init__(self, raise_error=False):
super(SourceCredentials, self).__init__()
self._counter = 0
self._raise_error = raise_error

def refresh(self, request):
if self._raise_error:
raise exceptions.RefreshError(
"Failed to refresh access token in source credentials."
)
now = _helpers.utcnow()
self._counter += 1
self.token = "ACCESS_TOKEN_{}".format(self._counter)
self.expiry = now + datetime.timedelta(seconds=3600)


def make_availability_condition(expression, title=None, description=None):
Expand Down Expand Up @@ -383,3 +441,212 @@ def test_to_json(self):
]
}
}


class TestCredentials(object):
@staticmethod
def make_credentials(source_credentials=SourceCredentials(), quota_project_id=None):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
rules = [access_boundary_rule]
credential_access_boundary = make_credential_access_boundary(rules)

return downscoped.Credentials(
source_credentials, credential_access_boundary, quota_project_id
)

@staticmethod
def make_mock_request(data, status=http_client.OK):
response = mock.create_autospec(transport.Response, instance=True)
response.status = status
response.data = json.dumps(data).encode("utf-8")

request = mock.create_autospec(transport.Request)
request.return_value = response

return request

@staticmethod
def assert_request_kwargs(request_kwargs, headers, request_data):
"""Asserts the request was called with the expected parameters.
"""
assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT
assert request_kwargs["method"] == "POST"
assert request_kwargs["headers"] == headers
assert request_kwargs["body"] is not None
body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
for (k, v) in body_tuples:
assert v.decode("utf-8") == request_data[k.decode("utf-8")]
assert len(body_tuples) == len(request_data.keys())

def test_default_state(self):
credentials = self.make_credentials()

# No token acquired yet.
assert not credentials.token
assert not credentials.valid
# Expiration hasn't been set yet.
assert not credentials.expiry
assert not credentials.expired
# No quota project ID set.
assert not credentials.quota_project_id

def test_with_quota_project(self):
credentials = self.make_credentials()

assert not credentials.quota_project_id

quota_project_creds = credentials.with_quota_project("project-foo")

assert quota_project_creds.quota_project_id == "project-foo"

@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh(self, unused_utcnow):
response = SUCCESS_RESPONSE.copy()
# Test custom expiration to confirm expiry is set correctly.
response["expires_in"] = 2800
expected_expiry = datetime.datetime.min + datetime.timedelta(
seconds=response["expires_in"]
)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
"grant_type": GRANT_TYPE,
"subject_token": "ACCESS_TOKEN_1",
"subject_token_type": SUBJECT_TOKEN_TYPE,
"requested_token_type": REQUESTED_TOKEN_TYPE,
"options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
}
request = self.make_mock_request(status=http_client.OK, data=response)
source_credentials = SourceCredentials()
credentials = self.make_credentials(source_credentials=source_credentials)

# Spy on calls to source credentials refresh to confirm the expected request
# instance is used.
with mock.patch.object(
source_credentials, "refresh", wraps=source_credentials.refresh
) as wrapped_souce_cred_refresh:
credentials.refresh(request)

self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == response["access_token"]
# Confirm source credentials called with the same request instance.
wrapped_souce_cred_refresh.assert_called_with(request)

def test_refresh_token_exchange_error(self):
request = self.make_mock_request(
status=http_client.BAD_REQUEST, data=ERROR_RESPONSE
)
credentials = self.make_credentials()

with pytest.raises(exceptions.OAuthError) as excinfo:
credentials.refresh(request)

assert excinfo.match(
r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749"
)
assert not credentials.expired
assert credentials.token is None

def test_refresh_source_credentials_refresh_error(self):
# Initialize downscoped credentials with source credentials that raise
# an error on refresh.
credentials = self.make_credentials(
source_credentials=SourceCredentials(raise_error=True)
)

with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(mock.sentinel.request)

assert excinfo.match(r"Failed to refresh access token in source credentials.")
assert not credentials.expired
assert credentials.token is None

def test_apply_without_quota_project_id(self):
headers = {}
request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
credentials = self.make_credentials()

credentials.refresh(request)
credentials.apply(headers)

assert headers == {
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
}

def test_apply_with_quota_project_id(self):
headers = {"other": "header-value"}
request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID)

credentials.refresh(request)
credentials.apply(headers)

assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": QUOTA_PROJECT_ID,
}

def test_before_request(self):
headers = {"other": "header-value"}
request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
credentials = self.make_credentials()

# First call should call refresh, setting the token.
credentials.before_request(request, "POST", "https://example.com/api", headers)

assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
}

# Second call shouldn't call refresh (request should be untouched).
credentials.before_request(
mock.sentinel.request, "POST", "https://example.com/api", headers
)

assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
}

@mock.patch("google.auth._helpers.utcnow")
def test_before_request_expired(self, utcnow):
headers = {}
request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
credentials = self.make_credentials()
credentials.token = "token"
utcnow.return_value = datetime.datetime.min
# Set the expiration to one second more than now plus the clock skew
# accommodation. These credentials should be valid.
credentials.expiry = (
datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
)

assert credentials.valid
assert not credentials.expired

credentials.before_request(request, "POST", "https://example.com/api", headers)

# Cached token should be used.
assert headers == {"authorization": "Bearer token"}

# Next call should simulate 1 second passed.
utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)

assert not credentials.valid
assert credentials.expired

credentials.before_request(request, "POST", "https://example.com/api", headers)

# New token should be retrieved.
assert headers == {
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
}

0 comments on commit 2f5c3a6

Please sign in to comment.