Skip to content

Commit

Permalink
feat(bigquery): add support for snapshot/restore (#4112)
Browse files Browse the repository at this point in the history
* feat(bigquery): add support for snapshot/restore

Secondary changes:  This complicated some internal wrapping functions
for TableMetadata.  The fact that TableMetadata can now contain a Table
reference for other tables means we need to pass around an additional
instance of the Client, as a user may want to do something like call
Metadata() on the embedded reference to the base table in a
snapshot definition.

* add comments to exported operation types

* doc SnapshotDefinition

* address reviewer comments

Co-authored-by: Tyler Bui-Palsulich <26876514+tbpg@users.noreply.github.com>
  • Loading branch information
shollyman and tbpg committed Jun 22, 2021
1 parent 83337b8 commit 4c12b42
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 4 deletions.
19 changes: 19 additions & 0 deletions bigquery/copy.go
Expand Up @@ -20,6 +20,19 @@ import (
bq "google.golang.org/api/bigquery/v2"
)

// TableCopyOperationType is used to indicate the type of operation performed by a BigQuery
// copy job.
type TableCopyOperationType string

// These values are fixed protocol strings and are never reassigned, so they
// are declared as constants rather than vars to keep them immutable.
const (
	// CopyOperation indicates normal table to table copying.
	CopyOperation TableCopyOperationType = "COPY"
	// SnapshotOperation indicates creating a snapshot from a regular table.
	SnapshotOperation TableCopyOperationType = "SNAPSHOT"
	// RestoreOperation indicates creating/restoring a table from a snapshot.
	RestoreOperation TableCopyOperationType = "RESTORE"
)

// CopyConfig holds the configuration for a copy job.
type CopyConfig struct {
// Srcs are the tables from which data will be copied.
Expand All @@ -41,6 +54,10 @@ type CopyConfig struct {

// Custom encryption configuration (e.g., Cloud KMS keys).
DestinationEncryptionConfig *EncryptionConfig

// One of the supported operation types when executing a Table Copy jobs. By default this
// copies tables, but can also be set to perform snapshot or restore operations.
OperationType TableCopyOperationType
}

func (c *CopyConfig) toBQ() *bq.JobConfiguration {
Expand All @@ -56,6 +73,7 @@ func (c *CopyConfig) toBQ() *bq.JobConfiguration {
DestinationTable: c.Dst.toBQ(),
DestinationEncryptionConfiguration: c.DestinationEncryptionConfig.toBQ(),
SourceTables: ts,
OperationType: string(c.OperationType),
},
}
}
Expand All @@ -67,6 +85,7 @@ func bqToCopyConfig(q *bq.JobConfiguration, c *Client) *CopyConfig {
WriteDisposition: TableWriteDisposition(q.Copy.WriteDisposition),
Dst: bqToTable(q.Copy.DestinationTable, c),
DestinationEncryptionConfig: bqToEncryptionConfig(q.Copy.DestinationEncryptionConfiguration),
OperationType: TableCopyOperationType(q.Copy.OperationType),
}
for _, t := range q.Copy.SourceTables {
cc.Srcs = append(cc.Srcs, bqToTable(t, c))
Expand Down
22 changes: 22 additions & 0 deletions bigquery/copy_test.go
Expand Up @@ -137,6 +137,28 @@ func TestCopy(t *testing.T) {
return j
}(),
},
{
dst: &Table{
ProjectID: "d-project-id",
DatasetID: "d-dataset-id",
TableID: "d-table-id",
},
srcs: []*Table{
{
ProjectID: "s-project-id",
DatasetID: "s-dataset-id",
TableID: "s-table-id",
},
},
config: CopyConfig{
OperationType: SnapshotOperation,
},
want: func() *bq.Job {
j := defaultCopyJob()
j.Configuration.Copy.OperationType = "SNAPSHOT"
return j
}(),
},
}
c := &Client{projectID: "client-project-id"}
for i, tc := range testCases {
Expand Down
90 changes: 90 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -438,6 +438,96 @@ func TestIntegration_TableMetadata(t *testing.T) {

}

func TestIntegration_SnapshotAndRestore(t *testing.T) {

	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	// Seed a source table with generated data using a CTAS statement.
	baseTableID := tableIDs.New()
	fqtn := fmt.Sprintf("`%s`.%s.%s", testutil.ProjID(), dataset.DatasetID, baseTableID)
	ddl := fmt.Sprintf(`
	CREATE TABLE %s
	(
		sample_value INT64,
		groupid STRING,
	)
	AS
	SELECT
		CAST(RAND() * 100 AS INT64),
		CONCAT("group", CAST(CAST(RAND()*10 AS INT64) AS STRING))
	FROM
		UNNEST(GENERATE_ARRAY(0,999))
	`, fqtn)
	if err := runQueryJob(ctx, ddl); err != nil {
		t.Fatalf("couldn't instantiate base table: %v", err)
	}

	// Take a snapshot at an explicit point in time (millisecond-precision
	// table decorator) so we can later compare against the reported
	// snapshot time.
	targetTime := time.Now()
	snapshotID := tableIDs.New()
	decoratedSrc := dataset.Table(fmt.Sprintf("%s@%d", baseTableID, targetTime.UnixNano()/1e6))
	snapCopier := dataset.Table(snapshotID).CopierFrom(decoratedSrc)
	snapCopier.OperationType = SnapshotOperation
	snapJob, err := snapCopier.Run(ctx)
	if err != nil {
		t.Fatalf("couldn't run snapshot: %v", err)
	}
	snapStatus, err := snapJob.Wait(ctx)
	if err != nil {
		t.Fatalf("polling snapshot failed: %v", err)
	}
	if snapStatus.Err() != nil {
		t.Fatalf("snapshot failed in error: %v", snapStatus.Err())
	}

	// The snapshot's metadata should report the snapshot table type and
	// echo back the base table and requested snapshot time.
	snapMeta, err := dataset.Table(snapshotID).Metadata(ctx)
	if err != nil {
		t.Fatalf("couldn't get metadata from snapshot: %v", err)
	}
	if snapMeta.Type != Snapshot {
		t.Errorf("expected snapshot table type, got %s", snapMeta.Type)
	}
	want := &SnapshotDefinition{
		BaseTableReference: dataset.Table(baseTableID),
		SnapshotTime:       targetTime,
	}
	// targetTime was truncated to milliseconds in the decorator, so allow
	// that much slack when comparing timestamps.
	if diff := testutil.Diff(snapMeta.SnapshotDefinition, want, cmp.AllowUnexported(Table{}), cmpopts.IgnoreUnexported(Client{}), cmpopts.EquateApproxTime(time.Millisecond)); diff != "" {
		t.Fatalf("SnapshotDefinition differs. got=-, want=+:\n%s", diff)
	}

	// Materialize a new table from the snapshot via a restore operation.
	restoreID := tableIDs.New()
	restorer := dataset.Table(restoreID).CopierFrom(dataset.Table(snapshotID))
	restorer.OperationType = RestoreOperation
	restoreJob, err := restorer.Run(ctx)
	if err != nil {
		t.Fatalf("couldn't run restore: %v", err)
	}
	restoreStatus, err := restoreJob.Wait(ctx)
	if err != nil {
		t.Fatalf("polling restore failed: %v", err)
	}
	if restoreStatus.Err() != nil {
		t.Fatalf("restore failed in error: %v", restoreStatus.Err())
	}

	// The restored table should carry the same data volume as the snapshot.
	restoreMeta, err := dataset.Table(restoreID).Metadata(ctx)
	if err != nil {
		t.Fatalf("couldn't get restored table metadata: %v", err)
	}

	if snapMeta.NumBytes != restoreMeta.NumBytes {
		t.Errorf("bytes mismatch. snap had %d bytes, restore had %d bytes", snapMeta.NumBytes, restoreMeta.NumBytes)
	}
	if snapMeta.NumRows != restoreMeta.NumRows {
		t.Errorf("row counts mismatch. snap had %d rows, restore had %d rows", snapMeta.NumRows, restoreMeta.NumRows)
	}

}

func TestIntegration_HourTimePartitioning(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down
51 changes: 48 additions & 3 deletions bigquery/table.go
Expand Up @@ -120,6 +120,10 @@ type TableMetadata struct {
// This does not include data that is being buffered during a streaming insert.
NumRows uint64

// SnapshotDefinition contains additional information about the provenance of a
// given snapshot table.
SnapshotDefinition *SnapshotDefinition

// Contains information regarding this table's streaming buffer, if one is
// present. This field will be nil if the table is not being streamed to or if
// there is no data in the streaming buffer.
Expand Down Expand Up @@ -177,6 +181,9 @@ const (
// MaterializedView represents a managed storage table that's derived from
// a base table.
MaterializedView TableType = "MATERIALIZED_VIEW"
// Snapshot represents an immutable point in time snapshot of some other
// table.
Snapshot TableType = "SNAPSHOT"
)

// MaterializedViewDefinition contains information for materialized views.
Expand Down Expand Up @@ -223,6 +230,42 @@ func bqToMaterializedViewDefinition(q *bq.MaterializedViewDefinition) *Materiali
}
}

// SnapshotDefinition provides metadata related to the origin of a snapshot.
// It is populated in the TableMetadata of snapshot tables to record which
// table the snapshot was taken from and at what point in time.
type SnapshotDefinition struct {

	// BaseTableReference describes the ID of the table that this snapshot
	// came from.
	BaseTableReference *Table

	// SnapshotTime indicates when the base table was snapshot; the snapshot
	// represents the base table's contents as of this instant.
	SnapshotTime time.Time
}

// toBQ converts the client-side snapshot definition into its API
// representation. A nil receiver yields a nil message.
func (sd *SnapshotDefinition) toBQ() *bq.SnapshotDefinition {
	if sd == nil {
		return nil
	}
	def := &bq.SnapshotDefinition{}
	def.BaseTableReference = sd.BaseTableReference.toBQ()
	// NOTE(review): RFC3339 formatting drops sub-second precision from
	// SnapshotTime — confirm the backend does not need fractional seconds.
	def.SnapshotTime = sd.SnapshotTime.Format(time.RFC3339)
	return def
}

// bqToSnapshotDefinition converts the API representation of a snapshot
// definition into the client form, resolving the base table reference
// against the supplied client.
func bqToSnapshotDefinition(q *bq.SnapshotDefinition, c *Client) *SnapshotDefinition {
	if q == nil {
		return nil
	}
	def := &SnapshotDefinition{
		BaseTableReference: bqToTable(q.BaseTableReference, c),
	}
	// Best effort: if the backend's timestamp fails to parse, SnapshotTime
	// is left at its zero value rather than surfacing an error.
	if parsed, err := time.Parse(time.RFC3339, q.SnapshotTime); err == nil {
		def.SnapshotTime = parsed
	}
	return def
}

// TimePartitioningType defines the interval used to partition managed data.
type TimePartitioningType string

Expand Down Expand Up @@ -496,6 +539,7 @@ func (tm *TableMetadata) toBQ() (*bq.Table, error) {
t.RangePartitioning = tm.RangePartitioning.toBQ()
t.Clustering = tm.Clustering.toBQ()
t.RequirePartitionFilter = tm.RequirePartitionFilter
t.SnapshotDefinition = tm.SnapshotDefinition.toBQ()

if !validExpiration(tm.ExpirationTime) {
return nil, fmt.Errorf("invalid expiration time: %v.\n"+
Expand Down Expand Up @@ -554,10 +598,10 @@ func (t *Table) Metadata(ctx context.Context) (md *TableMetadata, err error) {
if err != nil {
return nil, err
}
return bqToTableMetadata(table)
return bqToTableMetadata(table, t.c)
}

func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) {
func bqToTableMetadata(t *bq.Table, c *Client) (*TableMetadata, error) {
md := &TableMetadata{
Description: t.Description,
Name: t.FriendlyName,
Expand All @@ -574,6 +618,7 @@ func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) {
ETag: t.Etag,
EncryptionConfig: bqToEncryptionConfig(t.EncryptionConfiguration),
RequirePartitionFilter: t.RequirePartitionFilter,
SnapshotDefinition: bqToSnapshotDefinition(t.SnapshotDefinition, c),
}
if t.MaterializedView != nil {
md.MaterializedView = bqToMaterializedViewDefinition(t.MaterializedView)
Expand Down Expand Up @@ -652,7 +697,7 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin
}); err != nil {
return nil, err
}
return bqToTableMetadata(res)
return bqToTableMetadata(res, t.c)
}

func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
Expand Down
2 changes: 1 addition & 1 deletion bigquery/table_test.go
Expand Up @@ -113,7 +113,7 @@ func TestBQToTableMetadata(t *testing.T) {
},
},
} {
got, err := bqToTableMetadata(test.in)
got, err := bqToTableMetadata(test.in, &Client{})
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 4c12b42

Please sign in to comment.