diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go index 5344e45edff..2d2a3244658 100644 --- a/pubsublite/internal/wire/committer.go +++ b/pubsublite/internal/wire/committer.go @@ -123,6 +123,7 @@ func (c *committer) BlockingReset() error { c.mu.Lock() defer c.mu.Unlock() + c.acks.Release() // Discard outstanding acks for !c.cursorTracker.UpToDate() && c.status < serviceTerminating { c.unsafeCommitOffsetToStream() c.flushPending.Wait() diff --git a/pubsublite/internal/wire/committer_test.go b/pubsublite/internal/wire/committer_test.go index 304b676626d..018886f355f 100644 --- a/pubsublite/internal/wire/committer_test.go +++ b/pubsublite/internal/wire/committer_test.go @@ -328,8 +328,7 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) { verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) stream.Push(initCommitReq(subscription), initCommitResp(), nil) - stream.Push(commitReq(34), commitResp(1), nil) - barrier := stream.PushWithBarrier(commitReq(56), commitResp(1), nil) + barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil) verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream) mockServer.OnTestStart(verifiers) @@ -340,6 +339,8 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) { t.Errorf("Start() got err: (%v)", gotErr) } + ack1.Ack() + complete := test.NewCondition("blocking reset complete") go func() { if err := cmt.BlockingReset(); err != nil { @@ -350,14 +351,8 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) { }() complete.VerifyNotDone(t) - ack1.Ack() - cmt.SendBatchCommit() - complete.VerifyNotDone(t) - ack2.Ack() - cmt.SendBatchCommit() - - // Until the final commit response is received, committer.BlockingReset should - // not return. + // Until the commit response is received, committer.BlockingReset should not + // return. barrier.ReleaseAfter(func() { complete.VerifyNotDone(t) }) @@ -371,6 +366,9 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) { t.Errorf("ackTracker.Empty() got %v, want %v", got, want) } + // This ack should have been discarded. + ack2.Ack() + // Calling committer.BlockingReset again should immediately return. if err := cmt.BlockingReset(); err != nil { t.Errorf("BlockingReset() got err: (%v), want: ", err) @@ -390,7 +388,7 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) { verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) stream.Push(initCommitReq(subscription), initCommitResp(), nil) - stream.Push(commitReq(34), commitResp(1), nil) + barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil) verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream) mockServer.OnTestStart(verifiers) @@ -401,6 +399,8 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) { t.Errorf("Start() got err: (%v)", gotErr) } + ack1.Ack() + complete := test.NewCondition("blocking reset complete") go func() { if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) { @@ -410,18 +410,11 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) { }() complete.VerifyNotDone(t) - ack1.Ack() - cmt.SendBatchCommit() - complete.VerifyNotDone(t) - // committer.BlockingReset should return when the committer is stopped. - cmt.Stop() - complete.WaitUntilDone(t, serviceTestWaitTimeout) - - // Ack tracker should not be reset. - if got, want := acks.Empty(), false; got != want { - t.Errorf("ackTracker.Empty() got %v, want %v", got, want) - } + barrier.ReleaseAfter(func() { + cmt.Stop() + complete.WaitUntilDone(t, serviceTestWaitTimeout) + }) cmt.Terminate() if gotErr := cmt.FinalError(); gotErr != nil { @@ -452,6 +445,8 @@ func TestCommitterBlockingResetFatalError(t *testing.T) { t.Errorf("Start() got err: (%v)", gotErr) } + ack1.Ack() + complete := test.NewCondition("blocking reset complete") go func() { if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) { @@ -459,10 +454,6 @@ func TestCommitterBlockingResetFatalError(t *testing.T) { } complete.SetDone() }() - complete.VerifyNotDone(t) - - ack1.Ack() - cmt.SendBatchCommit() // committer.BlockingReset should return when the committer terminates due to // fatal server error. diff --git a/pubsublite/internal/wire/flow_control.go b/pubsublite/internal/wire/flow_control.go index 7a0421392e0..b7cb005e65a 100644 --- a/pubsublite/internal/wire/flow_control.go +++ b/pubsublite/internal/wire/flow_control.go @@ -160,11 +160,14 @@ func (ot *subscriberOffsetTracker) Reset() { } // RequestForRestart returns the seek request to send when a new subscribe -// stream reconnects. Returns nil if the subscriber has just started, in which -// case the server returns the offset of the last committed cursor. +// stream reconnects. func (ot *subscriberOffsetTracker) RequestForRestart() *pb.SeekRequest { if ot.minNextOffset <= 0 { - return nil + return &pb.SeekRequest{ + Target: &pb.SeekRequest_NamedTarget_{ + NamedTarget: pb.SeekRequest_COMMITTED_CURSOR, + }, + } } return &pb.SeekRequest{ Target: &pb.SeekRequest_Cursor{ diff --git a/pubsublite/internal/wire/flow_control_test.go b/pubsublite/internal/wire/flow_control_test.go index d070f1fde0b..24f878ce4c0 100644 --- a/pubsublite/internal/wire/flow_control_test.go +++ b/pubsublite/internal/wire/flow_control_test.go @@ -253,7 +253,11 @@ func TestOffsetTrackerRequestForRestart(t *testing.T) { { desc: "Uninitialized tracker", tracker: subscriberOffsetTracker{}, - want: nil, + want: &pb.SeekRequest{ + Target: &pb.SeekRequest_NamedTarget_{ + NamedTarget: pb.SeekRequest_COMMITTED_CURSOR, + }, + }, }, { desc: "Next offset positive", diff --git a/pubsublite/internal/wire/requests_test.go b/pubsublite/internal/wire/requests_test.go index 94944a9066b..d265f7b2db8 100644 --- a/pubsublite/internal/wire/requests_test.go +++ b/pubsublite/internal/wire/requests_test.go @@ -134,33 +134,42 @@ func msgPubResp(cursor int64) *pb.PublishResponse { // SubscriberService -func initSubReq(subscription subscriptionPartition) *pb.SubscribeRequest { +func initSubReqCommit(subscription subscriptionPartition) *pb.SubscribeRequest { return &pb.SubscribeRequest{ Request: &pb.SubscribeRequest_Initial{ Initial: &pb.InitialSubscribeRequest{ Subscription: subscription.Path, Partition: int64(subscription.Partition), + InitialLocation: &pb.SeekRequest{ + Target: &pb.SeekRequest_NamedTarget_{ + NamedTarget: pb.SeekRequest_COMMITTED_CURSOR, + }, + }, }, }, } } -func initSubResp() *pb.SubscribeResponse { - return &pb.SubscribeResponse{ - Response: &pb.SubscribeResponse_Initial{ - Initial: &pb.InitialSubscribeResponse{}, +func initSubReqCursor(subscription subscriptionPartition, offset int64) *pb.SubscribeRequest { + return &pb.SubscribeRequest{ + Request: &pb.SubscribeRequest_Initial{ + Initial: &pb.InitialSubscribeRequest{ + Subscription: subscription.Path, + Partition: int64(subscription.Partition), + InitialLocation: &pb.SeekRequest{ + Target: &pb.SeekRequest_Cursor{ + Cursor: &pb.Cursor{Offset: offset}, + }, + }, + }, }, } } -func seekReq(offset int64) *pb.SubscribeRequest { - return &pb.SubscribeRequest{ - Request: &pb.SubscribeRequest_Seek{ - Seek: &pb.SeekRequest{ - Target: &pb.SeekRequest_Cursor{ - Cursor: &pb.Cursor{Offset: offset}, - }, - }, +func initSubResp() *pb.SubscribeResponse { + return &pb.SubscribeResponse{ + Response: &pb.SubscribeResponse_Initial{ + Initial: &pb.InitialSubscribeResponse{}, }, } } diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 125eeec06be..b6860a6dc21 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -31,8 +31,7 @@ import ( var ( errServerNoMessages = errors.New("pubsublite: server delivered no messages") errInvalidInitialSubscribeResponse = errors.New("pubsublite: first response from server was not an initial response for subscribe") - errInvalidSubscribeResponse = errors.New("pubsublite: received invalid subscribe response from server") - errNoInFlightSeek = errors.New("pubsublite: received seek response for no in-flight seek") + errInvalidSubscribeResponse = errors.New("pubsublite: received unexpected subscribe response from server") ) // ReceivedMessage stores a received Pub/Sub message and AckConsumer for @@ -126,6 +125,10 @@ func (mq *messageDeliveryQueue) deliverMessages(messagesC chan *ReceivedMessage, // The frequency of sending batch flow control requests. const batchFlowControlPeriod = 100 * time.Millisecond +// Handles subscriber reset actions that are external to the subscribeStream +// (e.g. wait for the committer to flush commits). +type subscriberResetHandler func() error + // subscribeStream directly wraps the subscribe client stream. It passes // messages to the message receiver and manages flow control. Flow control // tokens are batched and sent to the stream via a periodic background task, @@ -137,7 +140,7 @@ type subscribeStream struct { subClient *vkit.SubscriberClient settings ReceiveSettings subscription subscriptionPartition - initialReq *pb.SubscribeRequest + handleReset subscriberResetHandler metadata pubsubMetadata // Fields below must be guarded with mu. @@ -146,27 +149,20 @@ type subscribeStream struct { offsetTracker subscriberOffsetTracker flowControl flowControlBatcher pollFlowControl *periodicTask - seekInFlight bool enableBatchFlowControl bool abstractService } func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, settings ReceiveSettings, - receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *subscribeStream { + receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, + handleReset subscriberResetHandler, disableTasks bool) *subscribeStream { s := &subscribeStream{ subClient: subClient, settings: settings, subscription: subscription, - initialReq: &pb.SubscribeRequest{ - Request: &pb.SubscribeRequest_Initial{ - Initial: &pb.InitialSubscribeRequest{ - Subscription: subscription.Path, - Partition: int64(subscription.Partition), - }, - }, - }, + handleReset: handleReset, messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages), metadata: newPubsubMetadata(), } @@ -212,7 +208,18 @@ func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, err } func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) { - return s.initialReq, initialResponseRequired(true) + s.mu.Lock() + defer s.mu.Unlock() + initReq := &pb.SubscribeRequest{ + Request: &pb.SubscribeRequest_Initial{ + Initial: &pb.InitialSubscribeRequest{ + Subscription: s.subscription.Path, + Partition: int64(s.subscription.Partition), + InitialLocation: s.offsetTracker.RequestForRestart(), + }, + }, + } + return initReq, initialResponseRequired(true) } func (s *subscribeStream) validateInitialResponse(response interface{}) error { @@ -231,27 +238,43 @@ func (s *subscribeStream) onStreamStatusChange(status streamStatus) { case streamConnected: s.unsafeUpdateStatus(serviceActive, nil) - // Reinitialize the offset and flow control tokens when a new subscribe - // stream instance is connected. - if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil { - if !s.stream.Send(&pb.SubscribeRequest{ - Request: &pb.SubscribeRequest_Seek{Seek: seekReq}, - }) { - return - } - s.seekInFlight = true - } + // Reinitialize the flow control tokens when a new subscribe stream instance + // is connected. s.unsafeSendFlowControl(s.flowControl.RequestForRestart()) s.enableBatchFlowControl = true s.pollFlowControl.Start() case streamReconnecting: - s.seekInFlight = false // Ensure no batch flow control tokens are sent until the RequestForRestart // is sent above when a new subscribe stream is initialized. s.enableBatchFlowControl = false s.pollFlowControl.Stop() + case streamResetState: + // Handle out-of-band seek notifications from the server. Committer and + // subscriber state are reset. + + s.messageQueue.Stop() + + // Wait for all message receiver callbacks to finish and the committer to + // flush pending commits and reset its state. Release the mutex while + // waiting. + s.mu.Unlock() + s.messageQueue.Wait() + err := s.handleReset() + s.mu.Lock() + + if err != nil { + s.unsafeInitiateShutdown(serviceTerminating, nil) + return + } + s.messageQueue.Start() + s.offsetTracker.Reset() + s.flowControl.Reset(flowControlTokens{ + Bytes: int64(s.settings.MaxOutstandingBytes), + Messages: int64(s.settings.MaxOutstandingMessages), + }) + case streamTerminated: s.unsafeInitiateShutdown(serviceTerminated, s.stream.Error()) } @@ -270,8 +293,6 @@ func (s *subscribeStream) onResponse(response interface{}) { switch { case subscribeResponse.GetMessages() != nil: err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages()) - case subscribeResponse.GetSeek() != nil: - err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek()) default: err = errInvalidSubscribeResponse } @@ -280,14 +301,6 @@ func (s *subscribeStream) onResponse(response interface{}) { } } -func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error { - if !s.seekInFlight { - return errNoInFlightSeek - } - s.seekInFlight = false - return nil -} - func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) error { if len(response.Messages) == 0 { return errServerNoMessages @@ -388,7 +401,7 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu subscription := subscriptionPartition{Path: f.subscriptionPath, Partition: partition} acks := newAckTracker() commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks) - sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks) + sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, commit.BlockingReset, f.disableTasks) ps := &singlePartitionSubscriber{ subscriber: sub, committer: commit, diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index a45bb281a4c..0c30fdbee2c 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -16,6 +16,7 @@ package wire import ( "context" "sort" + "sync" "testing" "time" @@ -244,11 +245,14 @@ func TestMessageDeliveryQueueDiscardMessages(t *testing.T) { type testSubscribeStream struct { Receiver *testMessageReceiver t *testing.T + acks *ackTracker sub *subscribeStream + mu sync.Mutex + resetErr error serviceTestProxy } -func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings, acks *ackTracker) *testSubscribeStream { +func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings) *testSubscribeStream { ctx := context.Background() subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) if err != nil { @@ -258,8 +262,9 @@ func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, se ts := &testSubscribeStream{ Receiver: newTestMessageReceiver(t), t: t, + acks: newAckTracker(), } - ts.sub = newSubscribeStream(ctx, subClient, settings, ts.Receiver.onMessage, subscription, acks, true) + ts.sub = newSubscribeStream(ctx, subClient, settings, ts.Receiver.onMessage, subscription, ts.acks, ts.handleReset, true) ts.initAndStart(t, ts.sub, "Subscriber", subClient) return ts } @@ -276,9 +281,20 @@ func (ts *testSubscribeStream) PendingFlowControlRequest() *pb.FlowControlReques return ts.sub.flowControl.pendingTokens.ToFlowControlRequest() } +func (ts *testSubscribeStream) SetResetErr(err error) { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.resetErr = err +} + +func (ts *testSubscribeStream) handleReset() error { + ts.mu.Lock() + defer ts.mu.Unlock() + return ts.resetErr +} + func TestSubscribeStreamReconnect(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() msg1 := seqMsgWithOffsetAndSize(67, 200) msg2 := seqMsgWithOffsetAndSize(68, 100) permanentErr := status.Error(codes.FailedPrecondition, "permanent failure") @@ -286,16 +302,15 @@ func TestSubscribeStreamReconnect(t *testing.T) { verifiers := test.NewVerifiers(t) stream1 := test.NewRPCVerifier(t) - stream1.Push(initSubReq(subscription), initSubResp(), nil) + stream1.Push(initSubReqCommit(subscription), initSubResp(), nil) stream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) stream1.Push(nil, nil, status.Error(codes.Unavailable, "server unavailable")) verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream1) - // When reconnected, the subscribeStream should seek to msg2 and have - // subtracted flow control tokens. + // When reconnected, the subscribeStream should set initial cursor to msg2 and + // have subtracted flow control tokens. stream2 := test.NewRPCVerifier(t) - stream2.Push(initSubReq(subscription), initSubResp(), nil) - stream2.Push(seekReq(68), seekResp(68), nil) + stream2.Push(initSubReqCursor(subscription, 68), initSubResp(), nil) stream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 9}), msgSubResp(msg2), nil) // Subscriber should terminate on permanent error. stream2.Push(nil, nil, permanentErr) @@ -304,7 +319,7 @@ func TestSubscribeStreamReconnect(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) if gotErr := sub.StartError(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) } @@ -317,14 +332,13 @@ func TestSubscribeStreamReconnect(t *testing.T) { func TestSubscribeStreamFlowControlBatching(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() msg1 := seqMsgWithOffsetAndSize(67, 200) msg2 := seqMsgWithOffsetAndSize(68, 100) serverErr := status.Error(codes.InvalidArgument, "verifies flow control received") verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) - stream.Push(initSubReq(subscription), initSubResp(), nil) + stream.Push(initSubReqCommit(subscription), initSubResp(), nil) stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) // Batch flow control request expected. stream.Push(flowControlSubReq(flowControlTokens{Bytes: 300, Messages: 2}), nil, serverErr) @@ -333,7 +347,7 @@ func TestSubscribeStreamFlowControlBatching(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) if gotErr := sub.StartError(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) } @@ -349,7 +363,6 @@ func TestSubscribeStreamFlowControlBatching(t *testing.T) { func TestSubscribeStreamExpediteFlowControl(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() msg1 := seqMsgWithOffsetAndSize(67, 250) // MaxOutstandingBytes = 1000, so msg2 pushes the pending flow control bytes // over the expediteBatchRequestRatio=50% threshold in flowControlBatcher. @@ -358,7 +371,7 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) { verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) - stream.Push(initSubReq(subscription), initSubResp(), nil) + stream.Push(initSubReqCommit(subscription), initSubResp(), nil) stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) // Batch flow control request expected. stream.Push(flowControlSubReq(flowControlTokens{Bytes: 501, Messages: 2}), nil, serverErr) @@ -367,7 +380,7 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) if gotErr := sub.StartError(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) } @@ -383,7 +396,6 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) { func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() // MaxOutstandingBytes = 1000, so this pushes the pending flow control bytes // over the expediteBatchRequestRatio=50% threshold in flowControlBatcher. msg := seqMsgWithOffsetAndSize(67, 800) @@ -393,7 +405,7 @@ func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) { verifiers := test.NewVerifiers(t) stream1 := test.NewRPCVerifier(t) - stream1.Push(initSubReq(subscription), initSubResp(), nil) + stream1.Push(initSubReqCommit(subscription), initSubResp(), nil) stream1.Push(initFlowControlReq(), msgSubResp(msg), nil) // Break the stream immediately after sending the message. stream1.Push(nil, nil, retryableErr) @@ -401,8 +413,7 @@ func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) { stream2 := test.NewRPCVerifier(t) // The barrier is used to pause in the middle of stream reconnection. - barrier := stream2.PushWithBarrier(initSubReq(subscription), initSubResp(), nil) - stream2.Push(seekReq(68), seekResp(68), nil) + barrier := stream2.PushWithBarrier(initSubReqCursor(subscription, 68), initSubResp(), nil) // Full flow control tokens should be sent after stream has connected. stream2.Push(initFlowControlReq(), nil, serverErr) verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream2) @@ -410,7 +421,7 @@ func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) if gotErr := sub.StartError(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) } @@ -432,17 +443,16 @@ func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) { func TestSubscribeStreamInvalidInitialResponse(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) - stream.Push(initSubReq(subscription), seekResp(0), nil) // Seek instead of init response + stream.Push(initSubReqCommit(subscription), seekResp(0), nil) // Seek instead of init response verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) if gotErr, wantErr := sub.StartError(), errInvalidInitialSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) { t.Errorf("Start got err: (%v), want: (%v)", gotErr, wantErr) } @@ -450,18 +460,17 @@ func TestSubscribeStreamInvalidInitialResponse(t *testing.T) { func TestSubscribeStreamDuplicateInitialResponse(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) - stream.Push(initSubReq(subscription), initSubResp(), nil) + stream.Push(initSubReqCommit(subscription), initSubResp(), nil) stream.Push(initFlowControlReq(), initSubResp(), nil) // Second initial response verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) if gotErr, wantErr := sub.FinalError(), errInvalidSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) { t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) } @@ -469,37 +478,35 @@ func TestSubscribeStreamDuplicateInitialResponse(t *testing.T) { func TestSubscribeStreamSpuriousSeekResponse(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) - stream.Push(initSubReq(subscription), initSubResp(), nil) + stream.Push(initSubReqCommit(subscription), initSubResp(), nil) stream.Push(initFlowControlReq(), seekResp(1), nil) // Seek response with no seek request verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) - if gotErr, wantErr := sub.FinalError(), errNoInFlightSeek; !test.ErrorEqual(gotErr, wantErr) { + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) + if gotErr, wantErr := sub.FinalError(), errInvalidSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) { t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) } } func TestSubscribeStreamNoMessages(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) - stream.Push(initSubReq(subscription), initSubResp(), nil) + stream.Push(initSubReqCommit(subscription), initSubResp(), nil) stream.Push(initFlowControlReq(), msgSubResp(), nil) // No messages in response verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) if gotErr, wantErr := sub.FinalError(), errServerNoMessages; !test.ErrorEqual(gotErr, wantErr) { t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) } @@ -507,13 +514,12 @@ func TestSubscribeStreamNoMessages(t *testing.T) { func TestSubscribeStreamMessagesOutOfOrder(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() msg1 := seqMsgWithOffsetAndSize(56, 100) msg2 := seqMsgWithOffsetAndSize(55, 100) // Offset before msg1 verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) - stream.Push(initSubReq(subscription), initSubResp(), nil) + stream.Push(initSubReqCommit(subscription), initSubResp(), nil) stream.Push(initFlowControlReq(), msgSubResp(msg1), nil) stream.Push(nil, msgSubResp(msg2), nil) verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) @@ -521,7 +527,7 @@ func TestSubscribeStreamMessagesOutOfOrder(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) sub.Receiver.ValidateMsg(msg1) if gotErr, msg := sub.FinalError(), "start offset = 55, expected >= 57"; !test.ErrorHasMsg(gotErr, msg) { t.Errorf("Final err: (%v), want msg: %q", gotErr, msg) @@ -530,13 +536,12 @@ func TestSubscribeStreamMessagesOutOfOrder(t *testing.T) { func TestSubscribeStreamFlowControlOverflow(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} - acks := newAckTracker() msg1 := seqMsgWithOffsetAndSize(56, 900) msg2 := seqMsgWithOffsetAndSize(57, 101) // Overflows ReceiveSettings.MaxOutstandingBytes = 1000 verifiers := test.NewVerifiers(t) stream := test.NewRPCVerifier(t) - stream.Push(initSubReq(subscription), initSubResp(), nil) + stream.Push(initSubReqCommit(subscription), initSubResp(), nil) stream.Push(initFlowControlReq(), msgSubResp(msg1), nil) stream.Push(nil, msgSubResp(msg2), nil) verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) @@ -544,13 +549,40 @@ func TestSubscribeStreamFlowControlOverflow(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) sub.Receiver.ValidateMsg(msg1) if gotErr, wantErr := sub.FinalError(), errTokenCounterBytesNegative; !test.ErrorEqual(gotErr, wantErr) { t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) } } +func TestSubscribeStreamHandleResetError(t *testing.T) { + subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} + msg := seqMsgWithOffsetAndSize(67, 100) + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initSubReqCommit(subscription), initSubResp(), nil) + stream.Push(initFlowControlReq(), msgSubResp(msg), nil) + barrier := stream.PushWithBarrier(nil, nil, makeStreamResetSignal()) + verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) + // No reconnect expected because the reset handler func will fail. + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + sub := newTestSubscribeStream(t, subscription, testSubscriberSettings()) + sub.SetResetErr(status.Error(codes.FailedPrecondition, "reset handler failed")) + if gotErr := sub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + sub.Receiver.ValidateMsg(msg) + barrier.Release() + if gotErr := sub.FinalError(); gotErr != nil { + t.Errorf("Final err: (%v), want: ", gotErr) + } +} + type testSinglePartitionSubscriber singlePartitionSubscriber func (t *testSinglePartitionSubscriber) WaitStopped() error { @@ -595,7 +627,7 @@ func TestSinglePartitionSubscriberStartStop(t *testing.T) { // Verifies the behavior of the subscribeStream and committer when they are // stopped before any messages are received. subStream := test.NewRPCVerifier(t) - subStream.Push(initSubReq(subscription), initSubResp(), nil) + subStream.Push(initSubReqCommit(subscription), initSubResp(), nil) barrier := subStream.PushWithBarrier(initFlowControlReq(), nil, nil) verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream) @@ -626,7 +658,7 @@ func TestSinglePartitionSubscriberSimpleMsgAck(t *testing.T) { verifiers := test.NewVerifiers(t) subStream := test.NewRPCVerifier(t) - subStream.Push(initSubReq(subscription), initSubResp(), nil) + subStream.Push(initSubReqCommit(subscription), initSubResp(), nil) subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream) @@ -661,17 +693,16 @@ func TestSinglePartitionSubscriberMessageQueue(t *testing.T) { verifiers := test.NewVerifiers(t) subStream1 := test.NewRPCVerifier(t) - subStream1.Push(initSubReq(subscription), initSubResp(), nil) + subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil) subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) subStream1.Push(nil, msgSubResp(msg2), nil) subStream1.Push(nil, nil, retryableErr) verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1) - // When reconnected, the subscribeStream should seek to msg3 and have - // subtracted flow control tokens for msg1 and msg2. + // When reconnected, the subscribeStream should set initial cursor to msg3 and + // have subtracted flow control tokens for msg1 and msg2. subStream2 := test.NewRPCVerifier(t) - subStream2.Push(initSubReq(subscription), initSubResp(), nil) - subStream2.Push(seekReq(3), nil, nil) + subStream2.Push(initSubReqCursor(subscription, 3), initSubResp(), nil) subStream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 8}), msgSubResp(msg3), nil) verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2) @@ -719,7 +750,7 @@ func TestSinglePartitionSubscriberStopDuringReceive(t *testing.T) { verifiers := test.NewVerifiers(t) subStream := test.NewRPCVerifier(t) - subStream.Push(initSubReq(subscription), initSubResp(), nil) + subStream.Push(initSubReqCommit(subscription), initSubResp(), nil) subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream) @@ -748,6 +779,156 @@ func TestSinglePartitionSubscriberStopDuringReceive(t *testing.T) { receiver.VerifyNoMsgs() // msg2 should not be received } +func TestSinglePartitionSubscriberAdminSeekWhileConnected(t *testing.T) { + subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} + receiver := newTestMessageReceiver(t) + msg1 := seqMsgWithOffsetAndSize(1, 100) + msg2 := seqMsgWithOffsetAndSize(2, 100) + msg3 := seqMsgWithOffsetAndSize(3, 100) + + verifiers := test.NewVerifiers(t) + + subStream1 := test.NewRPCVerifier(t) + subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil) + subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2, msg3), nil) + // Server disconnects the stream with the RESET signal. + barrier := subStream1.PushWithBarrier(nil, nil, makeStreamResetSignal()) + verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1) + + subStream2 := test.NewRPCVerifier(t) + // Reconnected stream reads from commit cursor. + subStream2.Push(initSubReqCommit(subscription), initSubResp(), nil) + // Ensure that the subscriber resets state and can handle seeking back to + // msg1. + subStream2.Push(initFlowControlReq(), msgSubResp(msg1), nil) + verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2) + + cmtStream := test.NewRPCVerifier(t) + cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil) + cmtStream.Push(commitReq(4), commitResp(1), nil) + cmtStream.Push(commitReq(2), commitResp(1), nil) + verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription) + if gotErr := sub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + receiver.ValidateMsg(msg1).Ack() + receiver.ValidateMsg(msg2).Ack() + receiver.ValidateMsg(msg3).Ack() + barrier.Release() + receiver.ValidateMsg(msg1).Ack() + + sub.Stop() + if gotErr := sub.WaitStopped(); gotErr != nil { + t.Errorf("Stop() got err: (%v)", gotErr) + } +} + +func TestSinglePartitionSubscriberAdminSeekWhileReconnecting(t *testing.T) { + subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} + receiver := newTestMessageReceiver(t) + msg1 := seqMsgWithOffsetAndSize(1, 100) + msg2 := seqMsgWithOffsetAndSize(2, 100) + msg3 := seqMsgWithOffsetAndSize(3, 100) + + verifiers := test.NewVerifiers(t) + + subStream1 := test.NewRPCVerifier(t) + subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil) + subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2, msg3), nil) + // Normal stream breakage. + barrier := subStream1.PushWithBarrier(nil, nil, status.Error(codes.DeadlineExceeded, "")) + verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1) + + subStream2 := test.NewRPCVerifier(t) + // The server sends the RESET signal during stream initialization. + subStream2.Push(initSubReqCursor(subscription, 4), nil, makeStreamResetSignal()) + verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2) + + subStream3 := test.NewRPCVerifier(t) + // Reconnected stream reads from commit cursor. + subStream3.Push(initSubReqCommit(subscription), initSubResp(), nil) + // Ensure that the subscriber resets state and can handle seeking back to + // msg1. + subStream3.Push(initFlowControlReq(), msgSubResp(msg1), nil) + verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream3) + + cmtStream := test.NewRPCVerifier(t) + cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil) + cmtStream.Push(commitReq(3), commitResp(1), nil) + cmtStream.Push(commitReq(2), commitResp(1), nil) + verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription) + if gotErr := sub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + receiver.ValidateMsg(msg1).Ack() + receiver.ValidateMsg(msg2).Ack() + ack := receiver.ValidateMsg(msg3) // Unacked message discarded + barrier.Release() + receiver.ValidateMsg(msg1).Ack() + ack.Ack() // Should be ignored + + sub.Stop() + if gotErr := sub.WaitStopped(); gotErr != nil { + t.Errorf("Stop() got err: (%v)", gotErr) + } +} + +func TestSinglePartitionSubscriberStopDuringAdminSeek(t *testing.T) { + subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} + receiver := newTestMessageReceiver(t) + msg1 := seqMsgWithOffsetAndSize(1, 100) + msg2 := seqMsgWithOffsetAndSize(2, 100) + + verifiers := test.NewVerifiers(t) + + subStream := test.NewRPCVerifier(t) + subStream.Push(initSubReqCommit(subscription), initSubResp(), nil) + subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) + // Server disconnects the stream with the RESET signal. + subBarrier := subStream.PushWithBarrier(nil, nil, makeStreamResetSignal()) + verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream) + + cmtStream := test.NewRPCVerifier(t) + cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil) + cmtBarrier := cmtStream.PushWithBarrier(commitReq(3), commitResp(1), nil) + verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription) + if gotErr := sub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + receiver.ValidateMsg(msg1).Ack() + receiver.ValidateMsg(msg2).Ack() + subBarrier.Release() + + // Ensure that the user is able to call Stop while a reset is in progress. + // Verifies that the subscribeStream is not holding mutexes while waiting and + // that the subscribe stream is not reconnected. + cmtBarrier.ReleaseAfter(func() { + sub.Stop() + }) + + if gotErr := sub.WaitStopped(); gotErr != nil { + t.Errorf("Stop() got err: (%v)", gotErr) + } +} + func newTestMultiPartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string, partitions []int) *multiPartitionSubscriber { ctx := context.Background() subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) @@ -787,7 +968,7 @@ func TestMultiPartitionSubscriberMultipleMessages(t *testing.T) { // Partition 1 subStream1 := test.NewRPCVerifier(t) - subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) + subStream1.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) subStream1.Push(nil, msgSubResp(msg2), nil) verifiers.AddSubscribeStream(subscription, 1, subStream1) @@ -799,7 +980,7 @@ func TestMultiPartitionSubscriberMultipleMessages(t *testing.T) { // Partition 2 subStream2 := test.NewRPCVerifier(t) - subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) + subStream2.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil) subStream2.Push(nil, msgSubResp(msg4), nil) verifiers.AddSubscribeStream(subscription, 2, subStream2) @@ -835,7 +1016,7 @@ func TestMultiPartitionSubscriberPermanentError(t *testing.T) { // Partition 1 subStream1 := test.NewRPCVerifier(t) - subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) + subStream1.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) msg2Barrier := subStream1.PushWithBarrier(nil, msgSubResp(msg2), nil) verifiers.AddSubscribeStream(subscription, 1, subStream1) @@ -847,7 +1028,7 @@ func TestMultiPartitionSubscriberPermanentError(t *testing.T) { // Partition 2 subStream2 := test.NewRPCVerifier(t) - subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) + subStream2.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil) errorBarrier := subStream2.PushWithBarrier(nil, nil, serverErr) verifiers.AddSubscribeStream(subscription, 2, subStream2) @@ -960,7 +1141,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { // Partition 3 subStream3 := test.NewRPCVerifier(t) - subStream3.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 3}), initSubResp(), nil) + subStream3.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 3}), initSubResp(), nil) subStream3.Push(initFlowControlReq(), msgSubResp(msg1), nil) msg2Barrier := subStream3.PushWithBarrier(nil, msgSubResp(msg2), nil) verifiers.AddSubscribeStream(subscription, 3, subStream3) @@ -973,7 +1154,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { // Partition 6 subStream6 := test.NewRPCVerifier(t) - subStream6.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 6}), initSubResp(), nil) + subStream6.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 6}), initSubResp(), nil) subStream6.Push(initFlowControlReq(), msgSubResp(msg3), nil) // msg4 should not be received. msg4Barrier := subStream6.PushWithBarrier(nil, msgSubResp(msg4), nil) @@ -986,7 +1167,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { // Partition 8 subStream8 := test.NewRPCVerifier(t) - subStream8.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 8}), initSubResp(), nil) + subStream8.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 8}), initSubResp(), nil) subStream8.Push(initFlowControlReq(), msgSubResp(msg5), nil) verifiers.AddSubscribeStream(subscription, 8, subStream8) @@ -1047,7 +1228,7 @@ func TestAssigningSubscriberPermanentError(t *testing.T) { // Partition 1 subStream1 := test.NewRPCVerifier(t) - subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) + subStream1.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) verifiers.AddSubscribeStream(subscription, 1, subStream1) @@ -1058,7 +1239,7 @@ func TestAssigningSubscriberPermanentError(t *testing.T) { // Partition 2 subStream2 := test.NewRPCVerifier(t) - subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) + subStream2.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil) verifiers.AddSubscribeStream(subscription, 2, subStream2) @@ -1101,7 +1282,7 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) { // Partition 1 subStream := test.NewRPCVerifier(t) - subStream.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) + subStream.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) verifiers.AddSubscribeStream(subscription, 1, subStream)