From 44932cb8710e12279dbd4e9271577f8bee238980 Mon Sep 17 00:00:00 2001 From: MF2199 <38331387+mf2199@users.noreply.github.com> Date: Thu, 12 Nov 2020 12:08:21 -0500 Subject: [PATCH] feat: Backup Level IAM (#160) --- google/cloud/bigtable/backup.py | 52 ++++++++++++++++ tests/unit/test_backup.py | 107 ++++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+) diff --git a/google/cloud/bigtable/backup.py b/google/cloud/bigtable/backup.py index 03a1c894e..291ac783a 100644 --- a/google/cloud/bigtable/backup.py +++ b/google/cloud/bigtable/backup.py @@ -21,6 +21,7 @@ BigtableTableAdminClient, ) from google.cloud.bigtable_admin_v2.types import table_pb2 +from google.cloud.bigtable.policy import Policy from google.cloud.exceptions import NotFound from google.protobuf import field_mask_pb2 @@ -392,3 +393,54 @@ def restore(self, table_id): """ api = self._instance._client.table_admin_client return api.restore_table(self._instance.name, table_id, self.name) + + def get_iam_policy(self): + """Gets the IAM access control policy for this backup. + + :rtype: :class:`google.cloud.bigtable.policy.Policy` + :returns: The current IAM policy of this backup. + """ + table_api = self._instance._client.table_admin_client + args = {"resource": self.name} + response = table_api.get_iam_policy(**args) + return Policy.from_pb(response) + + def set_iam_policy(self, policy): + """Sets the IAM access control policy for this backup. Replaces any + existing policy. + + For more information about policy, please see documentation of + class `google.cloud.bigtable.policy.Policy` + + :type policy: :class:`google.cloud.bigtable.policy.Policy` + :param policy: A new IAM policy to replace the current IAM policy + of this backup. + + :rtype: :class:`google.cloud.bigtable.policy.Policy` + :returns: The current IAM policy of this backup. + """ + table_api = self._instance._client.table_admin_client + response = table_api.set_iam_policy(resource=self.name, policy=policy.to_pb()) + return Policy.from_pb(response) + + def test_iam_permissions(self, permissions): + """Tests whether the caller has the given permissions for this backup. + Returns the permissions that the caller has. + + :type permissions: list + :param permissions: The set of permissions to check for + the ``resource``. Permissions with wildcards (such as '*' + or 'storage.*') are not allowed. For more information see + `IAM Overview + `_. + `Bigtable Permissions + `_. + + :rtype: list + :returns: A List(string) of permissions allowed on the backup. + """ + table_api = self._instance._client.table_admin_client + response = table_api.test_iam_permissions( + resource=self.name, permissions=permissions + ) + return list(response.permissions) diff --git a/tests/unit/test_backup.py b/tests/unit/test_backup.py index 2f263dffd..0285d668b 100644 --- a/tests/unit/test_backup.py +++ b/tests/unit/test_backup.py @@ -734,6 +734,113 @@ def test_restore_success(self): backup=self.BACKUP_NAME, ) + def test_get_iam_policy(self): + from google.cloud.bigtable.client import Client + from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client + from google.iam.v1 import policy_pb2 + from google.cloud.bigtable.policy import BIGTABLE_ADMIN_ROLE + + credentials = _make_credentials() + client = Client(project=self.PROJECT_ID, credentials=credentials, admin=True) + + instance = client.instance(instance_id=self.INSTANCE_ID) + backup = self._make_one(self.BACKUP_ID, instance, cluster_id=self.CLUSTER_ID) + + version = 1 + etag = b"etag_v1" + members = ["serviceAccount:service_acc1@test.com", "user:user1@test.com"] + bindings = [{"role": BIGTABLE_ADMIN_ROLE, "members": members}] + iam_policy = policy_pb2.Policy(version=version, etag=etag, bindings=bindings) + + table_api = mock.create_autospec( + bigtable_table_admin_client.BigtableTableAdminClient + ) + client._table_admin_client = table_api + table_api.get_iam_policy.return_value = iam_policy + + result = backup.get_iam_policy() + + table_api.get_iam_policy.assert_called_once_with(resource=backup.name) + self.assertEqual(result.version, version) + self.assertEqual(result.etag, etag) + + admins = result.bigtable_admins + self.assertEqual(len(admins), len(members)) + for found, expected in zip(sorted(admins), sorted(members)): + self.assertEqual(found, expected) + + def test_set_iam_policy(self): + from google.cloud.bigtable.client import Client + from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client + from google.iam.v1 import policy_pb2 + from google.cloud.bigtable.policy import Policy + from google.cloud.bigtable.policy import BIGTABLE_ADMIN_ROLE + + credentials = _make_credentials() + client = Client(project=self.PROJECT_ID, credentials=credentials, admin=True) + + instance = client.instance(instance_id=self.INSTANCE_ID) + backup = self._make_one(self.BACKUP_ID, instance, cluster_id=self.CLUSTER_ID) + + version = 1 + etag = b"etag_v1" + members = ["serviceAccount:service_acc1@test.com", "user:user1@test.com"] + bindings = [{"role": BIGTABLE_ADMIN_ROLE, "members": sorted(members)}] + iam_policy_pb = policy_pb2.Policy(version=version, etag=etag, bindings=bindings) + + table_api = mock.create_autospec( + bigtable_table_admin_client.BigtableTableAdminClient + ) + client._table_admin_client = table_api + table_api.set_iam_policy.return_value = iam_policy_pb + + iam_policy = Policy(etag=etag, version=version) + iam_policy[BIGTABLE_ADMIN_ROLE] = [ + Policy.user("user1@test.com"), + Policy.service_account("service_acc1@test.com"), + ] + + result = backup.set_iam_policy(iam_policy) + + table_api.set_iam_policy.assert_called_once_with( + resource=backup.name, policy=iam_policy_pb + ) + self.assertEqual(result.version, version) + self.assertEqual(result.etag, etag) + + admins = result.bigtable_admins + self.assertEqual(len(admins), len(members)) + for found, expected in zip(sorted(admins), sorted(members)): + self.assertEqual(found, expected) + + def test_test_iam_permissions(self): + from google.cloud.bigtable.client import Client + from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client + from google.iam.v1 import iam_policy_pb2 + + credentials = _make_credentials() + client = Client(project=self.PROJECT_ID, credentials=credentials, admin=True) + + instance = client.instance(instance_id=self.INSTANCE_ID) + backup = self._make_one(self.BACKUP_ID, instance, cluster_id=self.CLUSTER_ID) + + permissions = ["bigtable.backups.create", "bigtable.backups.list"] + + response = iam_policy_pb2.TestIamPermissionsResponse(permissions=permissions) + + table_api = mock.create_autospec( + bigtable_table_admin_client.BigtableTableAdminClient + ) + table_api.test_iam_permissions.return_value = response + client._table_admin_client = table_api + + result = backup.test_iam_permissions(permissions) + + self.assertEqual(result, permissions) + table_api.test_iam_permissions.assert_called_once_with( + resource=backup.name, permissions=permissions + ) + class _Client(object): def __init__(self, project=TestBackup.PROJECT_ID):