Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): support variadic appends (#5102)
Browse files Browse the repository at this point in the history
* feat(bigquery/storage/managedwriter): support variadic AppendOption
  • Loading branch information
shollyman committed Dec 1, 2021
1 parent c9cd984 commit 014b314
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 122 deletions.
30 changes: 25 additions & 5 deletions bigquery/storage/managedwriter/appendresult.go
Expand Up @@ -19,6 +19,7 @@ import (

storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

Expand Down Expand Up @@ -66,7 +67,9 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
// append request.
type pendingWrite struct {
request *storagepb.AppendRowsRequest
result *AppendResult
// for schema evolution cases, accept a new schema
newSchema *descriptorpb.DescriptorProto
result *AppendResult

// this is used by the flow controller.
reqSize int
Expand All @@ -77,7 +80,7 @@ type pendingWrite struct {
// that in the future, we may want to allow row batching to be managed by
// the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING
// streams, this should be managed by the user.
func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
func newPendingWrite(appends [][]byte) *pendingWrite {
pw := &pendingWrite{
request: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
Expand All @@ -90,9 +93,6 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
},
result: newAppendResult(appends),
}
if offset > 0 {
pw.request.Offset = &wrapperspb.Int64Value{Value: offset}
}
// We compute the size now for flow controller purposes, though
// the actual request size may be slightly larger (e.g. the first
// request in a new stream bears schema and stream id).
Expand All @@ -114,3 +114,23 @@ func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowControlle
fc.release(pw.reqSize)
}
}

// AppendOption is a functional option that can be passed when appending data
// with a managed stream instance. Each option is a function that receives the
// pendingWrite for the append and may modify it.
type AppendOption func(*pendingWrite)

// UpdateSchemaDescriptor returns an AppendOption that records the supplied
// descriptor as the new message schema on the pending write, which is how an
// append signals an updated schema for the stream.
func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption {
	var opt AppendOption = func(pending *pendingWrite) {
		pending.newSchema = schema
	}
	return opt
}

// WithOffset returns an AppendOption that attaches an explicit offset value
// to the append request carried by the pending write.
func WithOffset(offset int64) AppendOption {
	return func(pending *pendingWrite) {
		// wrapperspb.Int64 allocates a fresh *Int64Value holding offset.
		pending.request.Offset = wrapperspb.Int64(offset)
	}
}
60 changes: 32 additions & 28 deletions bigquery/storage/managedwriter/appendresult_test.go
Expand Up @@ -43,38 +43,33 @@ func TestPendingWrite(t *testing.T) {
[]byte("row3"),
}

var wantOffset int64 = 99

// first, verify no offset behavior
pending := newPendingWrite(wantRowData, NoStreamOffset)
// verify no offset behavior
pending := newPendingWrite(wantRowData)
if pending.request.GetOffset() != nil {
t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
}
pending.markDone(NoStreamOffset, nil, nil)
if pending.result.offset != NoStreamOffset {
t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset)
}
if pending.result.err != nil {
t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
}

// now, verify behavior with a valid offset
pending = newPendingWrite(wantRowData, 99)
if pending.request.GetOffset() == nil {
t.Errorf("offset not set, should be %d", wantOffset)
}
if gotOffset := pending.request.GetOffset().GetValue(); gotOffset != wantOffset {
t.Errorf("offset mismatch, got %d want %d", gotOffset, wantOffset)
}

// check request shape
gotRowCount := len(pending.request.GetProtoRows().GetRows().GetSerializedRows())
if gotRowCount != len(wantRowData) {
t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData))
}

// verify AppendResult
// Verify request is not acknowledged.
select {
case <-pending.result.Ready():
t.Errorf("got Ready() on incomplete AppendResult")
case <-time.After(100 * time.Millisecond):

}

// Mark completed, verify result.
pending.markDone(NoStreamOffset, nil, nil)
if pending.result.offset != NoStreamOffset {
t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset)
}
if pending.result.err != nil {
t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
}
gotData := pending.result.rowData
if len(gotData) != len(wantRowData) {
t.Errorf("length mismatch on appendresult, got %d, want %d", len(gotData), len(wantRowData))
Expand All @@ -84,15 +79,24 @@ func TestPendingWrite(t *testing.T) {
t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i])
}
}
select {
case <-pending.result.Ready():
t.Errorf("got Ready() on incomplete AppendResult")
case <-time.After(100 * time.Millisecond):

}
// Create new write to verify error result.
pending = newPendingWrite(wantRowData)

// verify completion behavior
// Manually invoke option to apply offset to request.
// This would normally be applied as part of the AppendRows() method on the managed stream.
reportedOffset := int64(101)
f := WithOffset(reportedOffset)
f(pending)

