Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): ack assignment after removed subscribers have terminated #4217

Merged
merged 3 commits into from Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 5 additions & 12 deletions pubsublite/internal/wire/assigner.go
Expand Up @@ -139,30 +139,23 @@ func (a *assigner) onStreamStatusChange(status streamStatus) {
}

func (a *assigner) onResponse(response interface{}) {
assignment, _ := response.(*pb.PartitionAssignment)
err := a.receiveAssignment(newPartitionSet(assignment))

a.mu.Lock()
defer a.mu.Unlock()

if a.status >= serviceTerminating {
return
}

assignment, _ := response.(*pb.PartitionAssignment)
if err := a.handleAssignment(assignment); err != nil {
if err != nil {
a.unsafeInitiateShutdown(serviceTerminated, err)
return
}
}

func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error {
if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil {
return err
}

a.stream.Send(&pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Ack{
Ack: &pb.PartitionAssignmentAck{},
},
})
return nil
}

func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
Expand Down
21 changes: 19 additions & 2 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -465,6 +465,21 @@ func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.Partit
}

func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
removedSubscribers, err := as.doHandleAssignment(partitions)
if err != nil {
return err
}

// Wait for removed subscribers to completely stop (which waits for commit
// acknowledgments from the server) before acking the assignment. This avoids
// commits racing with the new assigned client.
for _, subscriber := range removedSubscribers {
subscriber.WaitStopped()
}
return nil
}

func (as *assigningSubscriber) doHandleAssignment(partitions partitionSet) ([]*singlePartitionSubscriber, error) {
as.mu.Lock()
defer as.mu.Unlock()

Expand All @@ -474,26 +489,28 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
subscriber := as.subFactory.New(partition)
if err := as.unsafeAddServices(subscriber); err != nil {
// Occurs when the assigningSubscriber is stopping/stopped.
return err
return nil, err
}
as.subscribers[partition] = subscriber
}
}

// Handle removed partitions.
var removedSubscribers []*singlePartitionSubscriber
for partition, subscriber := range as.subscribers {
if !partitions.Contains(partition) {
// Ignore unacked messages from this point on to avoid conflicting with
// the commits of the new subscriber that will be assigned this partition.
subscriber.Terminate()
removedSubscribers = append(removedSubscribers, subscriber)

as.unsafeRemoveService(subscriber)
// Safe to delete map entry during range loop:
// https://golang.org/ref/spec#For_statements
delete(as.subscribers, partition)
}
}
return nil
return removedSubscribers, nil
}

// Terminate shuts down all singlePartitionSubscribers without waiting for
Expand Down
23 changes: 22 additions & 1 deletion pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -887,6 +887,17 @@ func (as *assigningSubscriber) Partitions() []int {
return partitions
}

func (as *assigningSubscriber) Subscribers() []*singlePartitionSubscriber {
as.mu.Lock()
defer as.mu.Unlock()

var subscribers []*singlePartitionSubscriber
for _, s := range as.subscribers {
subscribers = append(subscribers, s)
}
return subscribers
}

func (as *assigningSubscriber) FlushCommits() {
as.mu.Lock()
defer as.mu.Unlock()
Expand Down Expand Up @@ -1110,10 +1121,20 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
// Partition assignments are initially {1}.
receiver.ValidateMsg(msg1).Ack()
ack2 := receiver.ValidateMsg(msg2)
subscribers := sub.Subscribers()

// Partition assignments will now be {}.
assignmentBarrier1.Release()
assignmentBarrier2.Release() // Wait for ack to ensure the test is deterministic
assignmentBarrier2.ReleaseAfter(func() {
// Verify that the assignment is acked after the subscriber has terminated.
if got, want := len(subscribers), 1; got != want {
t.Errorf("singlePartitionSubcriber count: got %d, want %d", got, want)
return
}
if got, want := subscribers[0].Status(), serviceTerminated; got != want {
t.Errorf("singlePartitionSubcriber status: got %v, want %v", got, want)
}
})

// Partition 1 has already been unassigned, so this ack is discarded.
ack2.Ack()
Expand Down