Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): allow restore backup to different instance (#3489) #4014

Merged
merged 22 commits into from May 3, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 20 additions & 11 deletions bigtable/admin.go
Expand Up @@ -116,8 +116,8 @@ func (ac *AdminClient) instancePrefix() string {
return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance)
}

func (ac *AdminClient) backupPath(cluster, backup string) string {
return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, ac.instance, cluster, backup)
func (ac *AdminClient) backupPath(cluster, instance, backup string) string {
return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, instance, cluster, backup)
}

// EncryptionInfo represents the encryption info of a table.
Expand Down Expand Up @@ -687,7 +687,7 @@ func (ac *AdminClient) TableIAM(tableID string) *iam.Handle {

// BackupIAM creates an IAM Handle specific to a given Cluster and Backup.
func (ac *AdminClient) BackupIAM(cluster, backup string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, backup))
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, ac.instance, backup))
}

const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
Expand Down Expand Up @@ -1583,15 +1583,24 @@ func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient
}

// RestoreTable creates a table from a backup. The table will be created in the same cluster as the backup.
// To restore a table to a different instance, see RestoreTableFrom.
func (ac *AdminClient) RestoreTable(ctx context.Context, table, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
backupPath := ac.backupPath(cluster, backup)
return ac.RestoreTableFrom(ctx, ac.instance, table, cluster, backup)
}

