Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed May 5, 2021
1 parent 9619b90 commit f17a2ba
Showing 1 changed file with 55 additions and 0 deletions.
55 changes: 55 additions & 0 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -224,6 +224,12 @@ func (ts *testSubscribeStream) SendBatchFlowControl() {
ts.sub.sendBatchFlowControl()
}

func (ts *testSubscribeStream) PendingFlowControlRequest() *pb.FlowControlRequest {
ts.sub.mu.Lock()
defer ts.sub.mu.Unlock()
return ts.sub.flowControl.pendingTokens.ToFlowControlRequest()
}

func TestSubscribeStreamReconnect(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
acks := newAckTracker()
Expand Down Expand Up @@ -329,6 +335,55 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) {
}
}

func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
acks := newAckTracker()
// MaxOutstandingBytes = 1000, so this pushes the pending flow control bytes
// over the expediteBatchRequestRatio=50% threshold in flowControlBatcher.
msg := seqMsgWithOffsetAndSize(67, 800)
retryableErr := status.Error(codes.Unavailable, "unavailable")
serverErr := status.Error(codes.InvalidArgument, "verifies flow control received")

verifiers := test.NewVerifiers(t)

stream1 := test.NewRPCVerifier(t)
stream1.Push(initSubReq(subscription), initSubResp(), nil)
stream1.Push(initFlowControlReq(), msgSubResp(msg), nil)
// Break the stream immediately after sending the message.
stream1.Push(nil, nil, retryableErr)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream1)

stream2 := test.NewRPCVerifier(t)
// The barrier is used to pause in the middle of stream reconnection.
barrier := stream2.PushWithBarrier(initSubReq(subscription), initSubResp(), nil)
stream2.Push(seekReq(68), seekResp(68), nil)
// Full flow control tokens should be sent after stream has connected.
stream2.Push(initFlowControlReq(), nil, serverErr)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream2)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
if gotErr := sub.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}

sub.Receiver.ValidateMsg(msg)
barrier.Release()

// While the stream is not connected, the pending flow control request should
// not be released and sent to the stream.
sub.sub.onAckAsync(msg.SizeBytes)
if sub.PendingFlowControlRequest() == nil {
t.Errorf("Pending flow control request should not be cleared")
}

if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
}
}

func TestSubscribeStreamInvalidInitialResponse(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
acks := newAckTracker()
Expand Down

0 comments on commit f17a2ba

Please sign in to comment.