From 4c12b424eec06c7d87244eaa922995bbe6e46e7e Mon Sep 17 00:00:00 2001 From: shollyman Date: Tue, 22 Jun 2021 09:56:46 -0700 Subject: [PATCH] feat(bigquery): add support for snapshot/restore (#4112) * feat(bigquery): add support for snapshot/restore Secondary changes: This complicated some internal wrapping functions for TableMetadata. The fact that TableMetadata can now contain a Table reference for other tables means we need to pass around an additional instance of the Client, as user may want to do something like call Metadata() on the embedded reference to the base table in a snapshot definition. * add comments to exported operation types * doc SnapshotDefinition * address reviewer comments Co-authored-by: Tyler Bui-Palsulich <26876514+tbpg@users.noreply.github.com> --- bigquery/copy.go | 19 ++++++++ bigquery/copy_test.go | 22 +++++++++ bigquery/integration_test.go | 90 ++++++++++++++++++++++++++++++++++++ bigquery/table.go | 51 ++++++++++++++++++-- bigquery/table_test.go | 2 +- 5 files changed, 180 insertions(+), 4 deletions(-) diff --git a/bigquery/copy.go b/bigquery/copy.go index 44cc68d12ad..38d4eee8398 100644 --- a/bigquery/copy.go +++ b/bigquery/copy.go @@ -20,6 +20,19 @@ import ( bq "google.golang.org/api/bigquery/v2" ) +// TableCopyOperationType is used to indicate the type of operation performed by a BigQuery +// copy job. +type TableCopyOperationType string + +var ( + // CopyOperation indicates normal table to table copying. + CopyOperation TableCopyOperationType = "COPY" + // SnapshotOperation indicates creating a snapshot from a regular table. + SnapshotOperation TableCopyOperationType = "SNAPSHOT" + // RestoreOperation indicates creating/restoring a table from a snapshot. + RestoreOperation TableCopyOperationType = "RESTORE" +) + // CopyConfig holds the configuration for a copy job. type CopyConfig struct { // Srcs are the tables from which data will be copied. 
@@ -41,6 +54,10 @@ type CopyConfig struct { // Custom encryption configuration (e.g., Cloud KMS keys). DestinationEncryptionConfig *EncryptionConfig + + // One of the supported operation types when executing a Table Copy job. By default this + // copies tables, but can also be set to perform snapshot or restore operations. + OperationType TableCopyOperationType } func (c *CopyConfig) toBQ() *bq.JobConfiguration { @@ -56,6 +73,7 @@ func (c *CopyConfig) toBQ() *bq.JobConfiguration { DestinationTable: c.Dst.toBQ(), DestinationEncryptionConfiguration: c.DestinationEncryptionConfig.toBQ(), SourceTables: ts, + OperationType: string(c.OperationType), }, } } @@ -67,6 +85,7 @@ func bqToCopyConfig(q *bq.JobConfiguration, c *Client) *CopyConfig { WriteDisposition: TableWriteDisposition(q.Copy.WriteDisposition), Dst: bqToTable(q.Copy.DestinationTable, c), DestinationEncryptionConfig: bqToEncryptionConfig(q.Copy.DestinationEncryptionConfiguration), + OperationType: TableCopyOperationType(q.Copy.OperationType), } for _, t := range q.Copy.SourceTables { cc.Srcs = append(cc.Srcs, bqToTable(t, c)) diff --git a/bigquery/copy_test.go b/bigquery/copy_test.go index 5b4ef878694..9b16ebf2d3a 100644 --- a/bigquery/copy_test.go +++ b/bigquery/copy_test.go @@ -137,6 +137,28 @@ func TestCopy(t *testing.T) { return j }(), }, + { + dst: &Table{ + ProjectID: "d-project-id", + DatasetID: "d-dataset-id", + TableID: "d-table-id", + }, + srcs: []*Table{ + { + ProjectID: "s-project-id", + DatasetID: "s-dataset-id", + TableID: "s-table-id", + }, + }, + config: CopyConfig{ + OperationType: SnapshotOperation, + }, + want: func() *bq.Job { + j := defaultCopyJob() + j.Configuration.Copy.OperationType = "SNAPSHOT" + return j + }(), + }, } c := &Client{projectID: "client-project-id"} for i, tc := range testCases { diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 4273e84ec01..c9756b194cd 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -438,6 
+438,96 @@ func TestIntegration_TableMetadata(t *testing.T) { } +func TestIntegration_SnapshotAndRestore(t *testing.T) { + + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + // instantiate a base table via a CTAS + baseTableID := tableIDs.New() + qualified := fmt.Sprintf("`%s`.%s.%s", testutil.ProjID(), dataset.DatasetID, baseTableID) + sql := fmt.Sprintf(` + CREATE TABLE %s + ( + sample_value INT64, + groupid STRING, + ) + AS + SELECT + CAST(RAND() * 100 AS INT64), + CONCAT("group", CAST(CAST(RAND()*10 AS INT64) AS STRING)) + FROM + UNNEST(GENERATE_ARRAY(0,999)) +`, qualified) + if err := runQueryJob(ctx, sql); err != nil { + t.Fatalf("couldn't instantiate base table: %v", err) + } + + // Create a snapshot. We'll select our snapshot time explicitly to validate the snapshot time is the same. + targetTime := time.Now() + snapshotID := tableIDs.New() + copier := dataset.Table(snapshotID).CopierFrom(dataset.Table(fmt.Sprintf("%s@%d", baseTableID, targetTime.UnixNano()/1e6))) + copier.OperationType = SnapshotOperation + job, err := copier.Run(ctx) + if err != nil { + t.Fatalf("couldn't run snapshot: %v", err) + } + status, err := job.Wait(ctx) + if err != nil { + t.Fatalf("polling snapshot failed: %v", err) + } + if status.Err() != nil { + t.Fatalf("snapshot failed in error: %v", status.Err()) + } + + // verify metadata on the snapshot + meta, err := dataset.Table(snapshotID).Metadata(ctx) + if err != nil { + t.Fatalf("couldn't get metadata from snapshot: %v", err) + } + if meta.Type != Snapshot { + t.Errorf("expected snapshot table type, got %s", meta.Type) + } + want := &SnapshotDefinition{ + BaseTableReference: dataset.Table(baseTableID), + SnapshotTime: targetTime, + } + if diff := testutil.Diff(meta.SnapshotDefinition, want, cmp.AllowUnexported(Table{}), cmpopts.IgnoreUnexported(Client{}), cmpopts.EquateApproxTime(time.Millisecond)); diff != "" { + t.Fatalf("SnapshotDefinition differs. 
got=-, want=+:\n%s", diff) + } + + // execute a restore using the snapshot. + restoreID := tableIDs.New() + restorer := dataset.Table(restoreID).CopierFrom(dataset.Table(snapshotID)) + restorer.OperationType = RestoreOperation + job, err = restorer.Run(ctx) + if err != nil { + t.Fatalf("couldn't run restore: %v", err) + } + status, err = job.Wait(ctx) + if err != nil { + t.Fatalf("polling restore failed: %v", err) + } + if status.Err() != nil { + t.Fatalf("restore failed in error: %v", status.Err()) + } + + restoreMeta, err := dataset.Table(restoreID).Metadata(ctx) + if err != nil { + t.Fatalf("couldn't get restored table metadata: %v", err) + } + + if meta.NumBytes != restoreMeta.NumBytes { + t.Errorf("bytes mismatch. snap had %d bytes, restore had %d bytes", meta.NumBytes, restoreMeta.NumBytes) + } + if meta.NumRows != restoreMeta.NumRows { + t.Errorf("row counts mismatch. snap had %d rows, restore had %d rows", meta.NumRows, restoreMeta.NumRows) + } + +} + func TestIntegration_HourTimePartitioning(t *testing.T) { if client == nil { t.Skip("Integration tests skipped") diff --git a/bigquery/table.go b/bigquery/table.go index f3493aed7ba..202b2f737f2 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -120,6 +120,10 @@ type TableMetadata struct { // This does not include data that is being buffered during a streaming insert. NumRows uint64 + // SnapshotDefinition contains additional information about the provenance of a + // given snapshot table. + SnapshotDefinition *SnapshotDefinition + // Contains information regarding this table's streaming buffer, if one is // present. This field will be nil if the table is not being streamed to or if // there is no data in the streaming buffer. @@ -177,6 +181,9 @@ const ( // MaterializedView represents a managed storage table that's derived from // a base table. MaterializedView TableType = "MATERIALIZED_VIEW" + // Snapshot represents an immutable point in time snapshot of some other + // table. 
+ Snapshot TableType = "SNAPSHOT" ) // MaterializedViewDefinition contains information for materialized views. @@ -223,6 +230,42 @@ func bqToMaterializedViewDefinition(q *bq.MaterializedViewDefinition) *Materiali } } +// SnapshotDefinition provides metadata related to the origin of a snapshot. +type SnapshotDefinition struct { + + // BaseTableReference describes the ID of the table that this snapshot + // came from. + BaseTableReference *Table + + // SnapshotTime indicates when the base table was snapshotted. + SnapshotTime time.Time +} + +func (sd *SnapshotDefinition) toBQ() *bq.SnapshotDefinition { + if sd == nil { + return nil + } + return &bq.SnapshotDefinition{ + BaseTableReference: sd.BaseTableReference.toBQ(), + SnapshotTime: sd.SnapshotTime.Format(time.RFC3339), + } +} + +func bqToSnapshotDefinition(q *bq.SnapshotDefinition, c *Client) *SnapshotDefinition { + if q == nil { + return nil + } + sd := &SnapshotDefinition{ + BaseTableReference: bqToTable(q.BaseTableReference, c), + } + // It's possible we could fail to populate SnapshotTime if we fail to parse + // the backend representation. + if t, err := time.Parse(time.RFC3339, q.SnapshotTime); err == nil { + sd.SnapshotTime = t + } + return sd +} + // TimePartitioningType defines the interval used to partition managed data. 
type TimePartitioningType string @@ -496,6 +539,7 @@ func (tm *TableMetadata) toBQ() (*bq.Table, error) { t.RangePartitioning = tm.RangePartitioning.toBQ() t.Clustering = tm.Clustering.toBQ() t.RequirePartitionFilter = tm.RequirePartitionFilter + t.SnapshotDefinition = tm.SnapshotDefinition.toBQ() if !validExpiration(tm.ExpirationTime) { return nil, fmt.Errorf("invalid expiration time: %v.\n"+ @@ -554,10 +598,10 @@ func (t *Table) Metadata(ctx context.Context) (md *TableMetadata, err error) { if err != nil { return nil, err } - return bqToTableMetadata(table) + return bqToTableMetadata(table, t.c) } -func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) { +func bqToTableMetadata(t *bq.Table, c *Client) (*TableMetadata, error) { md := &TableMetadata{ Description: t.Description, Name: t.FriendlyName, @@ -574,6 +618,7 @@ func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) { ETag: t.Etag, EncryptionConfig: bqToEncryptionConfig(t.EncryptionConfiguration), RequirePartitionFilter: t.RequirePartitionFilter, + SnapshotDefinition: bqToSnapshotDefinition(t.SnapshotDefinition, c), } if t.MaterializedView != nil { md.MaterializedView = bqToMaterializedViewDefinition(t.MaterializedView) @@ -652,7 +697,7 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin }); err != nil { return nil, err } - return bqToTableMetadata(res) + return bqToTableMetadata(res, t.c) } func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) { diff --git a/bigquery/table_test.go b/bigquery/table_test.go index 1ce48fdfca8..bc0d47e66ad 100644 --- a/bigquery/table_test.go +++ b/bigquery/table_test.go @@ -113,7 +113,7 @@ func TestBQToTableMetadata(t *testing.T) { }, }, } { - got, err := bqToTableMetadata(test.in) + got, err := bqToTableMetadata(test.in, &Client{}) if err != nil { t.Fatal(err) }