
feat(bigquery/storage/managedwriter): wire in flow controller #4501

Merged

Changes from all commits
7 changes: 6 additions & 1 deletion bigquery/storage/managedwriter/appendresult.go
@@ -106,7 +106,7 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {

// markDone propagates finalization of an append request to associated
// AppendResult references.
func (pw *pendingWrite) markDone(startOffset int64, err error) {
func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowController) {
curOffset := startOffset
for _, ar := range pw.results {
if err != nil {
@@ -124,4 +124,9 @@ func (pw *pendingWrite) markDone(startOffset int64, err error) {
}
// Clear the reference to the request.
pw.request = nil
// if there's a flow controller, signal release. The only time this should be nil is when
// encountering issues with flow control during enqueuing the initial request.
if fc != nil {
fc.release(pw.reqSize)
}
}
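
Note: flowController is an unexported type wired in by this PR; the sketch below approximates the acquire/release semantics that the markDone change relies on. It is an illustrative, semaphore-backed stand-in (modeled loosely on the pubsub flow controller), not the package's actual implementation, and the "zero limit means unbounded" behavior is an assumption.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

// flowController (sketch): bounds both the number of in-flight requests and
// the total in-flight bytes. A zero limit disables that dimension.
type flowController struct {
	semRequests *semaphore.Weighted // nil when the request limit is 0
	semBytes    *semaphore.Weighted // nil when the byte limit is 0
}

func newFlowController(maxRequests, maxBytes int) *flowController {
	fc := &flowController{}
	if maxRequests > 0 {
		fc.semRequests = semaphore.NewWeighted(int64(maxRequests))
	}
	if maxBytes > 0 {
		fc.semBytes = semaphore.NewWeighted(int64(maxBytes))
	}
	return fc
}

// acquire blocks until the pending write fits under both limits or ctx is done.
// (A request larger than the byte budget would block forever in this sketch;
// a real controller would need to special-case that.)
func (fc *flowController) acquire(ctx context.Context, size int) error {
	if fc.semRequests != nil {
		if err := fc.semRequests.Acquire(ctx, 1); err != nil {
			return err
		}
	}
	if fc.semBytes != nil {
		if err := fc.semBytes.Acquire(ctx, int64(size)); err != nil {
			if fc.semRequests != nil {
				fc.semRequests.Release(1) // undo the request token on failure
			}
			return err
		}
	}
	return nil
}

// release returns the capacity held by a completed write; markDone calls this
// once per pending write when handed a non-nil controller.
func (fc *flowController) release(size int) {
	if fc.semRequests != nil {
		fc.semRequests.Release(1)
	}
	if fc.semBytes != nil {
		fc.semBytes.Release(int64(size))
	}
}

func main() {
	fc := newFlowController(10, 1024*1024)
	if err := fc.acquire(context.Background(), 512); err != nil {
		fmt.Println("acquire failed:", err)
		return
	}
	fmt.Println("holding 512 bytes of in-flight budget")
	fc.release(512)
	fmt.Println("released")
}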
4 changes: 2 additions & 2 deletions bigquery/storage/managedwriter/appendresult_test.go
@@ -45,7 +45,7 @@ func TestPendingWrite(t *testing.T) {
if pending.request.GetOffset() != nil {
t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
}
pending.markDone(NoStreamOffset, nil)
pending.markDone(NoStreamOffset, nil, nil)
for k, ar := range pending.results {
if ar.offset != NoStreamOffset {
t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoStreamOffset)
@@ -90,7 +90,7 @@ func TestPendingWrite(t *testing.T) {
// verify completion behavior
reportedOffset := int64(101)
wantErr := fmt.Errorf("foo")
pending.markDone(reportedOffset, wantErr)
pending.markDone(reportedOffset, wantErr, nil)

if pending.request != nil {
t.Errorf("expected request to be cleared, is present: %#v", pending.request)
7 changes: 5 additions & 2 deletions bigquery/storage/managedwriter/client.go
@@ -119,10 +119,13 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
streamName = resp.GetName()
}
ms.streamSettings.streamID = streamName
// TODO(followup CLs): instantiate an appendstream client, flow controller, etc.
}
}

if ms.streamSettings != nil {
ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes)
} else {
ms.fc = newFlowController(0, 0)
}
return ms, nil
}
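
For callers, the limits handed to newFlowController come from the stream settings populated by writer options. A hedged usage sketch, assuming the package's WithMaxInflightRequests and WithMaxInflightBytes options and an illustrative project, dataset, and table:

package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery/storage/managedwriter"
)

func main() {
	ctx := context.Background()
	client, err := managedwriter.NewClient(ctx, "my-project") // illustrative project ID
	if err != nil {
		log.Fatalf("NewClient: %v", err)
	}
	defer client.Close()

	// These limits populate the streamSettings that buildManagedStream hands
	// to newFlowController for this stream.
	ms, err := client.NewManagedStream(ctx,
		managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
		managedwriter.WithType(managedwriter.DefaultStream),
		managedwriter.WithMaxInflightRequests(1000),
		managedwriter.WithMaxInflightBytes(100*1024*1024),
	)
	if err != nil {
		log.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()
}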

4 changes: 2 additions & 2 deletions bigquery/storage/managedwriter/integration_test.go
@@ -177,7 +177,7 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(data, NoStreamOffset)
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
@@ -195,7 +195,7 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := append(data, b)
results, err = ms.AppendRows(data, NoStreamOffset)
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("grouped-row append failed: %v", err)
}
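
Because AppendRows now takes a context, callers can bound how long an append is allowed to block on flow control. A minimal hedged sketch, assuming the signature from this PR (ctx, data, offset) and an already-configured ManagedStream; the helper name and 30-second timeout are illustrative:

package example

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/protobuf/proto"
)

// appendOne marshals a single message and appends it, using a context timeout
// to cap the time spent waiting for in-flight capacity.
func appendOne(ctx context.Context, ms *managedwriter.ManagedStream, msg proto.Message) ([]*managedwriter.AppendResult, error) {
	b, err := proto.Marshal(msg)
	if err != nil {
		return nil, fmt.Errorf("marshal: %w", err)
	}
	appendCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	// A context error from this call typically means the writer was blocked
	// waiting on the flow controller (or the append itself timed out).
	return ms.AppendRows(appendCtx, [][]byte{b}, managedwriter.NoStreamOffset)
}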
28 changes: 18 additions & 10 deletions bigquery/storage/managedwriter/managed_stream.go
@@ -71,6 +71,7 @@ type ManagedStream struct {
schemaDescriptor *descriptorpb.DescriptorProto
destinationTable string
c *Client
fc *flowController

// aspects of the stream client
ctx context.Context // retained context for the stream
@@ -204,7 +205,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
// The channel relationship with its ARC is 1:1. If we get a new ARC, create a new chan
// and fire up the associated receive processor.
ch := make(chan *pendingWrite)
go recvProcessor(ms.ctx, arc, ch)
go recvProcessor(ms.ctx, arc, ms.fc, ch)
// Also, replace the sync.Once for setting up a new stream, as we need to do "special" work
// for every new connection.
ms.streamSetup = new(sync.Once)
@@ -293,11 +294,17 @@ func (ms *ManagedStream) Close() error {
}

// AppendRows sends the append requests to the service, and returns one AppendResult per row.
func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResult, error) {
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) {
pw := newPendingWrite(data, offset)
// check flow control
if err := ms.fc.acquire(ctx, pw.reqSize); err != nil {
// in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release.
pw.markDone(NoStreamOffset, err, nil)
}
Review comment:

@shollyman Is there a missing return here? We've observed the following panic, which seems like it could be related to markDone resetting pw.request above before the append call proceeds below.

runtime error: invalid memory address or nil pointer dereference
at cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).append ( /go/src/github.com/ScriptRock/webscan-golang/vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:284 )
at cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).AppendRows ( /go/src/github.com/ScriptRock/webscan-golang/vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:363 )

Contributor Author:

Created #5428 to follow up on this. Sheer luck I saw this comment, but looks likely.

Review comment:

Thanks!

(A sketch of the early return this thread suggests appears after this file's diff.)

// proceed to call
if err := ms.append(pw); err != nil {
// pending write is DOA, mark it done.
pw.markDone(NoStreamOffset, err)
// pending write is DOA.
pw.markDone(NoStreamOffset, err, ms.fc)
return nil, err
}
return pw.results, nil
@@ -307,7 +314,7 @@ func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResul
//
// The receive processor only deals with a single instance of a connection/channel, and thus should never interact
// with the mutex lock.
func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) {
// TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply
// ensure that pending writes get acknowledged with a terminal state.
for {
@@ -319,7 +326,7 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
if !ok {
return
}
pw.markDone(NoStreamOffset, ctx.Err())
pw.markDone(NoStreamOffset, ctx.Err(), fc)
}
case nextWrite, ok := <-ch:
if !ok {
@@ -330,19 +337,20 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
// block until we get a corresponding response or err from stream.
resp, err := arc.Recv()
if err != nil {
nextWrite.markDone(NoStreamOffset, err)
nextWrite.markDone(NoStreamOffset, err, fc)
}

if status := resp.GetError(); status != nil {
nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status))
fc.release(nextWrite.reqSize)
nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc)
continue
}
success := resp.GetAppendResult()
off := success.GetOffset()
if off != nil {
nextWrite.markDone(off.GetValue(), nil)
nextWrite.markDone(off.GetValue(), nil, fc)
}
nextWrite.markDone(NoStreamOffset, nil)
nextWrite.markDone(NoStreamOffset, nil, fc)
}
}
}
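
The review thread above asks whether the flow-control failure path in AppendRows is missing an early return before ms.append runs against a cleared request. A hedged sketch of that shape, mirroring the method as it appears in this PR; the actual follow-up is tracked in #5428 and may differ:

// AppendRows with the early return the review thread suggests (sketch only).
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) {
	pw := newPendingWrite(data, offset)
	// check flow control
	if err := ms.fc.acquire(ctx, pw.reqSize); err != nil {
		// We never acquired capacity, so don't hand markDone the flow
		// controller; there is nothing to release.
		pw.markDone(NoStreamOffset, err, nil)
		return nil, err // without this, append proceeds with pw.request already cleared
	}
	// proceed to call
	if err := ms.append(pw); err != nil {
		// pending write is DOA.
		pw.markDone(NoStreamOffset, err, ms.fc)
		return nil, err
	}
	return pw.results, nil
}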
7 changes: 5 additions & 2 deletions bigquery/storage/managedwriter/managed_stream_test.go
@@ -87,6 +87,8 @@ func TestManagedStream_OpenWithRetry(t *testing.T) {

func TestManagedStream_FirstAppendBehavior(t *testing.T) {

ctx := context.Background()

var testARC *testAppendRowsClient
testARC = &testAppendRowsClient{
recvF: func() (*storagepb.AppendRowsResponse, error) {
@@ -104,12 +106,13 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) {
}

ms := &ManagedStream{
ctx: context.Background(),
ctx: ctx,
open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) {
testARC.openCount = testARC.openCount + 1
return testARC, nil
},
streamSettings: defaultStreamSettings(),
fc: newFlowController(0, 0),
}
ms.streamSettings.streamID = "FOO"
ms.streamSettings.TracePrefix = "TRACE"
@@ -123,7 +126,7 @@
wantReqs := 3

for i := 0; i < wantReqs; i++ {
_, err := ms.AppendRows(fakeData, NoStreamOffset)
_, err := ms.AppendRows(ctx, fakeData, NoStreamOffset)
if err != nil {
t.Errorf("AppendRows; %v", err)
}
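
A hedged sketch of a unit test one could add alongside these, exercising the release path that markDone now triggers. It uses only identifiers visible in this PR (newFlowController, acquire, newPendingWrite, markDone), assumes a zero byte limit means "unbounded", and assumes the test file imports context, testing, and time:

func TestMarkDoneReleasesFlowControl(t *testing.T) {
	// One in-flight request allowed; byte limit disabled (assumed semantics).
	fc := newFlowController(1, 0)
	ctx := context.Background()

	pw := newPendingWrite([][]byte{[]byte("row")}, NoStreamOffset)
	if err := fc.acquire(ctx, pw.reqSize); err != nil {
		t.Fatalf("initial acquire: %v", err)
	}

	// Completing the write should hand its capacity back to the controller.
	pw.markDone(NoStreamOffset, nil, fc)

	// If release happened, a second acquire must succeed before the deadline.
	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	pw2 := newPendingWrite([][]byte{[]byte("row")}, NoStreamOffset)
	if err := fc.acquire(ctx2, pw2.reqSize); err != nil {
		t.Errorf("acquire after markDone should not block: %v", err)
	}
}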