From d883921ae8fdc92b2c2cf1b3a5cd389e1287eb60 Mon Sep 17 00:00:00 2001 From: bojeil-google Date: Thu, 8 Jul 2021 10:56:22 -0700 Subject: [PATCH] feat: define `CredentialAccessBoundary` classes (#793) Defines the following classes: - `google.auth.downscoped.CredentialAccessBoundary` - `google.auth.downscoped.AccessBoundaryRule` - `google.auth.downscoped.AvailabilityCondition` This is based on [Downscoping with Credential Access Boundaries](https://cloud.google.com/iam/docs/downscoping-short-lived-credentials). These classes help define the list of access boundary rules, each of which contains information on the resource that the rule applies to, the upper bound of the permissions that are available on that resource and an optional condition to further restrict permissions. --- google/auth/downscoped.py | 405 ++++++++++++++++++++++++++++++++++++++ tests/test_downscoped.py | 385 ++++++++++++++++++++++++++++++++++++ 2 files changed, 790 insertions(+) create mode 100644 google/auth/downscoped.py create mode 100644 tests/test_downscoped.py diff --git a/google/auth/downscoped.py b/google/auth/downscoped.py new file mode 100644 index 000000000..beea50ec7 --- /dev/null +++ b/google/auth/downscoped.py @@ -0,0 +1,405 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Downscoping with Credential Access Boundaries + +This module provides the ability to downscope credentials using +`Downscoping with Credential Access Boundaries`_. This is useful to restrict the +Identity and Access Management (IAM) permissions that a short-lived credential +can use. + +To downscope permissions of a source credential, a Credential Access Boundary +that specifies which resources the new credential can access, as well as +an upper bound on the permissions that are available on each resource, has to +be defined. A downscoped credential can then be instantiated using the source +credential and the Credential Access Boundary. + +The common pattern of usage is to have a token broker with elevated access +generate these downscoped credentials from higher access source credentials and +pass the downscoped short-lived access tokens to a token consumer via some +secure authenticated channel for limited access to Google Cloud Storage +resources. + +For example, a token broker can be set up on a server in a private network. +Various workloads (token consumers) in the same network will send authenticated +requests to that broker for downscoped tokens to access or modify specific google +cloud storage buckets. + +The broker will instantiate downscoped credentials instances that can be used to +generate short lived downscoped access tokens that can be passed to the token +consumer. These downscoped access tokens can be injected by the consumer into +google.oauth2.Credentials and used to initialize a storage client instance to +access Google Cloud Storage resources with restricted access. + +Note: Only Cloud Storage supports Credential Access Boundaries. Other Google +Cloud services do not support this feature. + +.. _Downscoping with Credential Access Boundaries: https://cloud.google.com/iam/docs/downscoping-short-lived-credentials +""" + +# The maximum number of access boundary rules a Credential Access Boundary can +# contain. +_MAX_ACCESS_BOUNDARY_RULES_COUNT = 10 + + +class CredentialAccessBoundary(object): + """Defines a Credential Access Boundary which contains a list of access boundary + rules. Each rule contains information on the resource that the rule applies to, + the upper bound of the permissions that are available on that resource and an + optional condition to further restrict permissions. + """ + + def __init__(self, rules=[]): + """Instantiates a Credential Access Boundary. A Credential Access Boundary + can contain up to 10 access boundary rules. + + Args: + rules (Sequence[google.auth.downscoped.AccessBoundaryRule]): The list of + access boundary rules limiting the access that a downscoped credential + will have. + Raises: + TypeError: If any of the rules are not a valid type. + ValueError: If the provided rules exceed the maximum allowed. + """ + self.rules = rules + + @property + def rules(self): + """Returns the list of access boundary rules defined on the Credential + Access Boundary. + + Returns: + Tuple[google.auth.downscoped.AccessBoundaryRule, ...]: The list of access + boundary rules defined on the Credential Access Boundary. These are returned + as an immutable tuple to prevent modification. + """ + return tuple(self._rules) + + @rules.setter + def rules(self, value): + """Updates the current rules on the Credential Access Boundary. This will overwrite + the existing set of rules. + + Args: + value (Sequence[google.auth.downscoped.AccessBoundaryRule]): The list of + access boundary rules limiting the access that a downscoped credential + will have. + Raises: + TypeError: If any of the rules are not a valid type. + ValueError: If the provided rules exceed the maximum allowed. + """ + if len(value) > _MAX_ACCESS_BOUNDARY_RULES_COUNT: + raise ValueError( + "Credential access boundary rules can have a maximum of {} rules.".format( + _MAX_ACCESS_BOUNDARY_RULES_COUNT + ) + ) + for access_boundary_rule in value: + if not isinstance(access_boundary_rule, AccessBoundaryRule): + raise TypeError( + "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." + ) + # Make a copy of the original list. + self._rules = list(value) + + def add_rule(self, rule): + """Adds a single access boundary rule to the existing rules. + + Args: + rule (google.auth.downscoped.AccessBoundaryRule): The access boundary rule, + limiting the access that a downscoped credential will have, to be added to + the existing rules. + Raises: + TypeError: If any of the rules are not a valid type. + ValueError: If the provided rules exceed the maximum allowed. + """ + if len(self.rules) == _MAX_ACCESS_BOUNDARY_RULES_COUNT: + raise ValueError( + "Credential access boundary rules can have a maximum of {} rules.".format( + _MAX_ACCESS_BOUNDARY_RULES_COUNT + ) + ) + if not isinstance(rule, AccessBoundaryRule): + raise TypeError( + "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." + ) + self._rules.append(rule) + + def to_json(self): + """Generates the dictionary representation of the Credential Access Boundary. + This uses the format expected by the Security Token Service API as documented in + `Defining a Credential Access Boundary`_. + + .. _Defining a Credential Access Boundary: + https://cloud.google.com/iam/docs/downscoping-short-lived-credentials#define-boundary + + Returns: + Mapping: Credential Access Boundary Rule represented in a dictionary object. + """ + rules = [] + for access_boundary_rule in self.rules: + rules.append(access_boundary_rule.to_json()) + + return {"accessBoundary": {"accessBoundaryRules": rules}} + + +class AccessBoundaryRule(object): + """Defines an access boundary rule which contains information on the resource that + the rule applies to, the upper bound of the permissions that are available on that + resource and an optional condition to further restrict permissions. + """ + + def __init__( + self, available_resource, available_permissions, availability_condition=None + ): + """Instantiates a single access boundary rule. + + Args: + available_resource (str): The full resource name of the Cloud Storage bucket + that the rule applies to. Use the format + "//storage.googleapis.com/projects/_/buckets/bucket-name". + available_permissions (Sequence[str]): A list defining the upper bound that + the downscoped token will have on the available permissions for the + resource. Each value is the identifier for an IAM predefined role or + custom role, with the prefix "inRole:". For example: + "inRole:roles/storage.objectViewer". + Only the permissions in these roles will be available. + availability_condition (Optional[google.auth.downscoped.AvailabilityCondition]): + Optional condition that restricts the availability of permissions to + specific Cloud Storage objects. + + Raises: + TypeError: If any of the parameters are not of the expected types. + ValueError: If any of the parameters are not of the expected values. + """ + self.available_resource = available_resource + self.available_permissions = available_permissions + self.availability_condition = availability_condition + + @property + def available_resource(self): + """Returns the current available resource. + + Returns: + str: The current available resource. + """ + return self._available_resource + + @available_resource.setter + def available_resource(self, value): + """Updates the current available resource. + + Args: + value (str): The updated value of the available resource. + + Raises: + TypeError: If the value is not a string. + """ + if not isinstance(value, str): + raise TypeError("The provided available_resource is not a string.") + self._available_resource = value + + @property + def available_permissions(self): + """Returns the current available permissions. + + Returns: + Tuple[str, ...]: The current available permissions. These are returned + as an immutable tuple to prevent modification. + """ + return tuple(self._available_permissions) + + @available_permissions.setter + def available_permissions(self, value): + """Updates the current available permissions. + + Args: + value (Sequence[str]): The updated value of the available permissions. + + Raises: + TypeError: If the value is not a list of strings. + ValueError: If the value is not valid. + """ + for available_permission in value: + if not isinstance(available_permission, str): + raise TypeError( + "Provided available_permissions are not a list of strings." + ) + if available_permission.find("inRole:") != 0: + raise ValueError( + "available_permissions must be prefixed with 'inRole:'." + ) + # Make a copy of the original list. + self._available_permissions = list(value) + + @property + def availability_condition(self): + """Returns the current availability condition. + + Returns: + Optional[google.auth.downscoped.AvailabilityCondition]: The current + availability condition. + """ + return self._availability_condition + + @availability_condition.setter + def availability_condition(self, value): + """Updates the current availability condition. + + Args: + value (Optional[google.auth.downscoped.AvailabilityCondition]): The updated + value of the availability condition. + + Raises: + TypeError: If the value is not of type google.auth.downscoped.AvailabilityCondition + or None. + """ + if not isinstance(value, AvailabilityCondition) and value is not None: + raise TypeError( + "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None." + ) + self._availability_condition = value + + def to_json(self): + """Generates the dictionary representation of the access boundary rule. + This uses the format expected by the Security Token Service API as documented in + `Defining a Credential Access Boundary`_. + + .. _Defining a Credential Access Boundary: + https://cloud.google.com/iam/docs/downscoping-short-lived-credentials#define-boundary + + Returns: + Mapping: The access boundary rule represented in a dictionary object. + """ + json = { + "availablePermissions": list(self.available_permissions), + "availableResource": self.available_resource, + } + if self.availability_condition: + json["availabilityCondition"] = self.availability_condition.to_json() + return json + + +class AvailabilityCondition(object): + """An optional condition that can be used as part of a Credential Access Boundary + to further restrict permissions.""" + + def __init__(self, expression, title=None, description=None): + """Instantiates an availability condition using the provided expression and + optional title or description. + + Args: + expression (str): A condition expression that specifies the Cloud Storage + objects where permissions are available. For example, this expression + makes permissions available for objects whose name starts with "customer-a": + "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')" + title (Optional[str]): An optional short string that identifies the purpose of + the condition. + description (Optional[str]): Optional details about the purpose of the condition. + + Raises: + TypeError: If any of the parameters are not of the expected types. + ValueError: If any of the parameters are not of the expected values. + """ + self.expression = expression + self.title = title + self.description = description + + @property + def expression(self): + """Returns the current condition expression. + + Returns: + str: The current conditon expression. + """ + return self._expression + + @expression.setter + def expression(self, value): + """Updates the current condition expression. + + Args: + value (str): The updated value of the condition expression. + + Raises: + TypeError: If the value is not of type string. + """ + if not isinstance(value, str): + raise TypeError("The provided expression is not a string.") + self._expression = value + + @property + def title(self): + """Returns the current title. + + Returns: + Optional[str]: The current title. + """ + return self._title + + @title.setter + def title(self, value): + """Updates the current title. + + Args: + value (Optional[str]): The updated value of the title. + + Raises: + TypeError: If the value is not of type string or None. + """ + if not isinstance(value, str) and value is not None: + raise TypeError("The provided title is not a string or None.") + self._title = value + + @property + def description(self): + """Returns the current description. + + Returns: + Optional[str]: The current description. + """ + return self._description + + @description.setter + def description(self, value): + """Updates the current description. + + Args: + value (Optional[str]): The updated value of the description. + + Raises: + TypeError: If the value is not of type string or None. + """ + if not isinstance(value, str) and value is not None: + raise TypeError("The provided description is not a string or None.") + self._description = value + + def to_json(self): + """Generates the dictionary representation of the availability condition. + This uses the format expected by the Security Token Service API as documented in + `Defining a Credential Access Boundary`_. + + .. _Defining a Credential Access Boundary: + https://cloud.google.com/iam/docs/downscoping-short-lived-credentials#define-boundary + + Returns: + Mapping[str, str]: The availability condition represented in a dictionary + object. + """ + json = {"expression": self.expression} + if self.title: + json["title"] = self.title + if self.description: + json["description"] = self.description + return json diff --git a/tests/test_downscoped.py b/tests/test_downscoped.py new file mode 100644 index 000000000..61b1d1876 --- /dev/null +++ b/tests/test_downscoped.py @@ -0,0 +1,385 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from google.auth import downscoped + + +EXPRESSION = ( + "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')" +) +TITLE = "customer-a-objects" +DESCRIPTION = ( + "Condition to make permissions available for objects starting with customer-a" +) +AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket" +AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"] + +OTHER_EXPRESSION = ( + "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-b')" +) +OTHER_TITLE = "customer-b-objects" +OTHER_DESCRIPTION = ( + "Condition to make permissions available for objects starting with customer-b" +) +OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket" +OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"] + + +def make_availability_condition(expression, title=None, description=None): + return downscoped.AvailabilityCondition(expression, title, description) + + +def make_access_boundary_rule( + available_resource, available_permissions, availability_condition=None +): + return downscoped.AccessBoundaryRule( + available_resource, available_permissions, availability_condition + ) + + +def make_credential_access_boundary(rules): + return downscoped.CredentialAccessBoundary(rules) + + +class TestAvailabilityCondition(object): + def test_constructor(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + + assert availability_condition.expression == EXPRESSION + assert availability_condition.title == TITLE + assert availability_condition.description == DESCRIPTION + + def test_constructor_required_params_only(self): + availability_condition = make_availability_condition(EXPRESSION) + + assert availability_condition.expression == EXPRESSION + assert availability_condition.title is None + assert availability_condition.description is None + + def test_setters(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + availability_condition.expression = OTHER_EXPRESSION + availability_condition.title = OTHER_TITLE + availability_condition.description = OTHER_DESCRIPTION + + assert availability_condition.expression == OTHER_EXPRESSION + assert availability_condition.title == OTHER_TITLE + assert availability_condition.description == OTHER_DESCRIPTION + + def test_invalid_expression_type(self): + with pytest.raises(TypeError) as excinfo: + make_availability_condition([EXPRESSION], TITLE, DESCRIPTION) + + assert excinfo.match("The provided expression is not a string.") + + def test_invalid_title_type(self): + with pytest.raises(TypeError) as excinfo: + make_availability_condition(EXPRESSION, False, DESCRIPTION) + + assert excinfo.match("The provided title is not a string or None.") + + def test_invalid_description_type(self): + with pytest.raises(TypeError) as excinfo: + make_availability_condition(EXPRESSION, TITLE, False) + + assert excinfo.match("The provided description is not a string or None.") + + def test_to_json_required_params_only(self): + availability_condition = make_availability_condition(EXPRESSION) + + assert availability_condition.to_json() == {"expression": EXPRESSION} + + def test_to_json_(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + + assert availability_condition.to_json() == { + "expression": EXPRESSION, + "title": TITLE, + "description": DESCRIPTION, + } + + +class TestAccessBoundaryRule(object): + def test_constructor(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + + assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE + assert access_boundary_rule.available_permissions == tuple( + AVAILABLE_PERMISSIONS + ) + assert access_boundary_rule.availability_condition == availability_condition + + def test_constructor_required_params_only(self): + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS + ) + + assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE + assert access_boundary_rule.available_permissions == tuple( + AVAILABLE_PERMISSIONS + ) + assert access_boundary_rule.availability_condition is None + + def test_setters(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + other_availability_condition = make_availability_condition( + OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + access_boundary_rule.available_resource = OTHER_AVAILABLE_RESOURCE + access_boundary_rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS + access_boundary_rule.availability_condition = other_availability_condition + + assert access_boundary_rule.available_resource == OTHER_AVAILABLE_RESOURCE + assert access_boundary_rule.available_permissions == tuple( + OTHER_AVAILABLE_PERMISSIONS + ) + assert ( + access_boundary_rule.availability_condition == other_availability_condition + ) + + def test_invalid_available_resource_type(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + with pytest.raises(TypeError) as excinfo: + make_access_boundary_rule( + None, AVAILABLE_PERMISSIONS, availability_condition + ) + + assert excinfo.match("The provided available_resource is not a string.") + + def test_invalid_available_permissions_type(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + with pytest.raises(TypeError) as excinfo: + make_access_boundary_rule( + AVAILABLE_RESOURCE, [0, 1, 2], availability_condition + ) + + assert excinfo.match( + "Provided available_permissions are not a list of strings." + ) + + def test_invalid_available_permissions_value(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + with pytest.raises(ValueError) as excinfo: + make_access_boundary_rule( + AVAILABLE_RESOURCE, + ["roles/storage.objectViewer"], + availability_condition, + ) + + assert excinfo.match("available_permissions must be prefixed with 'inRole:'.") + + def test_invalid_availability_condition_type(self): + with pytest.raises(TypeError) as excinfo: + make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"} + ) + + assert excinfo.match( + "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None." + ) + + def test_to_json(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + + assert access_boundary_rule.to_json() == { + "availablePermissions": AVAILABLE_PERMISSIONS, + "availableResource": AVAILABLE_RESOURCE, + "availabilityCondition": { + "expression": EXPRESSION, + "title": TITLE, + "description": DESCRIPTION, + }, + } + + def test_to_json_required_params_only(self): + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS + ) + + assert access_boundary_rule.to_json() == { + "availablePermissions": AVAILABLE_PERMISSIONS, + "availableResource": AVAILABLE_RESOURCE, + } + + +class TestCredentialAccessBoundary(object): + def test_constructor(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] + credential_access_boundary = make_credential_access_boundary(rules) + + assert credential_access_boundary.rules == tuple(rules) + + def test_setters(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] + other_availability_condition = make_availability_condition( + OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION + ) + other_access_boundary_rule = make_access_boundary_rule( + OTHER_AVAILABLE_RESOURCE, + OTHER_AVAILABLE_PERMISSIONS, + other_availability_condition, + ) + other_rules = [other_access_boundary_rule] + credential_access_boundary = make_credential_access_boundary(rules) + credential_access_boundary.rules = other_rules + + assert credential_access_boundary.rules == tuple(other_rules) + + def test_add_rule(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] * 9 + credential_access_boundary = make_credential_access_boundary(rules) + + # Add one more rule. This should not raise an error. + additional_access_boundary_rule = make_access_boundary_rule( + OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS + ) + credential_access_boundary.add_rule(additional_access_boundary_rule) + + assert len(credential_access_boundary.rules) == 10 + assert credential_access_boundary.rules[9] == additional_access_boundary_rule + + def test_add_rule_invalid_value(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] * 10 + credential_access_boundary = make_credential_access_boundary(rules) + + # Add one more rule to exceed maximum allowed rules. + with pytest.raises(ValueError) as excinfo: + credential_access_boundary.add_rule(access_boundary_rule) + + assert excinfo.match( + "Credential access boundary rules can have a maximum of 10 rules." + ) + assert len(credential_access_boundary.rules) == 10 + + def test_add_rule_invalid_type(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] + credential_access_boundary = make_credential_access_boundary(rules) + + # Add an invalid rule to exceed maximum allowed rules. + with pytest.raises(TypeError) as excinfo: + credential_access_boundary.add_rule("invalid") + + assert excinfo.match( + "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." + ) + assert len(credential_access_boundary.rules) == 1 + assert credential_access_boundary.rules[0] == access_boundary_rule + + def test_invalid_rules_type(self): + with pytest.raises(TypeError) as excinfo: + make_credential_access_boundary(["invalid"]) + + assert excinfo.match( + "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." + ) + + def test_invalid_rules_value(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + too_many_rules = [access_boundary_rule] * 11 + with pytest.raises(ValueError) as excinfo: + make_credential_access_boundary(too_many_rules) + + assert excinfo.match( + "Credential access boundary rules can have a maximum of 10 rules." + ) + + def test_to_json(self): + availability_condition = make_availability_condition( + EXPRESSION, TITLE, DESCRIPTION + ) + access_boundary_rule = make_access_boundary_rule( + AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition + ) + rules = [access_boundary_rule] + credential_access_boundary = make_credential_access_boundary(rules) + + assert credential_access_boundary.to_json() == { + "accessBoundary": { + "accessBoundaryRules": [ + { + "availablePermissions": AVAILABLE_PERMISSIONS, + "availableResource": AVAILABLE_RESOURCE, + "availabilityCondition": { + "expression": EXPRESSION, + "title": TITLE, + "description": DESCRIPTION, + }, + } + ] + } + }