From dc0103baeaf168474b9e163f0aa5f7555170ffc4 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Thu, 6 May 2021 09:23:50 +1000 Subject: [PATCH] fix(pubsublite): prevent subscriber flow control token races (#4060) Ensure that we send flowControlBatcher.RequestForRestart immediately after a new Subscribe stream is connected, before any batched flow control tokens are sent. --- pubsublite/internal/test/verifier.go | 14 +++++- pubsublite/internal/wire/subscriber.go | 24 ++++++--- pubsublite/internal/wire/subscriber_test.go | 55 +++++++++++++++++++++ 3 files changed, 83 insertions(+), 10 deletions(-) diff --git a/pubsublite/internal/test/verifier.go b/pubsublite/internal/test/verifier.go index ebc742fe2ab..99ae135bc39 100644 --- a/pubsublite/internal/test/verifier.go +++ b/pubsublite/internal/test/verifier.go @@ -48,8 +48,8 @@ func newBarrier() *Barrier { } } -// Release should be called by the test. -func (b *Barrier) Release() { +// ReleaseAfter releases the barrier, after invoking f provided by the test. +func (b *Barrier) ReleaseAfter(f func()) { // Wait for the server to reach the barrier. select { case <-time.After(blockWaitTimeout): @@ -58,10 +58,20 @@ func (b *Barrier) Release() { case <-b.serverBlock: } + // Run any test-specific code. + if f != nil { + f() + } + // Then close the client block. close(b.clientBlock) } +// Release should be called by the test. +func (b *Barrier) Release() { + b.ReleaseAfter(nil) +} + func (b *Barrier) serverWait() error { if b.err != nil { return b.err diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 18fc516ee11..56344cae0d2 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -123,12 +123,13 @@ type subscribeStream struct { metadata pubsubMetadata // Fields below must be guarded with mu. - messageQueue *messageDeliveryQueue - stream *retryableStream - offsetTracker subscriberOffsetTracker - flowControl flowControlBatcher - pollFlowControl *periodicTask - seekInFlight bool + messageQueue *messageDeliveryQueue + stream *retryableStream + offsetTracker subscriberOffsetTracker + flowControl flowControlBatcher + pollFlowControl *periodicTask + seekInFlight bool + enableBatchFlowControl bool abstractService } @@ -223,10 +224,14 @@ func (s *subscribeStream) onStreamStatusChange(status streamStatus) { s.seekInFlight = true } s.unsafeSendFlowControl(s.flowControl.RequestForRestart()) + s.enableBatchFlowControl = true s.pollFlowControl.Start() case streamReconnecting: s.seekInFlight = false + // Ensure no batch flow control tokens are sent until the RequestForRestart + // is sent above when a new subscribe stream is initialized. + s.enableBatchFlowControl = false s.pollFlowControl.Stop() case streamTerminated: @@ -301,12 +306,15 @@ func (s *subscribeStream) onAckAsync(msgBytes int64) { func (s *subscribeStream) sendBatchFlowControl() { s.mu.Lock() defer s.mu.Unlock() - s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest()) + + if s.enableBatchFlowControl { + s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest()) + } } func (s *subscribeStream) unsafeAllowFlow(allow flowControlTokens) { s.flowControl.OnClientFlow(allow) - if s.flowControl.ShouldExpediteBatchRequest() { + if s.flowControl.ShouldExpediteBatchRequest() && s.enableBatchFlowControl { s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest()) } } diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index 5b4116a22be..dfd40cd7bcc 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -224,6 +224,12 @@ func (ts *testSubscribeStream) SendBatchFlowControl() { ts.sub.sendBatchFlowControl() } +func (ts *testSubscribeStream) PendingFlowControlRequest() *pb.FlowControlRequest { + ts.sub.mu.Lock() + defer ts.sub.mu.Unlock() + return ts.sub.flowControl.pendingTokens.ToFlowControlRequest() +} + func TestSubscribeStreamReconnect(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} acks := newAckTracker() @@ -329,6 +335,55 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) { } } +func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) { + subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} + acks := newAckTracker() + // MaxOutstandingBytes = 1000, so this pushes the pending flow control bytes + // over the expediteBatchRequestRatio=50% threshold in flowControlBatcher. + msg := seqMsgWithOffsetAndSize(67, 800) + retryableErr := status.Error(codes.Unavailable, "unavailable") + serverErr := status.Error(codes.InvalidArgument, "verifies flow control received") + + verifiers := test.NewVerifiers(t) + + stream1 := test.NewRPCVerifier(t) + stream1.Push(initSubReq(subscription), initSubResp(), nil) + stream1.Push(initFlowControlReq(), msgSubResp(msg), nil) + // Break the stream immediately after sending the message. + stream1.Push(nil, nil, retryableErr) + verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream1) + + stream2 := test.NewRPCVerifier(t) + // The barrier is used to pause in the middle of stream reconnection. + barrier := stream2.PushWithBarrier(initSubReq(subscription), initSubResp(), nil) + stream2.Push(seekReq(68), seekResp(68), nil) + // Full flow control tokens should be sent after stream has connected. + stream2.Push(initFlowControlReq(), nil, serverErr) + verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream2) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + if gotErr := sub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + sub.Receiver.ValidateMsg(msg) + barrier.ReleaseAfter(func() { + // While the stream is not connected, the pending flow control request + // should not be released and sent to the stream. + sub.sub.onAckAsync(msg.SizeBytes) + if sub.PendingFlowControlRequest() == nil { + t.Errorf("Pending flow control request should not be cleared") + } + }) + + if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { + t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr) + } +} + func TestSubscribeStreamInvalidInitialResponse(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} acks := newAckTracker()