Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed May 5, 2021
1 parent 9619b90 commit 233c54d
Showing 1 changed file with 51 additions and 0 deletions.
51 changes: 51 additions & 0 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -329,6 +329,57 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) {
}
}

func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
acks := newAckTracker()
// MaxOutstandingBytes = 1000, so this pushes the pending flow control bytes
// over the expediteBatchRequestRatio=50% threshold in flowControlBatcher.
msg := seqMsgWithOffsetAndSize(67, 800)
retryableErr := status.Error(codes.Unavailable, "unavailable")
serverErr := status.Error(codes.InvalidArgument, "verifies flow control received")

verifiers := test.NewVerifiers(t)

stream1 := test.NewRPCVerifier(t)
stream1.Push(initSubReq(subscription), initSubResp(), nil)
stream1.Push(initFlowControlReq(), msgSubResp(msg), nil)
// Break the stream immediately after sending the message.
stream1.Push(nil, nil, retryableErr)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream1)

stream2 := test.NewRPCVerifier(t)
// The barrier is used to pause in the middle of stream reconnection.
barrier := stream2.PushWithBarrier(initSubReq(subscription), initSubResp(), nil)
stream2.Push(seekReq(68), seekResp(68), nil)
// Full flow control tokens should be sent after stream has connected.
stream2.Push(initFlowControlReq(), nil, serverErr)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream2)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
if gotErr := sub.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}

sub.Receiver.ValidateMsg(msg)
barrier.Release()

// While the stream is not connected, the pending tokens should not be
// released and sent to the stream.
sub.sub.onAckAsync(msg.SizeBytes)
sub.sub.mu.Lock()
if sub.sub.flowControl.pendingTokens.ToFlowControlRequest() == nil {
t.Errorf("Pending flow control tokens should not be cleared")
}
sub.sub.mu.Unlock()

if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
}
}

func TestSubscribeStreamInvalidInitialResponse(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
acks := newAckTracker()
Expand Down

0 comments on commit 233c54d

Please sign in to comment.