Skip to content

Commit

Permalink
fix(pubsublite): ignore outstanding acks for unassigned partition sub…
Browse files Browse the repository at this point in the history
…scribers (#3597)

When the assigningSubscriber removes a singlePartitionSubscriber, it should ignore unacked messages. This avoids conflicting with the commits from the new subscriber that will be assigned the partition.
  • Loading branch information
tmdiep committed Jan 28, 2021
1 parent e7ab014 commit eb91f1f
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -469,6 +469,10 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
// Handle removed partitions.
for partition, subscriber := range as.subscribers {
if !partitions.Contains(partition) {
// Ignore unacked messages from this point on to avoid conflicting with
// the commits of the new subscriber that will be assigned this partition.
subscriber.Terminate()

as.unsafeRemoveService(subscriber)
// Safe to delete map entry during range loop:
// https://golang.org/ref/spec#For_statements
Expand Down
61 changes: 61 additions & 0 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -955,6 +955,67 @@ func TestAssigningSubscriberPermanentError(t *testing.T) {
}
}

func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
receiver := newTestMessageReceiver(t)
msg1 := seqMsgWithOffsetAndSize(11, 100)
msg2 := seqMsgWithOffsetAndSize(22, 200)

verifiers := test.NewVerifiers(t)

// Assignment stream
asnStream := test.NewRPCVerifier(t)
asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil)
assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil)
asnStream.Push(assignmentAckReq(), nil, nil)
verifiers.AddAssignmentStream(subscription, asnStream)

// Partition 1
subStream1 := test.NewRPCVerifier(t)
subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream1)

cmtStream1 := test.NewRPCVerifier(t)
cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
// Note: no commit expected.
verifiers.AddCommitStream(subscription, 1, cmtStream1)

// Partition 2
subStream2 := test.NewRPCVerifier(t)
subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription, 2, subStream2)

cmtStream2 := test.NewRPCVerifier(t)
cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil)
cmtStream2.Push(commitReq(23), commitResp(1), nil)
verifiers.AddCommitStream(subscription, 2, cmtStream2)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription)
if gotErr := sub.WaitStarted(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}

// Partition assignments are initially {1}.
ack1 := receiver.ValidateMsg(msg1)

// Partition assignments will now be {2}.
assignmentBarrier.Release()
receiver.ValidateMsg(msg2).Ack()

// Partition 1 has already been unassigned, so this ack is discarded.
ack1.Ack()

sub.Stop()
if gotErr := sub.WaitStopped(); gotErr != nil {
t.Errorf("Stop() got err: (%v)", gotErr)
}
}

func TestNewSubscriberCreatesCorrectImpl(t *testing.T) {
const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
const region = "us-central1"
Expand Down

0 comments on commit eb91f1f

Please sign in to comment.