diff --git a/pubsublite/internal/wire/assigner.go b/pubsublite/internal/wire/assigner.go index e5dcc415158..ea4be1b7e43 100644 --- a/pubsublite/internal/wire/assigner.go +++ b/pubsublite/internal/wire/assigner.go @@ -139,30 +139,23 @@ func (a *assigner) onStreamStatusChange(status streamStatus) { } func (a *assigner) onResponse(response interface{}) { + assignment, _ := response.(*pb.PartitionAssignment) + err := a.receiveAssignment(newPartitionSet(assignment)) + a.mu.Lock() defer a.mu.Unlock() - if a.status >= serviceTerminating { return } - - assignment, _ := response.(*pb.PartitionAssignment) - if err := a.handleAssignment(assignment); err != nil { + if err != nil { a.unsafeInitiateShutdown(serviceTerminated, err) + return } -} - -func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error { - if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil { - return err - } - a.stream.Send(&pb.PartitionAssignmentRequest{ Request: &pb.PartitionAssignmentRequest_Ack{ Ack: &pb.PartitionAssignmentAck{}, }, }) - return nil } func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index d7b793688df..125eeec06be 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -465,6 +465,21 @@ func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.Partit } func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error { + removedSubscribers, err := as.doHandleAssignment(partitions) + if err != nil { + return err + } + + // Wait for removed subscribers to completely stop (which waits for commit + // acknowledgments from the server) before acking the assignment. This avoids + // commits racing with the new assigned client. + for _, subscriber := range removedSubscribers { + subscriber.WaitStopped() + } + return nil +} + +func (as *assigningSubscriber) doHandleAssignment(partitions partitionSet) ([]*singlePartitionSubscriber, error) { as.mu.Lock() defer as.mu.Unlock() @@ -474,18 +489,20 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error { subscriber := as.subFactory.New(partition) if err := as.unsafeAddServices(subscriber); err != nil { // Occurs when the assigningSubscriber is stopping/stopped. - return err + return nil, err } as.subscribers[partition] = subscriber } } // Handle removed partitions. + var removedSubscribers []*singlePartitionSubscriber for partition, subscriber := range as.subscribers { if !partitions.Contains(partition) { // Ignore unacked messages from this point on to avoid conflicting with // the commits of the new subscriber that will be assigned this partition. subscriber.Terminate() + removedSubscribers = append(removedSubscribers, subscriber) as.unsafeRemoveService(subscriber) // Safe to delete map entry during range loop: @@ -493,7 +510,7 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error { delete(as.subscribers, partition) } } - return nil + return removedSubscribers, nil } // Terminate shuts down all singlePartitionSubscribers without waiting for diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index 495e7edcc2a..a45bb281a4c 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -887,6 +887,17 @@ func (as *assigningSubscriber) Partitions() []int { return partitions } +func (as *assigningSubscriber) Subscribers() []*singlePartitionSubscriber { + as.mu.Lock() + defer as.mu.Unlock() + + var subscribers []*singlePartitionSubscriber + for _, s := range as.subscribers { + subscribers = append(subscribers, s) + } + return subscribers +} + func (as *assigningSubscriber) FlushCommits() { as.mu.Lock() defer as.mu.Unlock() @@ -1110,10 +1121,20 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) { // Partition assignments are initially {1}. receiver.ValidateMsg(msg1).Ack() ack2 := receiver.ValidateMsg(msg2) + subscribers := sub.Subscribers() // Partition assignments will now be {}. assignmentBarrier1.Release() - assignmentBarrier2.Release() // Wait for ack to ensure the test is deterministic + assignmentBarrier2.ReleaseAfter(func() { + // Verify that the assignment is acked after the subscriber has terminated. + if got, want := len(subscribers), 1; got != want { + t.Errorf("singlePartitionSubcriber count: got %d, want %d", got, want) + return + } + if got, want := subscribers[0].Status(), serviceTerminated; got != want { + t.Errorf("singlePartitionSubcriber status: got %v, want %v", got, want) + } + }) // Partition 1 has already been unassigned, so this ack is discarded. ack2.Ack()