Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): address possible panic due to fl…
Browse files Browse the repository at this point in the history
…ow (#5436)

control

An exhausted flow controller will emit error rather than block.  When
this occured, we were marking the write done, but allowing the append to
proceed, getting us into another double-close situation, which yields a
panic.
  • Loading branch information
shollyman committed Feb 3, 2022
1 parent 8e91ed1 commit 50c6e38
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
1 change: 1 addition & 0 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -350,6 +350,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
if err := ms.fc.acquire(ctx, pw.reqSize); err != nil {
// in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release.
pw.markDone(NoStreamOffset, err, nil)
return nil, err
}
// if we've received an updated schema as part of a write, propagate it to both the cached schema and
// populate the schema in the request.
Expand Down
33 changes: 33 additions & 0 deletions bigquery/storage/managedwriter/managed_stream_test.go
Expand Up @@ -17,6 +17,7 @@ package managedwriter
import (
"context"
"testing"
"time"

"github.com/googleapis/gax-go/v2"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
Expand Down Expand Up @@ -193,3 +194,35 @@ func (tarc *testAppendRowsClient) Send(req *storagepb.AppendRowsRequest) error {
func (tarc *testAppendRowsClient) Recv() (*storagepb.AppendRowsResponse, error) {
return tarc.recvF()
}

func TestManagedStream_FlowControllerFailure(t *testing.T) {

ctx := context.Background()

// create a flowcontroller with 1 inflight message allowed, and exhaust it.
fc := newFlowController(1, 0)
fc.acquire(ctx, 0)

// Construct a skeleton ManagedStream. This doesn't include an ARC or open func
// because this test should never invoke it.
ms := &ManagedStream{
ctx: ctx,
streamSettings: defaultStreamSettings(),
fc: fc,
}

fakeData := [][]byte{
[]byte("foo"),
[]byte("bar"),
}

// Create a context that will expire during the append.
// This is expected to surface a flowcontroller error, as there's no
// capacity.
expireCtx, _ := context.WithTimeout(ctx, 100*time.Millisecond)
_, err := ms.AppendRows(expireCtx, fakeData)
if err == nil {
t.Errorf("expected AppendRows to error, but it succeeded")
}

}

0 comments on commit 50c6e38

Please sign in to comment.