Skip to content

Commit

Permalink
feat(bigtable): Backup Level IAM (#3222)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlisskaPie committed Dec 29, 2020
1 parent 1648ea0 commit c77c822
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 18 deletions.
25 changes: 14 additions & 11 deletions bigtable/admin.go
Expand Up @@ -115,6 +115,10 @@ func (ac *AdminClient) instancePrefix() string {
return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance)
}

func (ac *AdminClient) backupPath(cluster, backup string) string {
return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, ac.instance, cluster, backup)
}

// Tables returns a list of the tables in the instance.
func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
Expand Down Expand Up @@ -597,12 +601,17 @@ func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) err
}
}

// TableIAM creates an IAM client specific to a given Instance and Table within the configured project.
// TableIAM creates an IAM Handle specific to a given Instance and Table within the configured project.
func (ac *AdminClient) TableIAM(tableID string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient,
"projects/"+ac.project+"/instances/"+ac.instance+"/tables/"+tableID)
}

// BackupIAM creates an IAM Handle specific to a given Cluster and Backup.
func (ac *AdminClient) BackupIAM(cluster, backup string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, backup))
}

const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
const mtlsInstanceAdminAddr = "bigtableadmin.mtls.googleapis.com:443"

Expand Down Expand Up @@ -1443,7 +1452,7 @@ func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient
func (ac *AdminClient) RestoreTable(ctx context.Context, table, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
backupPath := prefix + "/clusters/" + cluster + "/backups/" + backup
backupPath := ac.backupPath(cluster, backup)

req := &btapb.RestoreTableRequest{
Parent: prefix,
Expand Down Expand Up @@ -1603,9 +1612,7 @@ type BackupInfo struct {
// BackupInfo gets backup metadata.
func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (*BackupInfo, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup
backupPath := ac.backupPath(cluster, backup)

req := &btapb.GetBackupRequest{
Name: backupPath,
Expand All @@ -1627,9 +1634,7 @@ func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (
// DeleteBackup deletes a backup in a cluster.
func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup
backupPath := ac.backupPath(cluster, backup)

req := &btapb.DeleteBackupRequest{
Name: backupPath,
Expand All @@ -1641,9 +1646,7 @@ func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string)
// UpdateBackup updates the backup metadata in a cluster. The API only supports updating expire time.
func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string, expireTime time.Time) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup
backupPath := ac.backupPath(cluster, backup)

expireTimestamp, err := ptypes.TimestampProto(expireTime)
if err != nil {
Expand Down
80 changes: 73 additions & 7 deletions bigtable/integration_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"testing"
"time"

"cloud.google.com/go/iam"
"cloud.google.com/go/internal"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
Expand Down Expand Up @@ -1266,6 +1267,70 @@ func TestIntegration_TableIam(t *testing.T) {
}
}

func TestIntegration_BackupIAM(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()

if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support IAM Policy creation")
}
timeout := 5 * time.Minute
ctx, _ := context.WithTimeout(context.Background(), timeout)

adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()

table := testEnv.Config().Table
cluster := testEnv.Config().Cluster

defer deleteTable(ctx, t, adminClient, table)
if err := adminClient.CreateTable(ctx, table); err != nil {
t.Fatalf("Creating table: %v", err)
}
// Create backup.
backup := "backup"
defer adminClient.DeleteBackup(ctx, cluster, backup)
if err = adminClient.CreateBackup(ctx, table, cluster, backup, time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}
iamHandle := adminClient.BackupIAM(cluster, backup)
// Get backup policy.
p, err := iamHandle.Policy(ctx)
if err != nil {
t.Errorf("iamHandle.Policy: %v", err)
}
// The resource is new, so the policy should be empty.
if got := p.Roles(); len(got) > 0 {
t.Errorf("got roles %v, want none", got)
}
// Set backup policy.
member := "domain:google.com"
// Add a member, set the policy, then check that the member is present.
p.Add(member, iam.Viewer)
if err = iamHandle.SetPolicy(ctx, p); err != nil {
t.Errorf("iamHandle.SetPolicy: %v", err)
}
p, err = iamHandle.Policy(ctx)
if err != nil {
t.Errorf("iamHandle.Policy: %v", err)
}
if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
t.Errorf("iamHandle.Policy: got %v, want %v", got, want)
}
// Test backup permissions.
permissions := []string{"bigtable.backups.get", "bigtable.backups.update"}
_, err = iamHandle.TestPermissions(ctx, permissions)
if err != nil {
t.Errorf("iamHandle.TestPermissions: %v", err)
}
}

func TestIntegration_AdminCreateInstance(t *testing.T) {
if instanceToCreate == "" {
t.Skip("instanceToCreate not set, skipping instance creation testing")
Expand Down Expand Up @@ -2068,9 +2133,10 @@ func TestIntegration_AdminBackup(t *testing.T) {
}

// Create backup
backupName := "mybackup"
defer adminClient.DeleteBackup(ctx, cluster, "mybackup")

if err = adminClient.CreateBackup(ctx, table, cluster, "mybackup", time.Now().Add(8*time.Hour)); err != nil {
if err = adminClient.CreateBackup(ctx, table, cluster, backupName, time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}

Expand All @@ -2082,7 +2148,7 @@ func TestIntegration_AdminBackup(t *testing.T) {
if got, want := len(backups), 1; got != want {
t.Fatalf("Listing backup count: %d, want: %d", got, want)
}
if got, want := backups[0].Name, "mybackup"; got != want {
if got, want := backups[0].Name, backupName; got != want {
t.Fatalf("Backup name: %s, want: %s", got, want)
}
if got, want := backups[0].SourceTable, table; got != want {
Expand All @@ -2093,7 +2159,7 @@ func TestIntegration_AdminBackup(t *testing.T) {
}

// Get backup
backup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
backup, err := adminClient.BackupInfo(ctx, cluster, backupName)
if err != nil {
t.Fatalf("BackupInfo: %v", backup)
}
Expand All @@ -2103,13 +2169,13 @@ func TestIntegration_AdminBackup(t *testing.T) {

// Update backup
newExpireTime := time.Now().Add(10 * time.Hour)
err = adminClient.UpdateBackup(ctx, cluster, "mybackup", newExpireTime)
err = adminClient.UpdateBackup(ctx, cluster, backupName, newExpireTime)
if err != nil {
t.Fatalf("UpdateBackup failed: %v", err)
}

// Check that updated backup has the correct expire time
updatedBackup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
updatedBackup, err := adminClient.BackupInfo(ctx, cluster, backupName)
if err != nil {
t.Fatalf("BackupInfo: %v", err)
}
Expand All @@ -2122,15 +2188,15 @@ func TestIntegration_AdminBackup(t *testing.T) {
// Restore backup
restoredTable := table + "-restored"
defer deleteTable(ctx, t, adminClient, restoredTable)
if err = adminClient.RestoreTable(ctx, restoredTable, cluster, "mybackup"); err != nil {
if err = adminClient.RestoreTable(ctx, restoredTable, cluster, backupName); err != nil {
t.Fatalf("RestoreTable: %v", err)
}
if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
t.Fatalf("Restored TableInfo: %v", err)
}

// Delete backup
if err = adminClient.DeleteBackup(ctx, cluster, "mybackup"); err != nil {
if err = adminClient.DeleteBackup(ctx, cluster, backupName); err != nil {
t.Fatalf("DeleteBackup: %v", err)
}
backups, err = list(cluster)
Expand Down

0 comments on commit c77c822

Please sign in to comment.