Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Backup Level IAM #3222

Merged
merged 17 commits into from Dec 29, 2020
Merged
25 changes: 14 additions & 11 deletions bigtable/admin.go
Expand Up @@ -115,6 +115,10 @@ func (ac *AdminClient) instancePrefix() string {
return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance)
}

func (ac *AdminClient) backupPath(cluster, backup string) string {
return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, ac.instance, cluster, backup)
}

// Tables returns a list of the tables in the instance.
func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
Expand Down Expand Up @@ -597,12 +601,17 @@ func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) err
}
}

// TableIAM creates an IAM client specific to a given Instance and Table within the configured project.
// TableIAM creates an IAM Handle specific to a given Instance and Table within the configured project.
func (ac *AdminClient) TableIAM(tableID string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient,
"projects/"+ac.project+"/instances/"+ac.instance+"/tables/"+tableID)
}

// BackupIAM creates an IAM Handle specific to a given Cluster and Backup.
func (ac *AdminClient) BackupIAM(cluster, backup string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, backup))
}

const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
const mtlsInstanceAdminAddr = "bigtableadmin.mtls.googleapis.com:443"

Expand Down Expand Up @@ -1443,7 +1452,7 @@ func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient
func (ac *AdminClient) RestoreTable(ctx context.Context, table, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
backupPath := prefix + "/clusters/" + cluster + "/backups/" + backup
backupPath := ac.backupPath(cluster, backup)

req := &btapb.RestoreTableRequest{
Parent: prefix,
Expand Down Expand Up @@ -1603,9 +1612,7 @@ type BackupInfo struct {
// BackupInfo gets backup metadata.
func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (*BackupInfo, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup
backupPath := ac.backupPath(cluster, backup)

req := &btapb.GetBackupRequest{
Name: backupPath,
Expand All @@ -1627,9 +1634,7 @@ func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (
// DeleteBackup deletes a backup in a cluster.
func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup
backupPath := ac.backupPath(cluster, backup)

req := &btapb.DeleteBackupRequest{
Name: backupPath,
Expand All @@ -1641,9 +1646,7 @@ func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string)
// UpdateBackup updates the backup metadata in a cluster. The API only supports updating expire time.
func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string, expireTime time.Time) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup
backupPath := ac.backupPath(cluster, backup)

expireTimestamp, err := ptypes.TimestampProto(expireTime)
if err != nil {
Expand Down
80 changes: 73 additions & 7 deletions bigtable/integration_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"testing"
"time"

"cloud.google.com/go/iam"
"cloud.google.com/go/internal"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
Expand Down Expand Up @@ -1266,6 +1267,70 @@ func TestIntegration_TableIam(t *testing.T) {
}
}

func TestIntegration_BackupIAM(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this is an integration test file, it feels like we shouldn't be mocking out the interactions. It's probably enough to follow the pattern of TestIntegration_TableIam (which should be IAM, but that's another story 🙃).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. @kolea2 do you know how other languages are handling this testing, and if we will face any quota-related issues?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strange, but I got an error while comparing policies without mocking. I had an Etag mismatch, even in the read-modify-write cycle.
From documentation:

It is strongly suggested that systems make use of the etag in the read-modify-write cycle to perform policy updates in order to avoid race conditions: An etag is returned in the response to getIamPolicy, and systems are expected to put that etag in the request to setIamPolicy to ensure that their change will be applied to the same version of the policy.

image

Will wait for response!

P.S. Anyway, I can also rename TestIntegration_TableIam to TestIntegration_TableIAM here :)
It's related to this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Etag should only change when there is a modification. So, something else must have modified the value between the initial read and the subsequent attempted write. Mind uploading the updated test without the mock, even if it isn't passing yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sure!
Uploaded.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tritone for Java we take the existing IAM policy and reset it. Probably could be improved: https://github.com/googleapis/java-bigtable/blob/master/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/it/BigtableTableAdminClientIT.java#L194-L217. I haven't seen any quota related issues so far.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tbpg @tritone any additional thoughts here?

testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()

if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support IAM Policy creation")
}
timeout := 5 * time.Minute
ctx, _ := context.WithTimeout(context.Background(), timeout)

adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()

table := testEnv.Config().Table
cluster := testEnv.Config().Cluster

defer deleteTable(ctx, t, adminClient, table)
if err := adminClient.CreateTable(ctx, table); err != nil {
t.Fatalf("Creating table: %v", err)
}
// Create backup.
backup := "backup"
defer adminClient.DeleteBackup(ctx, cluster, backup)
if err = adminClient.CreateBackup(ctx, table, cluster, backup, time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}
iamHandle := adminClient.BackupIAM(cluster, backup)
// Get backup policy.
p, err := iamHandle.Policy(ctx)
if err != nil {
t.Errorf("iamHandle.Policy: %v", err)
}
// The resource is new, so the policy should be empty.
if got := p.Roles(); len(got) > 0 {
t.Errorf("got roles %v, want none", got)
}
// Set backup policy.
member := "domain:google.com"
// Add a member, set the policy, then check that the member is present.
p.Add(member, iam.Viewer)
if err = iamHandle.SetPolicy(ctx, p); err != nil {
t.Errorf("iamHandle.SetPolicy: %v", err)
}
p, err = iamHandle.Policy(ctx)
if err != nil {
t.Errorf("iamHandle.Policy: %v", err)
}
if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
t.Errorf("iamHandle.Policy: got %v, want %v", got, want)
}
// Test backup permissions.
permissions := []string{"bigtable.backups.get", "bigtable.backups.update"}
_, err = iamHandle.TestPermissions(ctx, permissions)
if err != nil {
t.Errorf("iamHandle.TestPermissions: %v", err)
}
}

func TestIntegration_AdminCreateInstance(t *testing.T) {
if instanceToCreate == "" {
t.Skip("instanceToCreate not set, skipping instance creation testing")
Expand Down Expand Up @@ -2068,9 +2133,10 @@ func TestIntegration_AdminBackup(t *testing.T) {
}

// Create backup
backupName := "mybackup"
defer adminClient.DeleteBackup(ctx, cluster, "mybackup")

if err = adminClient.CreateBackup(ctx, table, cluster, "mybackup", time.Now().Add(8*time.Hour)); err != nil {
if err = adminClient.CreateBackup(ctx, table, cluster, backupName, time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}

Expand All @@ -2082,7 +2148,7 @@ func TestIntegration_AdminBackup(t *testing.T) {
if got, want := len(backups), 1; got != want {
t.Fatalf("Listing backup count: %d, want: %d", got, want)
}
if got, want := backups[0].Name, "mybackup"; got != want {
if got, want := backups[0].Name, backupName; got != want {
t.Fatalf("Backup name: %s, want: %s", got, want)
}
if got, want := backups[0].SourceTable, table; got != want {
Expand All @@ -2093,7 +2159,7 @@ func TestIntegration_AdminBackup(t *testing.T) {
}

// Get backup
backup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
backup, err := adminClient.BackupInfo(ctx, cluster, backupName)
if err != nil {
t.Fatalf("BackupInfo: %v", backup)
}
Expand All @@ -2103,13 +2169,13 @@ func TestIntegration_AdminBackup(t *testing.T) {

// Update backup
newExpireTime := time.Now().Add(10 * time.Hour)
err = adminClient.UpdateBackup(ctx, cluster, "mybackup", newExpireTime)
err = adminClient.UpdateBackup(ctx, cluster, backupName, newExpireTime)
if err != nil {
t.Fatalf("UpdateBackup failed: %v", err)
}

// Check that updated backup has the correct expire time
updatedBackup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
updatedBackup, err := adminClient.BackupInfo(ctx, cluster, backupName)
if err != nil {
t.Fatalf("BackupInfo: %v", err)
}
Expand All @@ -2122,15 +2188,15 @@ func TestIntegration_AdminBackup(t *testing.T) {
// Restore backup
restoredTable := table + "-restored"
defer deleteTable(ctx, t, adminClient, restoredTable)
if err = adminClient.RestoreTable(ctx, restoredTable, cluster, "mybackup"); err != nil {
if err = adminClient.RestoreTable(ctx, restoredTable, cluster, backupName); err != nil {
t.Fatalf("RestoreTable: %v", err)
}
if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
t.Fatalf("Restored TableInfo: %v", err)
}

// Delete backup
if err = adminClient.DeleteBackup(ctx, cluster, "mybackup"); err != nil {
if err = adminClient.DeleteBackup(ctx, cluster, backupName); err != nil {
t.Fatalf("DeleteBackup: %v", err)
}
backups, err = list(cluster)
Expand Down