Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Backup Level IAM #160

Merged
merged 1 commit into from Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 52 additions & 0 deletions google/cloud/bigtable/backup.py
Expand Up @@ -21,6 +21,7 @@
BigtableTableAdminClient,
)
from google.cloud.bigtable_admin_v2.types import table_pb2
from google.cloud.bigtable.policy import Policy
from google.cloud.exceptions import NotFound
from google.protobuf import field_mask_pb2

Expand Down Expand Up @@ -392,3 +393,54 @@ def restore(self, table_id):
"""
api = self._instance._client.table_admin_client
return api.restore_table(self._instance.name, table_id, self.name)

def get_iam_policy(self):
"""Gets the IAM access control policy for this backup.

:rtype: :class:`google.cloud.bigtable.policy.Policy`
:returns: The current IAM policy of this backup.
"""
table_api = self._instance._client.table_admin_client
args = {"resource": self.name}
response = table_api.get_iam_policy(**args)
return Policy.from_pb(response)

def set_iam_policy(self, policy):
"""Sets the IAM access control policy for this backup. Replaces any
existing policy.

For more information about policy, please see documentation of
class `google.cloud.bigtable.policy.Policy`

:type policy: :class:`google.cloud.bigtable.policy.Policy`
:param policy: A new IAM policy to replace the current IAM policy
of this backup.

:rtype: :class:`google.cloud.bigtable.policy.Policy`
:returns: The current IAM policy of this backup.
"""
table_api = self._instance._client.table_admin_client
response = table_api.set_iam_policy(resource=self.name, policy=policy.to_pb())
return Policy.from_pb(response)

def test_iam_permissions(self, permissions):
"""Tests whether the caller has the given permissions for this backup.
Returns the permissions that the caller has.

:type permissions: list
:param permissions: The set of permissions to check for
the ``resource``. Permissions with wildcards (such as '*'
or 'storage.*') are not allowed. For more information see
`IAM Overview
<https://cloud.google.com/iam/docs/overview#permissions>`_.
`Bigtable Permissions
<https://cloud.google.com/bigtable/docs/access-control>`_.

:rtype: list
:returns: A List(string) of permissions allowed on the backup.
"""
table_api = self._instance._client.table_admin_client
response = table_api.test_iam_permissions(
resource=self.name, permissions=permissions
)
return list(response.permissions)
107 changes: 107 additions & 0 deletions tests/unit/test_backup.py
Expand Up @@ -734,6 +734,113 @@ def test_restore_success(self):
backup=self.BACKUP_NAME,
)

def test_get_iam_policy(self):
from google.cloud.bigtable.client import Client
from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client
from google.iam.v1 import policy_pb2
from google.cloud.bigtable.policy import BIGTABLE_ADMIN_ROLE

credentials = _make_credentials()
client = Client(project=self.PROJECT_ID, credentials=credentials, admin=True)

instance = client.instance(instance_id=self.INSTANCE_ID)
backup = self._make_one(self.BACKUP_ID, instance, cluster_id=self.CLUSTER_ID)

version = 1
etag = b"etag_v1"
members = ["serviceAccount:service_acc1@test.com", "user:user1@test.com"]
bindings = [{"role": BIGTABLE_ADMIN_ROLE, "members": members}]
iam_policy = policy_pb2.Policy(version=version, etag=etag, bindings=bindings)

table_api = mock.create_autospec(
bigtable_table_admin_client.BigtableTableAdminClient
)
client._table_admin_client = table_api
table_api.get_iam_policy.return_value = iam_policy

result = backup.get_iam_policy()

table_api.get_iam_policy.assert_called_once_with(resource=backup.name)
self.assertEqual(result.version, version)
self.assertEqual(result.etag, etag)

admins = result.bigtable_admins
self.assertEqual(len(admins), len(members))
for found, expected in zip(sorted(admins), sorted(members)):
self.assertEqual(found, expected)

def test_set_iam_policy(self):
from google.cloud.bigtable.client import Client
from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client
from google.iam.v1 import policy_pb2
from google.cloud.bigtable.policy import Policy
from google.cloud.bigtable.policy import BIGTABLE_ADMIN_ROLE

credentials = _make_credentials()
client = Client(project=self.PROJECT_ID, credentials=credentials, admin=True)

instance = client.instance(instance_id=self.INSTANCE_ID)
backup = self._make_one(self.BACKUP_ID, instance, cluster_id=self.CLUSTER_ID)

version = 1
etag = b"etag_v1"
members = ["serviceAccount:service_acc1@test.com", "user:user1@test.com"]
bindings = [{"role": BIGTABLE_ADMIN_ROLE, "members": sorted(members)}]
iam_policy_pb = policy_pb2.Policy(version=version, etag=etag, bindings=bindings)

table_api = mock.create_autospec(
bigtable_table_admin_client.BigtableTableAdminClient
)
client._table_admin_client = table_api
table_api.set_iam_policy.return_value = iam_policy_pb

iam_policy = Policy(etag=etag, version=version)
iam_policy[BIGTABLE_ADMIN_ROLE] = [
Policy.user("user1@test.com"),
Policy.service_account("service_acc1@test.com"),
]

result = backup.set_iam_policy(iam_policy)

table_api.set_iam_policy.assert_called_once_with(
resource=backup.name, policy=iam_policy_pb
)
self.assertEqual(result.version, version)
self.assertEqual(result.etag, etag)

admins = result.bigtable_admins
self.assertEqual(len(admins), len(members))
for found, expected in zip(sorted(admins), sorted(members)):
self.assertEqual(found, expected)

def test_test_iam_permissions(self):
from google.cloud.bigtable.client import Client
from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client
from google.iam.v1 import iam_policy_pb2

credentials = _make_credentials()
client = Client(project=self.PROJECT_ID, credentials=credentials, admin=True)

instance = client.instance(instance_id=self.INSTANCE_ID)
backup = self._make_one(self.BACKUP_ID, instance, cluster_id=self.CLUSTER_ID)

permissions = ["bigtable.backups.create", "bigtable.backups.list"]

response = iam_policy_pb2.TestIamPermissionsResponse(permissions=permissions)

table_api = mock.create_autospec(
bigtable_table_admin_client.BigtableTableAdminClient
)
table_api.test_iam_permissions.return_value = response
client._table_admin_client = table_api

result = backup.test_iam_permissions(permissions)

self.assertEqual(result, permissions)
table_api.test_iam_permissions.assert_called_once_with(
resource=backup.name, permissions=permissions
)


class _Client(object):
def __init__(self, project=TestBackup.PROJECT_ID):
Expand Down