diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 4da86acb7d2..d75b02df5c2 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -469,6 +469,10 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error { // Handle removed partitions. for partition, subscriber := range as.subscribers { if !partitions.Contains(partition) { + // Ignore unacked messages from this point on to avoid conflicting with + // the commits of the new subscriber that will be assigned this partition. + subscriber.Terminate() + as.unsafeRemoveService(subscriber) // Safe to delete map entry during range loop: // https://golang.org/ref/spec#For_statements diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index 5ebed5c1c82..0e9633d4873 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -955,6 +955,67 @@ func TestAssigningSubscriberPermanentError(t *testing.T) { } } +func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) { + const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" + receiver := newTestMessageReceiver(t) + msg1 := seqMsgWithOffsetAndSize(11, 100) + msg2 := seqMsgWithOffsetAndSize(22, 200) + + verifiers := test.NewVerifiers(t) + + // Assignment stream + asnStream := test.NewRPCVerifier(t) + asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil) + assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil) + asnStream.Push(assignmentAckReq(), nil, nil) + verifiers.AddAssignmentStream(subscription, asnStream) + + // Partition 1 + subStream1 := test.NewRPCVerifier(t) + subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) + subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) + verifiers.AddSubscribeStream(subscription, 1, subStream1) + + cmtStream1 := test.NewRPCVerifier(t) + cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) + // Note: no commit expected. + verifiers.AddCommitStream(subscription, 1, cmtStream1) + + // Partition 2 + subStream2 := test.NewRPCVerifier(t) + subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) + subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil) + verifiers.AddSubscribeStream(subscription, 2, subStream2) + + cmtStream2 := test.NewRPCVerifier(t) + cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil) + cmtStream2.Push(commitReq(23), commitResp(1), nil) + verifiers.AddCommitStream(subscription, 2, cmtStream2) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription) + if gotErr := sub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + // Partition assignments are initially {1}. + ack1 := receiver.ValidateMsg(msg1) + + // Partition assignments will now be {2}. + assignmentBarrier.Release() + receiver.ValidateMsg(msg2).Ack() + + // Partition 1 has already been unassigned, so this ack is discarded. + ack1.Ack() + + sub.Stop() + if gotErr := sub.WaitStopped(); gotErr != nil { + t.Errorf("Stop() got err: (%v)", gotErr) + } +} + func TestNewSubscriberCreatesCorrectImpl(t *testing.T) { const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" const region = "us-central1"