if pending.request.GetOffset() == nil {
t.Errorf("expected offset, got none")
}
if pending.request.GetOffset().GetValue() != reportedOffset {
t.Errorf("offset mismatch, got %d wanted %d", pending.request.GetOffset().GetValue(), reportedOffset)
}

// Verify completion behavior with an error.
wantErr := fmt.Errorf("foo")
pending.markDone(reportedOffset, wantErr, nil)

Expand Down
108 changes: 99 additions & 9 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -144,6 +144,10 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testPendingStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("SchemaEvolution", func(t *testing.T) {
t.Parallel()
testSchemaEvolution(ctx, t, mwClient, bqClient, dataset)
})
t.Run("Instrumentation", func(t *testing.T) {
// Don't run this in parallel, we only want to collect stats from this subtest.
testInstrumentation(ctx, t, mwClient, bqClient, dataset)
Expand Down Expand Up @@ -181,7 +185,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
Expand All @@ -201,7 +205,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
}
data = append(data, b)
}
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data)
if err != nil {
t.Errorf("grouped-row append failed: %v", err)
}
Expand Down Expand Up @@ -256,7 +260,7 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
if err != nil {
t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
}
result, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
result, err = ms.AppendRows(ctx, [][]byte{b})
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
Expand Down Expand Up @@ -305,7 +309,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err := ms.AppendRows(ctx, data, NoStreamOffset)
results, err := ms.AppendRows(ctx, data)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
Expand Down Expand Up @@ -358,7 +362,7 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
Expand Down Expand Up @@ -397,12 +401,19 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
// be explicit about waiting/checking each response.
off, err := result.GetResult(ctx)
if err != nil {
t.Errorf("response %d error: %v", k, err)
}
if off != int64(k) {
t.Errorf("offset mismatch, got %d want %d", off, k)
}
}
result.Ready()
wantRows := int64(len(testSimpleData))

// Mark stream complete.
Expand Down Expand Up @@ -468,7 +479,7 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
Expand Down Expand Up @@ -513,6 +524,85 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
}
}

// testSchemaEvolution exercises appending to a committed stream across a
// table schema change.
//
// It creates a table with the simple message schema, appends every row of
// testSimpleData through a managed stream, and validates the row count. It
// then updates the table to the evolved schema, waits, and sends one more row
// built from the evolved message together with UpdateSchemaDescriptor, and
// finally validates the row count and the null/non-null counts of the "name"
// and "other" columns.
//
// Failures that make the remaining steps meaningless (marshal errors, failed
// appends, a failed schema update) abort the test with t.Fatalf rather than
// t.Errorf, so that the test never dereferences an AppendResult obtained from
// a failed append.
func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	var result *AppendResult
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			// Without valid row bytes there is nothing useful to append.
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		result, err = ms.AppendRows(ctx, data)
		if err != nil {
			// Stop here: result cannot be relied upon after a failed append,
			// and it is dereferenced once the loop finishes.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
	}
	// Guard against an empty testSimpleData, which would leave result unset.
	if result == nil {
		t.Fatalf("no appends were issued; testSimpleData is empty")
	}
	// wait for the result to indicate ready, then validate.
	_, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("error on append: %v", err)
	}

	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(testSimpleData))))

	// Now, evolve the underlying table schema.
	_, err = testTable.Update(ctx, bigquery.TableMetadataToUpdate{Schema: testdata.SimpleMessageEvolvedSchema}, "")
	if err != nil {
		// The evolved append below depends on this update; don't sleep and
		// continue when it failed.
		t.Fatalf("failed to evolve table schema: %v", err)
	}

	// TODO: we need a more elegant mechanism for detecting when the backend has registered the schema change.
	// In the continuous case, we'd get it from the response, but the change-and-wait case needs something more.
	time.Sleep(6 * time.Second)

	// ready descriptor, send an additional append
	m2 := &testdata.SimpleMessageEvolvedProto2{
		Name:  proto.String("evolved"),
		Value: proto.Int64(180),
		Other: proto.String("hello evolution"),
	}
	descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())
	b, err := proto.Marshal(m2)
	if err != nil {
		t.Fatalf("failed to marshal evolved message: %v", err)
	}
	result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
	if err != nil {
		// As above: do not call GetResult on the result of a failed append.
		t.Fatalf("failed evolved append: %v", err)
	}
	_, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("error on evolved append: %v", err)
	}

	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(testSimpleData)+1)),
		withNullCount("name", 0),
		withNonNullCount("other", 1),
	)
}

func TestIntegration_DetectProjectID(t *testing.T) {
ctx := context.Background()
testCreds := testutil.Credentials(ctx)
Expand Down Expand Up @@ -613,7 +703,7 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client,
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
result, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset)
result, err := ms.AppendRows(ctx, [][]byte{sampleRow})
if err != nil {
t.Errorf("append failed: %v", err)
}
Expand Down

0 comments on commit 014b314

Please sign in to comment.