
feat(bigquery): add support for snapshot/restore #4112

Merged
merged 9 commits on Jun 22, 2021
19 changes: 19 additions & 0 deletions bigquery/copy.go
@@ -20,6 +20,19 @@ import (
bq "google.golang.org/api/bigquery/v2"
)

// TableCopyOperationType indicates the type of operation performed by a BigQuery
// copy job.
type TableCopyOperationType string

var (
// CopyOperation indicates normal table to table copying.
CopyOperation TableCopyOperationType = "COPY"
// SnapshotOperation indicates creating a snapshot from a regular table.
SnapshotOperation TableCopyOperationType = "SNAPSHOT"
// RestoreOperation indicates creating/restoring a table from a snapshot.
RestoreOperation TableCopyOperationType = "RESTORE"
)

// CopyConfig holds the configuration for a copy job.
type CopyConfig struct {
// Srcs are the tables from which data will be copied.
@@ -41,6 +54,10 @@ type CopyConfig struct {

// Custom encryption configuration (e.g., Cloud KMS keys).
DestinationEncryptionConfig *EncryptionConfig

// One of the supported operation types when executing a table copy job. By default this
// copies tables, but it can also be set to perform snapshot or restore operations.
OperationType TableCopyOperationType
}

func (c *CopyConfig) toBQ() *bq.JobConfiguration {
@@ -56,6 +73,7 @@ func (c *CopyConfig) toBQ() *bq.JobConfiguration {
DestinationTable: c.Dst.toBQ(),
DestinationEncryptionConfiguration: c.DestinationEncryptionConfig.toBQ(),
SourceTables: ts,
OperationType: string(c.OperationType),
},
}
}
@@ -67,6 +85,7 @@ func bqToCopyConfig(q *bq.JobConfiguration, c *Client) *CopyConfig {
WriteDisposition: TableWriteDisposition(q.Copy.WriteDisposition),
Dst: bqToTable(q.Copy.DestinationTable, c),
DestinationEncryptionConfig: bqToEncryptionConfig(q.Copy.DestinationEncryptionConfiguration),
OperationType: TableCopyOperationType(q.Copy.OperationType),
}
for _, t := range q.Copy.SourceTables {
cc.Srcs = append(cc.Srcs, bqToTable(t, c))
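
For context, a minimal sketch of how a caller might exercise the new OperationType field, snapshotting a table and then restoring the snapshot to a new table. The dataset and table identifiers are placeholders, not part of this change:

package snippets

import (
	"context"

	"cloud.google.com/go/bigquery"
)

// snapshotThenRestore sketches the intended use of the new OperationType
// field: first snapshot a base table, then restore the snapshot to a new
// table. All identifiers here are hypothetical.
func snapshotThenRestore(ctx context.Context, client *bigquery.Client) error {
	ds := client.Dataset("mydataset")

	// Snapshot: a copy job with OperationType set to SnapshotOperation.
	snap := ds.Table("mytable_snap").CopierFrom(ds.Table("mytable"))
	snap.OperationType = bigquery.SnapshotOperation
	if err := runAndWait(ctx, snap); err != nil {
		return err
	}

	// Restore: a copy job with OperationType set to RestoreOperation.
	restore := ds.Table("mytable_restored").CopierFrom(ds.Table("mytable_snap"))
	restore.OperationType = bigquery.RestoreOperation
	return runAndWait(ctx, restore)
}

// runAndWait starts a copy job and blocks until it finishes, surfacing any
// job-level error.
func runAndWait(ctx context.Context, c *bigquery.Copier) error {
	job, err := c.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	return status.Err()
}
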
22 changes: 22 additions & 0 deletions bigquery/copy_test.go
@@ -137,6 +137,28 @@ func TestCopy(t *testing.T) {
return j
}(),
},
{
dst: &Table{
ProjectID: "d-project-id",
DatasetID: "d-dataset-id",
TableID: "d-table-id",
},
srcs: []*Table{
{
ProjectID: "s-project-id",
DatasetID: "s-dataset-id",
TableID: "s-table-id",
},
},
config: CopyConfig{
OperationType: SnapshotOperation,
},
want: func() *bq.Job {
j := defaultCopyJob()
j.Configuration.Copy.OperationType = "SNAPSHOT"
return j
}(),
},
}
c := &Client{projectID: "client-project-id"}
for i, tc := range testCases {
90 changes: 90 additions & 0 deletions bigquery/integration_test.go
@@ -438,6 +438,96 @@ func TestIntegration_TableMetadata(t *testing.T) {

}

func TestIntegration_SnapshotAndRestore(t *testing.T) {

if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

// Instantiate a base table via a CTAS (CREATE TABLE AS SELECT) statement.
baseTableID := tableIDs.New()
qualified := fmt.Sprintf("`%s`.%s.%s", testutil.ProjID(), dataset.DatasetID, baseTableID)
sql := fmt.Sprintf(`
CREATE TABLE %s
(
sample_value INT64,
groupid STRING,
)
AS
SELECT
CAST(RAND() * 100 AS INT64),
CONCAT("group", CAST(CAST(RAND()*10 AS INT64) AS STRING))
FROM
UNNEST(GENERATE_ARRAY(0,999))
`, qualified)
if err := runQueryJob(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err)
}

// Create a snapshot. We set the snapshot time explicitly so we can validate it against the reported metadata.
targetTime := time.Now()
snapshotID := tableIDs.New()
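// The "table@<millis>" decorator references the table's state as of that
// time (milliseconds since epoch), pinning the copy source to targetTime.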
copier := dataset.Table(snapshotID).CopierFrom(dataset.Table(fmt.Sprintf("%s@%d", baseTableID, targetTime.UnixNano()/1e6)))
copier.OperationType = SnapshotOperation
job, err := copier.Run(ctx)
if err != nil {
t.Fatalf("couldn't run snapshot: %v", err)
}
status, err := job.Wait(ctx)
if err != nil {
t.Fatalf("polling snapshot failed: %v", err)
}
if status.Err() != nil {
t.Fatalf("snapshot failed in error: %v", status.Err())
}

// verify metadata on the snapshot
meta, err := dataset.Table(snapshotID).Metadata(ctx)
if err != nil {
t.Fatalf("couldn't get metadata from snapshot: %v", err)
}
if meta.Type != Snapshot {
t.Errorf("expected snapshot table type, got %s", meta.Type)
}
want := &SnapshotDefinition{
BaseTableReference: dataset.Table(baseTableID),
SnapshotTime: targetTime,
}
if diff := testutil.Diff(meta.SnapshotDefinition, want, cmp.AllowUnexported(Table{}), cmpopts.IgnoreUnexported(Client{}), cmpopts.EquateApproxTime(time.Millisecond)); diff != "" {
t.Fatalf("got=-, want=+:\n%s", diff)
}

// execute a restore using the snapshot.
restoreID := tableIDs.New()
restorer := dataset.Table(restoreID).CopierFrom(dataset.Table(snapshotID))
restorer.OperationType = RestoreOperation
job, err = restorer.Run(ctx)
if err != nil {
t.Fatalf("couldn't run restore: %v", err)
}
status, err = job.Wait(ctx)
if err != nil {
t.Fatalf("polling restore failed: %v", err)
}
if status.Err() != nil {
t.Fatalf("restore failed in error: %v", status.Err())
}

meta2, err := dataset.Table(restoreID).Metadata(ctx)
if err != nil {
t.Fatalf("couldn't get restored table metadata: %v", err)
}

if meta.NumBytes != meta2.NumBytes {
t.Errorf("bytes mismatch. snap had %d bytes, restore had %d bytes", meta.NumBytes, meta2.NumBytes)
}
if meta.NumRows != meta2.NumRows {
t.Errorf("row counts mismatch. snap had %d rows, restore had %d rows", meta.NumRows, meta2.NumRows)
}

}

func TestIntegration_HourTimePartitioning(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
49 changes: 46 additions & 3 deletions bigquery/table.go
@@ -120,6 +120,10 @@ type TableMetadata struct {
// This does not include data that is being buffered during a streaming insert.
NumRows uint64

// SnapshotDefinition contains additional information about the provenance of a
// given snapshot table.
SnapshotDefinition *SnapshotDefinition

// Contains information regarding this table's streaming buffer, if one is
// present. This field will be nil if the table is not being streamed to or if
// there is no data in the streaming buffer.
@@ -177,6 +181,9 @@ const (
// MaterializedView represents a managed storage table that's derived from
// a base table.
MaterializedView TableType = "MATERIALIZED_VIEW"
// Snapshot represents an immutable, point-in-time snapshot of another
// table.
Snapshot TableType = "SNAPSHOT"
)

// MaterializedViewDefinition contains information for materialized views.
@@ -223,6 +230,40 @@ func bqToMaterializedViewDefinition(q *bq.MaterializedViewDefinition) *MaterializedViewDefinition {
}
}

// SnapshotDefinition provides metadata related to the origin of a snapshot.
type SnapshotDefinition struct {

// BaseTableReference identifies the table from which this snapshot
// was taken.
BaseTableReference *Table

// SnapshotTime indicates when the snapshot of the base table was taken.
SnapshotTime time.Time
}

func (sd *SnapshotDefinition) toBQ() *bq.SnapshotDefinition {
if sd == nil {
return nil
}
return &bq.SnapshotDefinition{
BaseTableReference: sd.BaseTableReference.toBQ(),
SnapshotTime: sd.SnapshotTime.Format(time.RFC3339),
}
}

func bqToSnapshotDefinition(q *bq.SnapshotDefinition, c *Client) *SnapshotDefinition {
if q == nil {
return nil
}
sd := &SnapshotDefinition{
BaseTableReference: bqToTable(q.BaseTableReference, c),
}
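// Note: a SnapshotTime that fails to parse is silently ignored, leaving
// the field at its zero value.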
if t, err := time.Parse(time.RFC3339, q.SnapshotTime); err == nil {
sd.SnapshotTime = t
}
return sd
}
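
A companion sketch (reusing the imports from the sketch above, plus fmt and time) that reads the new snapshot metadata back; identifiers are again placeholders:

// describeSnapshot reports a snapshot table's provenance, if any.
func describeSnapshot(ctx context.Context, client *bigquery.Client) error {
	md, err := client.Dataset("mydataset").Table("mytable_snap").Metadata(ctx)
	if err != nil {
		return err
	}
	if md.Type == bigquery.Snapshot && md.SnapshotDefinition != nil {
		fmt.Printf("snapshot of %s taken at %s\n",
			md.SnapshotDefinition.BaseTableReference.TableID,
			md.SnapshotDefinition.SnapshotTime.Format(time.RFC3339))
	}
	return nil
}
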

// TimePartitioningType defines the interval used to partition managed data.
type TimePartitioningType string

@@ -496,6 +537,7 @@ func (tm *TableMetadata) toBQ() (*bq.Table, error) {
t.RangePartitioning = tm.RangePartitioning.toBQ()
t.Clustering = tm.Clustering.toBQ()
t.RequirePartitionFilter = tm.RequirePartitionFilter
t.SnapshotDefinition = tm.SnapshotDefinition.toBQ()

if !validExpiration(tm.ExpirationTime) {
return nil, fmt.Errorf("invalid expiration time: %v.\n"+
@@ -554,10 +596,10 @@ func (t *Table) Metadata(ctx context.Context) (md *TableMetadata, err error) {
if err != nil {
return nil, err
}
return bqToTableMetadata(table)
return bqToTableMetadata(table, t.c)
}

func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) {
func bqToTableMetadata(t *bq.Table, c *Client) (*TableMetadata, error) {
md := &TableMetadata{
Description: t.Description,
Name: t.FriendlyName,
@@ -574,6 +616,7 @@ ETag: t.Etag,
ETag: t.Etag,
EncryptionConfig: bqToEncryptionConfig(t.EncryptionConfiguration),
RequirePartitionFilter: t.RequirePartitionFilter,
SnapshotDefinition: bqToSnapshotDefinition(t.SnapshotDefinition, c),
}
if t.MaterializedView != nil {
md.MaterializedView = bqToMaterializedViewDefinition(t.MaterializedView)
@@ -652,7 +695,7 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string) (md *TableMetadata, err error) {
}); err != nil {
return nil, err
}
return bqToTableMetadata(res)
return bqToTableMetadata(res, t.c)
}

func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
2 changes: 1 addition & 1 deletion bigquery/table_test.go
@@ -113,7 +113,7 @@ func TestBQToTableMetadata(t *testing.T) {
},
},
} {
got, err := bqToTableMetadata(test.in)
got, err := bqToTableMetadata(test.in, &Client{})
if err != nil {
t.Fatal(err)
}