// RestoreTableFrom creates a new table in the admin's instance by restoring from the specified backup in a different instance.
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
// To restore within the same instance, see RestoreTable.
// sourceInstance (ex. "my-instance") and sourceCluster (ex. "my-cluster") are the instance and cluster in which the new table will be restored from.
// tableName (ex. "my-restored-table") will be the name of the newly created table
// backupName (ex. "my-backup") is the name of the backup to restore
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
func (ac *AdminClient) RestoreTableFrom(ctx context.Context, sourceInstance, table, sourceCluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
parent := ac.instancePrefix()
sourceBackupPath := ac.backupPath(sourceCluster, sourceInstance, backup)
req := &btapb.RestoreTableRequest{
Parent: prefix,
Parent: parent,
TableId: table,
Source: &btapb.RestoreTableRequest_Backup{backupPath},
Source: &btapb.RestoreTableRequest_Backup{sourceBackupPath},
}
op, err := ac.tClient.RestoreTable(ctx, req)
if err != nil {
Expand Down Expand Up @@ -1750,7 +1759,7 @@ type BackupInfo struct {
// BackupInfo gets backup metadata.
func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (*BackupInfo, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
backupPath := ac.backupPath(cluster, backup)
backupPath := ac.backupPath(cluster, ac.instance, backup)

req := &btapb.GetBackupRequest{
Name: backupPath,
Expand All @@ -1772,7 +1781,7 @@ func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (
// DeleteBackup deletes a backup in a cluster.
func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
backupPath := ac.backupPath(cluster, backup)
backupPath := ac.backupPath(cluster, ac.instance, backup)

req := &btapb.DeleteBackupRequest{
Name: backupPath,
Expand All @@ -1784,7 +1793,7 @@ func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string)
// UpdateBackup updates the backup metadata in a cluster. The API only supports updating expire time.
func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string, expireTime time.Time) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
backupPath := ac.backupPath(cluster, backup)
backupPath := ac.backupPath(cluster, ac.instance, backup)

expireTimestamp, err := ptypes.TimestampProto(expireTime)
if err != nil {
Expand Down
108 changes: 83 additions & 25 deletions bigtable/integration_test.go
Expand Up @@ -2441,12 +2441,43 @@ func TestIntegration_AdminBackup(t *testing.T) {
}
defer adminClient.Close()

table := testEnv.Config().Table
cluster := testEnv.Config().Cluster

tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer deleteTable(ctx, t, adminClient, table)
defer deleteTable(ctx, t, adminClient, tblConf.TableID)

sourceInstance := testEnv.Config().Instance
sourceCluster := testEnv.Config().Cluster

iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()
diffInstance := testEnv.Config().Instance + "-diff"
diffCluster := sourceCluster + "-diff"
conf := &InstanceConf{
InstanceId: diffInstance,
ClusterId: diffCluster,
DisplayName: "different test sourceInstance",
Zone: instanceToCreateZone2,
InstanceType: DEVELOPMENT,
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
Labels: map[string]string{"test-label-key": "test-label-value"},
}
defer iAdminClient.DeleteInstance(ctx, diffInstance)
// Create different instance to restore table.
if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
t.Errorf("CreateInstance: %v", err)
}

list := func(cluster string) ([]*BackupInfo, error) {
infos := []*BackupInfo(nil)
Expand All @@ -2465,12 +2496,8 @@ func TestIntegration_AdminBackup(t *testing.T) {
return infos, err
}

if err := adminClient.CreateTable(ctx, table); err != nil {
t.Fatalf("Creating table: %v", err)
}

// Precondition: no backups
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
backups, err := list(cluster)
backups, err := list(sourceCluster)
if err != nil {
t.Fatalf("Initial backup list: %v", err)
}
Expand All @@ -2480,32 +2507,32 @@ func TestIntegration_AdminBackup(t *testing.T) {

// Create backup
backupName := "mybackup"
defer adminClient.DeleteBackup(ctx, cluster, "mybackup")
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
defer adminClient.DeleteBackup(ctx, sourceCluster, "mybackup")

if err = adminClient.CreateBackup(ctx, table, cluster, backupName, time.Now().Add(8*time.Hour)); err != nil {
if err = adminClient.CreateBackup(ctx, tblConf.TableID, sourceCluster, backupName, time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}

// List backup
backups, err = list(cluster)
backups, err = list(sourceCluster)
if err != nil {
t.Fatalf("Listing backups: %v", err)
}
if got, want := len(backups), 1; got != want {
t.Fatalf("Listing backup count: %d, want: %d", got, want)
}
if got, want := backups[0].Name, backupName; got != want {
t.Fatalf("Backup name: %s, want: %s", got, want)
t.Errorf("Backup name: %s, want: %s", got, want)
}
if got, want := backups[0].SourceTable, table; got != want {
t.Fatalf("Backup SourceTable: %s, want: %s", got, want)
if got, want := backups[0].SourceTable, tblConf.TableID; got != want {
t.Errorf("Backup SourceTable: %s, want: %s", got, want)
}
if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
t.Fatalf("Backup ExpireTime: %s, want: %s", got, want)
t.Errorf("Backup ExpireTime: %s, want: %s", got, want)
}

// Get backup
backup, err := adminClient.BackupInfo(ctx, cluster, backupName)
backup, err := adminClient.BackupInfo(ctx, sourceCluster, backupName)
if err != nil {
t.Fatalf("BackupInfo: %v", backup)
}
Expand All @@ -2515,42 +2542,73 @@ func TestIntegration_AdminBackup(t *testing.T) {

// Update backup
newExpireTime := time.Now().Add(10 * time.Hour)
err = adminClient.UpdateBackup(ctx, cluster, backupName, newExpireTime)
err = adminClient.UpdateBackup(ctx, sourceCluster, backupName, newExpireTime)
if err != nil {
t.Fatalf("UpdateBackup failed: %v", err)
}

// Check that updated backup has the correct expire time
updatedBackup, err := adminClient.BackupInfo(ctx, cluster, backupName)
updatedBackup, err := adminClient.BackupInfo(ctx, sourceCluster, backupName)
if err != nil {
t.Fatalf("BackupInfo: %v", err)
}
backup.ExpireTime = newExpireTime
// Server clock and local clock may not be perfectly sync'ed.
if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute {
t.Fatalf("BackupInfo: %v, want: %v", got, want)
t.Errorf("BackupInfo: %v, want: %v", got, want)
}

// Restore backup
restoredTable := table + "-restored"
restoredTable := tblConf.TableID + "-restored"
defer deleteTable(ctx, t, adminClient, restoredTable)
if err = adminClient.RestoreTable(ctx, restoredTable, cluster, backupName); err != nil {
if err = adminClient.RestoreTable(ctx, restoredTable, sourceCluster, backupName); err != nil {
t.Fatalf("RestoreTable: %v", err)
}
if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
t.Fatalf("Restored TableInfo: %v", err)
}
// Restore backup to different instance
restoreTableName := tblConf.TableID + "-diff-restored"
diffConf := IntegrationTestConfig{
Project: testEnv.Config().Project,
Instance: diffInstance,
Cluster: diffCluster,
Table: restoreTableName,
}
env := &ProdEnv{
config: diffConf,
}
dAdminClient, err := env.NewAdminClient()
if err != nil {
t.Errorf("NewAdminClient: %v", err)
}
defer dAdminClient.Close()

defer deleteTable(ctx, t, dAdminClient, restoreTableName)
if err = dAdminClient.RestoreTableFrom(ctx, sourceInstance, restoreTableName, sourceCluster, "mybackup"); err != nil {
t.Fatalf("RestoreTableFrom: %v", err)
}
tblInfo, err := dAdminClient.TableInfo(ctx, restoreTableName)
if err != nil {
t.Fatalf("Restored to different sourceInstance failed, TableInfo: %v", err)
}
families := tblInfo.Families
sort.Strings(tblInfo.Families)
wantFams := []string{"fam1", "fam2"}
if !testutil.Equal(families, wantFams) {
t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
}

// Delete backup
if err = adminClient.DeleteBackup(ctx, cluster, backupName); err != nil {
if err = adminClient.DeleteBackup(ctx, sourceCluster, backupName); err != nil {
t.Fatalf("DeleteBackup: %v", err)
}
backups, err = list(cluster)
backups, err = list(sourceCluster)
if err != nil {
t.Fatalf("List after Delete: %v", err)
}
if got, want := len(backups), 0; got != want {
t.Fatalf("List after delete len: %d, want: %d", got, want)
t.Errorf("List after delete len: %d, want: %d", got, want)
}
}

Expand Down