diff --git a/bigquery/copy.go b/bigquery/copy.go
index 44cc68d12ad..38d4eee8398 100644
--- a/bigquery/copy.go
+++ b/bigquery/copy.go
@@ -20,6 +20,19 @@ import (
 	bq "google.golang.org/api/bigquery/v2"
 )
 
+// TableCopyOperationType is used to indicate the type of operation performed by a BigQuery
+// copy job.
+type TableCopyOperationType string
+
+var (
+	// CopyOperation indicates normal table to table copying.
+	CopyOperation TableCopyOperationType = "COPY"
+	// SnapshotOperation indicates creating a snapshot from a regular table.
+	SnapshotOperation TableCopyOperationType = "SNAPSHOT"
+	// RestoreOperation indicates creating/restoring a table from a snapshot.
+	RestoreOperation TableCopyOperationType = "RESTORE"
+)
+
 // CopyConfig holds the configuration for a copy job.
 type CopyConfig struct {
 	// Srcs are the tables from which data will be copied.
@@ -41,6 +54,10 @@ type CopyConfig struct {
 
 	// Custom encryption configuration (e.g., Cloud KMS keys).
 	DestinationEncryptionConfig *EncryptionConfig
+
+	// One of the supported operation types when executing a Table Copy job. By default
+	// this copies tables, but can also be set to perform snapshot or restore operations.
+	OperationType TableCopyOperationType
 }
 
 func (c *CopyConfig) toBQ() *bq.JobConfiguration {
@@ -56,6 +73,7 @@ func (c *CopyConfig) toBQ() *bq.JobConfiguration {
 			DestinationTable:                   c.Dst.toBQ(),
 			DestinationEncryptionConfiguration: c.DestinationEncryptionConfig.toBQ(),
 			SourceTables:                       ts,
+			OperationType:                      string(c.OperationType),
 		},
 	}
 }
@@ -67,6 +85,7 @@ func bqToCopyConfig(q *bq.JobConfiguration, c *Client) *CopyConfig {
 		WriteDisposition:            TableWriteDisposition(q.Copy.WriteDisposition),
 		Dst:                         bqToTable(q.Copy.DestinationTable, c),
 		DestinationEncryptionConfig: bqToEncryptionConfig(q.Copy.DestinationEncryptionConfiguration),
+		OperationType:               TableCopyOperationType(q.Copy.OperationType),
 	}
 	for _, t := range q.Copy.SourceTables {
 		cc.Srcs = append(cc.Srcs, bqToTable(t, c))
diff --git a/bigquery/copy_test.go b/bigquery/copy_test.go
index 5b4ef878694..9b16ebf2d3a 100644
--- a/bigquery/copy_test.go
+++ b/bigquery/copy_test.go
@@ -137,6 +137,28 @@ func TestCopy(t *testing.T) {
 				return j
 			}(),
 		},
+		{
+			dst: &Table{
+				ProjectID: "d-project-id",
+				DatasetID: "d-dataset-id",
+				TableID:   "d-table-id",
+			},
+			srcs: []*Table{
+				{
+					ProjectID: "s-project-id",
+					DatasetID: "s-dataset-id",
+					TableID:   "s-table-id",
+				},
+			},
+			config: CopyConfig{
+				OperationType: SnapshotOperation,
+			},
+			want: func() *bq.Job {
+				j := defaultCopyJob()
+				j.Configuration.Copy.OperationType = "SNAPSHOT"
+				return j
+			}(),
+		},
 	}
 	c := &Client{projectID: "client-project-id"}
 	for i, tc := range testCases {
diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go
index 4273e84ec01..c9756b194cd 100644
--- a/bigquery/integration_test.go
+++ b/bigquery/integration_test.go
@@ -438,6 +438,96 @@ func TestIntegration_TableMetadata(t *testing.T) {
 
 }
 
+func TestIntegration_SnapshotAndRestore(t *testing.T) {
+
+	if client == nil {
+		t.Skip("Integration tests skipped")
+	}
+	ctx := context.Background()
+
+	// Instantiate a base table via a CTAS (CREATE TABLE AS SELECT) query.
+	baseTableID := tableIDs.New()
+	qualified := fmt.Sprintf("`%s`.%s.%s", testutil.ProjID(), dataset.DatasetID, baseTableID)
+	sql := fmt.Sprintf(`
+	CREATE TABLE %s
+	(
+		sample_value INT64,
+		groupid STRING,
+	)
+	AS
+	SELECT
+		CAST(RAND() * 100 AS INT64),
+		CONCAT("group", CAST(CAST(RAND()*10 AS INT64) AS STRING))
+	FROM
+		UNNEST(GENERATE_ARRAY(0,999))
+`, qualified)
+	if err := runQueryJob(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err) + } + + // Create a snapshot. We'll select our snapshot time explicitly to validate the snapshot time is the same. + targetTime := time.Now() + snapshotID := tableIDs.New() + copier := dataset.Table(snapshotID).CopierFrom(dataset.Table(fmt.Sprintf("%s@%d", baseTableID, targetTime.UnixNano()/1e6))) + copier.OperationType = SnapshotOperation + job, err := copier.Run(ctx) + if err != nil { + t.Fatalf("couldn't run snapshot: %v", err) + } + status, err := job.Wait(ctx) + if err != nil { + t.Fatalf("polling snapshot failed: %v", err) + } + if status.Err() != nil { + t.Fatalf("snapshot failed in error: %v", status.Err()) + } + + // verify metadata on the snapshot + meta, err := dataset.Table(snapshotID).Metadata(ctx) + if err != nil { + t.Fatalf("couldn't get metadata from snapshot: %v", err) + } + if meta.Type != Snapshot { + t.Errorf("expected snapshot table type, got %s", meta.Type) + } + want := &SnapshotDefinition{ + BaseTableReference: dataset.Table(baseTableID), + SnapshotTime: targetTime, + } + if diff := testutil.Diff(meta.SnapshotDefinition, want, cmp.AllowUnexported(Table{}), cmpopts.IgnoreUnexported(Client{}), cmpopts.EquateApproxTime(time.Millisecond)); diff != "" { + t.Fatalf("SnapshotDefinition differs. got=-, want=+:\n%s", diff) + } + + // execute a restore using the snapshot. + restoreID := tableIDs.New() + restorer := dataset.Table(restoreID).CopierFrom(dataset.Table(snapshotID)) + restorer.OperationType = RestoreOperation + job, err = restorer.Run(ctx) + if err != nil { + t.Fatalf("couldn't run restore: %v", err) + } + status, err = job.Wait(ctx) + if err != nil { + t.Fatalf("polling restore failed: %v", err) + } + if status.Err() != nil { + t.Fatalf("restore failed in error: %v", status.Err()) + } + + restoreMeta, err := dataset.Table(restoreID).Metadata(ctx) + if err != nil { + t.Fatalf("couldn't get restored table metadata: %v", err) + } + + if meta.NumBytes != restoreMeta.NumBytes { + t.Errorf("bytes mismatch. snap had %d bytes, restore had %d bytes", meta.NumBytes, restoreMeta.NumBytes) + } + if meta.NumRows != restoreMeta.NumRows { + t.Errorf("row counts mismatch. snap had %d rows, restore had %d rows", meta.NumRows, restoreMeta.NumRows) + } + +} + func TestIntegration_HourTimePartitioning(t *testing.T) { if client == nil { t.Skip("Integration tests skipped") diff --git a/bigquery/table.go b/bigquery/table.go index f3493aed7ba..202b2f737f2 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -120,6 +120,10 @@ type TableMetadata struct { // This does not include data that is being buffered during a streaming insert. NumRows uint64 + // SnapshotDefinition contains additional information about the provenance of a + // given snapshot table. + SnapshotDefinition *SnapshotDefinition + // Contains information regarding this table's streaming buffer, if one is // present. This field will be nil if the table is not being streamed to or if // there is no data in the streaming buffer. @@ -177,6 +181,9 @@ const ( // MaterializedView represents a managed storage table that's derived from // a base table. MaterializedView TableType = "MATERIALIZED_VIEW" + // Snapshot represents an immutable point in time snapshot of some other + // table. + Snapshot TableType = "SNAPSHOT" ) // MaterializedViewDefinition contains information for materialized views. @@ -223,6 +230,42 @@ func bqToMaterializedViewDefinition(q *bq.MaterializedViewDefinition) *Materiali } } +// SnapshotDefinition provides metadata related to the origin of a snapshot. 
+type SnapshotDefinition struct {
+
+	// BaseTableReference describes the table from which this snapshot
+	// was created.
+	BaseTableReference *Table
+
+	// SnapshotTime indicates when the snapshot of the base table was taken.
+	SnapshotTime time.Time
+}
+
+func (sd *SnapshotDefinition) toBQ() *bq.SnapshotDefinition {
+	if sd == nil {
+		return nil
+	}
+	return &bq.SnapshotDefinition{
+		BaseTableReference: sd.BaseTableReference.toBQ(),
+		SnapshotTime:       sd.SnapshotTime.Format(time.RFC3339),
+	}
+}
+
+func bqToSnapshotDefinition(q *bq.SnapshotDefinition, c *Client) *SnapshotDefinition {
+	if q == nil {
+		return nil
+	}
+	sd := &SnapshotDefinition{
+		BaseTableReference: bqToTable(q.BaseTableReference, c),
+	}
+	// SnapshotTime is left as the zero value if we fail to parse the
+	// backend representation.
+	if t, err := time.Parse(time.RFC3339, q.SnapshotTime); err == nil {
+		sd.SnapshotTime = t
+	}
+	return sd
+}
+
 // TimePartitioningType defines the interval used to partition managed data.
 type TimePartitioningType string
 
@@ -496,6 +539,7 @@ func (tm *TableMetadata) toBQ() (*bq.Table, error) {
 	t.RangePartitioning = tm.RangePartitioning.toBQ()
 	t.Clustering = tm.Clustering.toBQ()
 	t.RequirePartitionFilter = tm.RequirePartitionFilter
+	t.SnapshotDefinition = tm.SnapshotDefinition.toBQ()
 
 	if !validExpiration(tm.ExpirationTime) {
 		return nil, fmt.Errorf("invalid expiration time: %v.\n"+
@@ -554,10 +598,10 @@ func (t *Table) Metadata(ctx context.Context) (md *TableMetadata, err error) {
 	if err != nil {
 		return nil, err
 	}
-	return bqToTableMetadata(table)
+	return bqToTableMetadata(table, t.c)
 }
 
-func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) {
+func bqToTableMetadata(t *bq.Table, c *Client) (*TableMetadata, error) {
 	md := &TableMetadata{
 		Description: t.Description,
 		Name:        t.FriendlyName,
@@ -574,6 +618,7 @@ func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) {
 		ETag:                   t.Etag,
 		EncryptionConfig:       bqToEncryptionConfig(t.EncryptionConfiguration),
 		RequirePartitionFilter: t.RequirePartitionFilter,
+		SnapshotDefinition:     bqToSnapshotDefinition(t.SnapshotDefinition, c),
 	}
 	if t.MaterializedView != nil {
 		md.MaterializedView = bqToMaterializedViewDefinition(t.MaterializedView)
@@ -652,7 +697,7 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin
 	}); err != nil {
 		return nil, err
 	}
-	return bqToTableMetadata(res)
+	return bqToTableMetadata(res, t.c)
 }
 
 func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
diff --git a/bigquery/table_test.go b/bigquery/table_test.go
index 1ce48fdfca8..bc0d47e66ad 100644
--- a/bigquery/table_test.go
+++ b/bigquery/table_test.go
@@ -113,7 +113,7 @@ func TestBQToTableMetadata(t *testing.T) {
 			},
 		},
 	} {
-		got, err := bqToTableMetadata(test.in)
+		got, err := bqToTableMetadata(test.in, &Client{})
 		if err != nil {
 			t.Fatal(err)
 		}
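
Usage sketch (illustrative, not part of the change): with the OperationType field added above, creating a snapshot and restoring from it are just copy jobs with a non-default operation type. The project ID "my-project", dataset "mydataset", and table names below are placeholders, and the "table@<millis>" suffix is BigQuery's point-in-time snapshot decorator, the same form the integration test uses.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, "my-project") // placeholder project ID
	if err != nil {
		log.Fatalf("NewClient: %v", err)
	}
	defer client.Close()
	ds := client.Dataset("mydataset") // placeholder dataset

	// Snapshot the base table as of "now", using the millisecond snapshot decorator.
	snapMillis := time.Now().UnixNano() / 1e6
	src := ds.Table(fmt.Sprintf("mytable@%d", snapMillis))
	copier := ds.Table("mytable_snap").CopierFrom(src)
	copier.OperationType = bigquery.SnapshotOperation
	job, err := copier.Run(ctx)
	if err != nil {
		log.Fatalf("starting snapshot job: %v", err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatalf("waiting on snapshot job: %v", err)
	}
	if status.Err() != nil {
		log.Fatalf("snapshot job finished with error: %v", status.Err())
	}

	// Restore the snapshot into a new, writable table; Run/Wait as above.
	restorer := ds.Table("mytable_restored").CopierFrom(ds.Table("mytable_snap"))
	restorer.OperationType = bigquery.RestoreOperation
	if _, err := restorer.Run(ctx); err != nil {
		log.Fatalf("starting restore job: %v", err)
	}
}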
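
A companion sketch for the metadata side: the SnapshotDefinition field added to TableMetadata, together with the new Snapshot table type, lets callers identify a snapshot and its provenance. This fragment assumes the same imports and the hypothetical ds dataset handle from the previous sketch.

// inspectSnapshot prints provenance details if the named table is a snapshot.
// The table name is a placeholder carried over from the previous sketch.
func inspectSnapshot(ctx context.Context, ds *bigquery.Dataset) error {
	meta, err := ds.Table("mytable_snap").Metadata(ctx)
	if err != nil {
		return err
	}
	if meta.Type == bigquery.Snapshot && meta.SnapshotDefinition != nil {
		fmt.Printf("snapshot of %s taken at %s\n",
			meta.SnapshotDefinition.BaseTableReference.FullyQualifiedName(),
			meta.SnapshotDefinition.SnapshotTime.Format(time.RFC3339))
	}
	return nil
}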