Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): extend managedstream to support…
Browse files Browse the repository at this point in the history
… call options (#5078)

BREAKING CHANGE:  changes function signatures to add variadic call options

This plumbs the ability to pass gax.CallOption opts to the
underlying client underpinning the ManagedStream.  It also
adds a WithAppendRowsCallOption option to the constructor,
as well as adding direct option passing for operations like
Finalize() and FlushRows().

Towards: #4366
  • Loading branch information
shollyman committed Nov 4, 2021
1 parent bbf4d04 commit fbc2717
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 14 deletions.
8 changes: 5 additions & 3 deletions bigquery/storage/managedwriter/client.go
Expand Up @@ -98,11 +98,13 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
c: c,
ctx: ctx,
cancel: cancel,
open: func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
callOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
open: func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
arc, err := streamFunc(
// Bidi Streaming doesn't append stream ID as request metadata, so we must inject it manually.
metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)),
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024)))
metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)))
if err != nil {
return nil, err
}
Expand Down
17 changes: 9 additions & 8 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -79,9 +79,10 @@ type ManagedStream struct {
fc *flowController

// aspects of the stream client
ctx context.Context // retained context for the stream
cancel context.CancelFunc
open func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection
ctx context.Context // retained context for the stream
cancel context.CancelFunc
callOptions []gax.CallOption // options passed when opening an append client
open func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection

mu sync.Mutex
arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
Expand Down Expand Up @@ -141,14 +142,14 @@ func (ms *ManagedStream) StreamType() StreamType {

// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling
// this method for other stream types yields an error.
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) {
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error) {
req := &storagepb.FlushRowsRequest{
WriteStream: ms.streamSettings.streamID,
Offset: &wrapperspb.Int64Value{
Value: offset,
},
}
resp, err := ms.c.rawClient.FlushRows(ctx, req)
resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...)
recordStat(ms.ctx, FlushRequests, 1)
if err != nil {
return 0, err
Expand All @@ -161,12 +162,12 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, er
//
// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
// data in a PendingStream.
func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error) {
// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
req := &storagepb.FinalizeWriteStreamRequest{
Name: ms.streamSettings.streamID,
}
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req)
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req, opts...)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -208,7 +209,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
if ms.streamSettings != nil {
streamID = ms.streamSettings.streamID
}
arc, err := ms.open(streamID)
arc, err := ms.open(streamID, ms.callOptions...)
bo, shouldRetry := r.Retry(err)
if err != nil && shouldRetry {
recordStat(ms.ctx, AppendClientOpenRetryCount, 1)
Expand Down
5 changes: 3 additions & 2 deletions bigquery/storage/managedwriter/managed_stream_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/googleapis/gax-go/v2"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestManagedStream_OpenWithRetry(t *testing.T) {
for _, tc := range testCases {
ms := &ManagedStream{
ctx: context.Background(),
open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
if len(tc.errors) == 0 {
panic("out of errors")
}
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) {

ms := &ManagedStream{
ctx: ctx,
open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
testARC.openCount = testARC.openCount + 1
return testARC, nil
},
Expand Down
13 changes: 12 additions & 1 deletion bigquery/storage/managedwriter/writer_option.go
Expand Up @@ -14,7 +14,10 @@

package managedwriter

import "google.golang.org/protobuf/types/descriptorpb"
import (
"github.com/googleapis/gax-go/v2"
"google.golang.org/protobuf/types/descriptorpb"
)

// WriterOption are variadic options used to configure a ManagedStream instance.
type WriterOption func(*ManagedStream)
Expand Down Expand Up @@ -85,3 +88,11 @@ func WithDataOrigin(dataOrigin string) WriterOption {
ms.streamSettings.dataOrigin = dataOrigin
}
}

// WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when
// it opens the underlying append stream.
func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
return func(ms *ManagedStream) {
ms.callOptions = append(ms.callOptions, o)
}
}
15 changes: 15 additions & 0 deletions bigquery/storage/managedwriter/writer_option_test.go
Expand Up @@ -19,6 +19,8 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc"
)

func TestWriterOptions(t *testing.T) {
Expand Down Expand Up @@ -94,6 +96,19 @@ func TestWriterOptions(t *testing.T) {
return ms
}(),
},
{
desc: "WithCallOption",
options: []WriterOption{WithAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
callOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
},
}
return ms
}(),
},
{
desc: "multiple",
options: []WriterOption{
Expand Down

0 comments on commit fbc2717

Please sign in to comment.