Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): support schema change notificat…
Browse files Browse the repository at this point in the history
…ion (#5253)

* feat(bigquery/storage/managedwriter): support schema change notification

This adds a way for users to detect schema changes by checking the AppendResult.
  • Loading branch information
shollyman committed Dec 30, 2021
1 parent c276428 commit 70e40db
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 6 deletions.
15 changes: 15 additions & 0 deletions bigquery/storage/managedwriter/appendresult.go
Expand Up @@ -16,6 +16,7 @@ package managedwriter

import (
"context"
"fmt"

storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/protobuf/proto"
Expand All @@ -38,6 +39,9 @@ type AppendResult struct {

// the stream offset
offset int64

// retains the updated schema from backend response. Used for schema change notification.
updatedSchema *storagepb.TableSchema
}

func newAppendResult(data [][]byte) *AppendResult {
Expand All @@ -62,6 +66,17 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
}
}

// UpdatedSchema returns the updated schema for a table if supplied by the backend as part
// of the append response. It blocks until the result is ready or ctx is done.
//
// A nil schema with a nil error means no updated schema was recorded for this append.
// If ctx finishes first, the returned error wraps ctx.Err(), so callers can
// distinguish context.Canceled from context.DeadlineExceeded using errors.Is.
func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSchema, error) {
	select {
	case <-ctx.Done():
		// Wrap rather than replace the context error so the cause stays inspectable.
		return nil, fmt.Errorf("context done: %w", ctx.Err())
	case <-ar.Ready():
		return ar.updatedSchema, nil
	}
}

// pendingWrite tracks state for a set of rows that are part of a single
// append request.
type pendingWrite struct {
Expand Down
37 changes: 31 additions & 6 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -546,16 +546,20 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
withExactRowCount(0))

var result *AppendResult
var curOffset int64
var latestRow []byte
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
latestRow = b
data := [][]byte{b}
result, err = ms.AppendRows(ctx, data)
result, err = ms.AppendRows(ctx, data, WithOffset(curOffset))
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
curOffset = curOffset + int64(len(data))
}
// wait for the result to indicate ready, then validate.
_, err = result.GetResult(ctx)
Expand All @@ -572,9 +576,30 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
t.Errorf("failed to evolve table schema: %v", err)
}

// TODO: we need a more elegant mechanism for detecting when the backend has registered the schema change.
// In the continuous case, we'd get it from the response, but the change-and-wait case needs something more.
time.Sleep(6 * time.Second)
// Resend latest row until we get a new schema notification.
// It _should_ be possible to send duplicates, but this currently will not propagate the schema error.
// Internal issue: b/211899346
for {
	// Re-append the most recent row at the next offset and inspect the
	// append result for a schema update.
	resp, err := ms.AppendRows(ctx, [][]byte{latestRow}, WithOffset(curOffset))
	if err != nil {
		t.Errorf("got error on dupe append: %v", err)
		break
	}
	curOffset++
	// Blocks until the append result is ready or ctx is done.
	s, err := resp.UpdatedSchema(ctx)
	if err != nil {
		t.Errorf("getting schema error: %v", err)
		break
	}
	// A non-nil schema means the backend reported the updated schema; stop resending.
	if s != nil {
		break
	}
}

// ready descriptor, send an additional append
m2 := &testdata.SimpleMessageEvolvedProto2{
Expand All @@ -587,7 +612,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
if err != nil {
t.Errorf("failed to marshal evolved message: %v", err)
}
result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto), WithOffset(curOffset))
if err != nil {
t.Errorf("failed evolved append: %v", err)
}
Expand All @@ -597,7 +622,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
}

validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(len(testSimpleData)+1)),
withExactRowCount(int64(curOffset+1)),
withNullCount("name", 0),
withNonNullCount("other", 1),
)
Expand Down
5 changes: 5 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -400,6 +400,11 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
}
recordStat(ctx, AppendResponses, 1)

// Retain the updated schema if present, for eventual presentation to the user.
if resp.GetUpdatedSchema() != nil {
nextWrite.result.updatedSchema = resp.GetUpdatedSchema()
}

if status := resp.GetError(); status != nil {
tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String()))
if err != nil {
Expand Down

0 comments on commit 70e40db

Please sign in to comment.