Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): wire in flow controller (#4501)
Browse files Browse the repository at this point in the history
* feat(bigquery/storage/managedwriter): wire in flow controller

This adds a flow controller to the managed stream, guarding AppendRows.

This change puts the release responsibility for the flow controller in
the markDone() method of the pendingWrite.  Flow controller code is
already well tested via unit tests, so this doesn't add any additional
tests.
  • Loading branch information
shollyman committed Jul 26, 2021
1 parent b085384 commit 40571fa
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 19 deletions.
7 changes: 6 additions & 1 deletion bigquery/storage/managedwriter/appendresult.go
Expand Up @@ -106,7 +106,7 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {

// markDone propagates finalization of an append request to associated
// AppendResult references.
func (pw *pendingWrite) markDone(startOffset int64, err error) {
func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowController) {
curOffset := startOffset
for _, ar := range pw.results {
if err != nil {
Expand All @@ -124,4 +124,9 @@ func (pw *pendingWrite) markDone(startOffset int64, err error) {
}
// Clear the reference to the request.
pw.request = nil
// if there's a flow controller, signal release. The only time this should be nil is when
// encountering issues with flow control during enqueuing the initial request.
if fc != nil {
fc.release(pw.reqSize)
}
}
4 changes: 2 additions & 2 deletions bigquery/storage/managedwriter/appendresult_test.go
Expand Up @@ -45,7 +45,7 @@ func TestPendingWrite(t *testing.T) {
if pending.request.GetOffset() != nil {
t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
}
pending.markDone(NoStreamOffset, nil)
pending.markDone(NoStreamOffset, nil, nil)
for k, ar := range pending.results {
if ar.offset != NoStreamOffset {
t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoStreamOffset)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestPendingWrite(t *testing.T) {
// verify completion behavior
reportedOffset := int64(101)
wantErr := fmt.Errorf("foo")
pending.markDone(reportedOffset, wantErr)
pending.markDone(reportedOffset, wantErr, nil)

if pending.request != nil {
t.Errorf("expected request to be cleared, is present: %#v", pending.request)
Expand Down
7 changes: 5 additions & 2 deletions bigquery/storage/managedwriter/client.go
Expand Up @@ -119,10 +119,13 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
streamName = resp.GetName()
}
ms.streamSettings.streamID = streamName
// TODO(followup CLs): instantiate an appendstream client, flow controller, etc.
}
}

if ms.streamSettings != nil {
ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes)
} else {
ms.fc = newFlowController(0, 0)
}
return ms, nil
}

Expand Down
4 changes: 2 additions & 2 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -177,7 +177,7 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(data, NoStreamOffset)
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
Expand All @@ -195,7 +195,7 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := append(data, b)
results, err = ms.AppendRows(data, NoStreamOffset)
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("grouped-row append failed: %v", err)
}
Expand Down
28 changes: 18 additions & 10 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -71,6 +71,7 @@ type ManagedStream struct {
schemaDescriptor *descriptorpb.DescriptorProto
destinationTable string
c *Client
fc *flowController

// aspects of the stream client
ctx context.Context // retained context for the stream
Expand Down Expand Up @@ -204,7 +205,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
// The channel relationship with its ARC is 1:1. If we get a new ARC, create a new chan
// and fire up the associated receive processor.
ch := make(chan *pendingWrite)
go recvProcessor(ms.ctx, arc, ch)
go recvProcessor(ms.ctx, arc, ms.fc, ch)
// Also, replace the sync.Once for setting up a new stream, as we need to do "special" work
// for every new connection.
ms.streamSetup = new(sync.Once)
Expand Down Expand Up @@ -293,11 +294,17 @@ func (ms *ManagedStream) Close() error {
}

// AppendRows sends the append requests to the service, and returns one AppendResult per row.
func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResult, error) {
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) {
pw := newPendingWrite(data, offset)
// check flow control
if err := ms.fc.acquire(ctx, pw.reqSize); err != nil {
// in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release.
pw.markDone(NoStreamOffset, err, nil)
}
// proceed to call
if err := ms.append(pw); err != nil {
// pending write is DOA, mark it done.
pw.markDone(NoStreamOffset, err)
// pending write is DOA.
pw.markDone(NoStreamOffset, err, ms.fc)
return nil, err
}
return pw.results, nil
Expand All @@ -307,7 +314,7 @@ func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResul
//
// The receive processor only deals with a single instance of a connection/channel, and thus should never interact
// with the mutex lock.
func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) {
// TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply
// ensure that pending writes get acknowledged with a terminal state.
for {
Expand All @@ -319,7 +326,7 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
if !ok {
return
}
pw.markDone(NoStreamOffset, ctx.Err())
pw.markDone(NoStreamOffset, ctx.Err(), fc)
}
case nextWrite, ok := <-ch:
if !ok {
Expand All @@ -330,19 +337,20 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
// block until we get a corresponding response or err from stream.
resp, err := arc.Recv()
if err != nil {
nextWrite.markDone(NoStreamOffset, err)
nextWrite.markDone(NoStreamOffset, err, fc)
}

if status := resp.GetError(); status != nil {
nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status))
fc.release(nextWrite.reqSize)
nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc)
continue
}
success := resp.GetAppendResult()
off := success.GetOffset()
if off != nil {
nextWrite.markDone(off.GetValue(), nil)
nextWrite.markDone(off.GetValue(), nil, fc)
}
nextWrite.markDone(NoStreamOffset, nil)
nextWrite.markDone(NoStreamOffset, nil, fc)
}
}
}
7 changes: 5 additions & 2 deletions bigquery/storage/managedwriter/managed_stream_test.go
Expand Up @@ -87,6 +87,8 @@ func TestManagedStream_OpenWithRetry(t *testing.T) {

func TestManagedStream_FirstAppendBehavior(t *testing.T) {

ctx := context.Background()

var testARC *testAppendRowsClient
testARC = &testAppendRowsClient{
recvF: func() (*storagepb.AppendRowsResponse, error) {
Expand All @@ -104,12 +106,13 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) {
}

ms := &ManagedStream{
ctx: context.Background(),
ctx: ctx,
open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) {
testARC.openCount = testARC.openCount + 1
return testARC, nil
},
streamSettings: defaultStreamSettings(),
fc: newFlowController(0, 0),
}
ms.streamSettings.streamID = "FOO"
ms.streamSettings.TracePrefix = "TRACE"
Expand All @@ -123,7 +126,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) {
wantReqs := 3

for i := 0; i < wantReqs; i++ {
_, err := ms.AppendRows(fakeData, NoStreamOffset)
_, err := ms.AppendRows(ctx, fakeData, NoStreamOffset)
if err != nil {
t.Errorf("AppendRows; %v", err)
}
Expand Down

0 comments on commit 40571fa

Please sign in to comment.