Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(pubsublite): notify subscriber clients on partition reassignment (
…#4777)

Adds ReceiveSettings.ReassignmentHandler for the SubscriberClient to receive notifications when the server sends a new partition reassignment to the client.
  • Loading branch information
tmdiep committed Oct 1, 2021
1 parent 320f497 commit 393b0a3
Show file tree
Hide file tree
Showing 8 changed files with 406 additions and 79 deletions.
41 changes: 30 additions & 11 deletions pubsublite/internal/wire/assigner.go
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"reflect"
"sort"

"github.com/google/uuid"
"google.golang.org/grpc"
Expand All @@ -26,26 +27,45 @@ import (
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// partitionSet is a set of partition numbers.
type partitionSet map[int]struct{}
// PartitionSet is a set of partition numbers.
type PartitionSet map[int]struct{}

func newPartitionSet(assignmentpb *pb.PartitionAssignment) partitionSet {
// NewPartitionSet creates a partition set initialized from the given partition
// numbers.
func NewPartitionSet(partitions []int) PartitionSet {
var void struct{}
partitions := make(map[int]struct{})
partitionSet := make(map[int]struct{})
for _, p := range partitions {
partitionSet[p] = void
}
return partitionSet
}

func newPartitionSet(assignmentpb *pb.PartitionAssignment) PartitionSet {
var partitions []int
for _, p := range assignmentpb.GetPartitions() {
partitions[int(p)] = void
partitions = append(partitions, int(p))
}
return partitionSet(partitions)
return NewPartitionSet(partitions)
}

func (ps partitionSet) Ints() (partitions []int) {
// Ints returns the partitions contained in this set as an unsorted slice.
func (ps PartitionSet) Ints() (partitions []int) {
for p := range ps {
partitions = append(partitions, p)
}
return
}

func (ps partitionSet) Contains(partition int) bool {
// SortedInts returns the partitions contained in this set as a sorted slice.
func (ps PartitionSet) SortedInts() (partitions []int) {
partitions = ps.Ints()
sort.Ints(partitions)
return
}

// Contains returns true if this set contains the specified partition.
func (ps PartitionSet) Contains(partition int) bool {
_, exists := ps[partition]
return exists
}
Expand All @@ -54,9 +74,8 @@ func (ps partitionSet) Contains(partition int) bool {
type generateUUIDFunc func() (uuid.UUID, error)

// partitionAssignmentReceiver must enact the received partition assignment from
// the server, or otherwise return an error, which will break the stream. The
// receiver must not call the assigner, as this would result in a deadlock.
type partitionAssignmentReceiver func(partitionSet) error
// the server, or otherwise return an error, which will break the stream.
type partitionAssignmentReceiver func(PartitionSet) error

// assigner wraps the partition assignment stream and notifies a receiver when
// the server sends a new set of partition assignments for a subscriber.
Expand Down
10 changes: 3 additions & 7 deletions pubsublite/internal/wire/assigner_test.go
Expand Up @@ -16,7 +16,6 @@ package wire
import (
"context"
"errors"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -46,9 +45,7 @@ func TestPartitionSet(t *testing.T) {
}
}

gotPartitions := partitions.Ints()
sort.Ints(gotPartitions)
if !testutil.Equal(gotPartitions, wantPartitions) {
if gotPartitions := partitions.SortedInts(); !testutil.Equal(gotPartitions, wantPartitions) {
t.Errorf("Ints() got %v, want %v", gotPartitions, wantPartitions)
}
}
Expand Down Expand Up @@ -91,9 +88,8 @@ func newTestAssigner(t *testing.T, subscription string, recvErr error) *testAssi
return ta
}

func (ta *testAssigner) receiveAssignment(partitions partitionSet) error {
p := partitions.Ints()
sort.Ints(p)
func (ta *testAssigner) receiveAssignment(partitions PartitionSet) error {
p := partitions.SortedInts()
ta.partitions <- p

if ta.recvError != nil {
Expand Down
69 changes: 52 additions & 17 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -415,21 +415,22 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu
// partitions.
type multiPartitionSubscriber struct {
// Immutable after creation.
subscribers []*singlePartitionSubscriber
subscribers map[int]*singlePartitionSubscriber

apiClientService
}

func newMultiPartitionSubscriber(allClients apiClients, subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
ms := &multiPartitionSubscriber{
subscribers: make(map[int]*singlePartitionSubscriber),
apiClientService: apiClientService{clients: allClients},
}
ms.init()

for _, partition := range subFactory.settings.Partitions {
subscriber := subFactory.New(partition)
ms.unsafeAddServices(subscriber)
ms.subscribers = append(ms.subscribers, subscriber)
ms.subscribers[partition] = subscriber
}
return ms
}
Expand All @@ -445,13 +446,23 @@ func (ms *multiPartitionSubscriber) Terminate() {
}
}

// PartitionActive returns whether the partition is active.
func (ms *multiPartitionSubscriber) PartitionActive(partition int) bool {
_, exists := ms.subscribers[partition]
return exists
}

// ReassignmentHandlerFunc receives a partition assignment change.
type ReassignmentHandlerFunc func(before, after PartitionSet) error

// assigningSubscriber uses the Pub/Sub Lite partition assignment service to
// listen to its assigned partition numbers and dynamically add/remove
// singlePartitionSubscribers.
type assigningSubscriber struct {
// Immutable after creation.
subFactory *singlePartitionSubscriberFactory
assigner *assigner
reassignmentHandler ReassignmentHandlerFunc
subFactory *singlePartitionSubscriberFactory
assigner *assigner

// Fields below must be guarded with mu.
// Subscribers keyed by partition number. Updated as assignments change.
Expand All @@ -460,11 +471,13 @@ type assigningSubscriber struct {
apiClientService
}

func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) {
func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.PartitionAssignmentClient, reassignmentHandler ReassignmentHandlerFunc,
genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) {
as := &assigningSubscriber{
apiClientService: apiClientService{clients: allClients},
subFactory: subFactory,
subscribers: make(map[int]*singlePartitionSubscriber),
apiClientService: apiClientService{clients: allClients},
reassignmentHandler: reassignmentHandler,
subFactory: subFactory,
subscribers: make(map[int]*singlePartitionSubscriber),
}
as.init()

Expand All @@ -477,12 +490,17 @@ func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.Partit
return as, nil
}

func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
removedSubscribers, err := as.doHandleAssignment(partitions)
func (as *assigningSubscriber) handleAssignment(nextPartitions PartitionSet) error {
previousPartitions, removedSubscribers, err := as.doHandleAssignment(nextPartitions)
if err != nil {
return err
}

// Notify the user reassignment handler.
if err := as.reassignmentHandler(previousPartitions, nextPartitions); err != nil {
return err
}

// Wait for removed subscribers to completely stop (which waits for commit
// acknowledgments from the server) before acking the assignment. This avoids
// commits racing with the new assigned client.
Expand All @@ -492,17 +510,23 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
return nil
}

func (as *assigningSubscriber) doHandleAssignment(partitions partitionSet) ([]*singlePartitionSubscriber, error) {
// Returns the previous set of partitions and removed subscribers.
func (as *assigningSubscriber) doHandleAssignment(nextPartitions PartitionSet) (PartitionSet, []*singlePartitionSubscriber, error) {
as.mu.Lock()
defer as.mu.Unlock()

var previousPartitions []int
for partition := range as.subscribers {
previousPartitions = append(previousPartitions, partition)
}

// Handle new partitions.
for _, partition := range partitions.Ints() {
for _, partition := range nextPartitions.Ints() {
if _, exists := as.subscribers[partition]; !exists {
subscriber := as.subFactory.New(partition)
if err := as.unsafeAddServices(subscriber); err != nil {
// Occurs when the assigningSubscriber is stopping/stopped.
return nil, err
return nil, nil, err
}
as.subscribers[partition] = subscriber
}
Expand All @@ -511,7 +535,7 @@ func (as *assigningSubscriber) doHandleAssignment(partitions partitionSet) ([]*s
// Handle removed partitions.
var removedSubscribers []*singlePartitionSubscriber
for partition, subscriber := range as.subscribers {
if !partitions.Contains(partition) {
if !nextPartitions.Contains(partition) {
// Ignore unacked messages from this point on to avoid conflicting with
// the commits of the new subscriber that will be assigned this partition.
subscriber.Terminate()
Expand All @@ -523,7 +547,7 @@ func (as *assigningSubscriber) doHandleAssignment(partitions partitionSet) ([]*s
delete(as.subscribers, partition)
}
}
return removedSubscribers, nil
return NewPartitionSet(previousPartitions), removedSubscribers, nil
}

// Terminate shuts down all singlePartitionSubscribers without waiting for
Expand All @@ -537,6 +561,15 @@ func (as *assigningSubscriber) Terminate() {
}
}

// PartitionActive returns whether the partition is still active.
func (as *assigningSubscriber) PartitionActive(partition int) bool {
as.mu.Lock()
defer as.mu.Unlock()

_, exists := as.subscribers[partition]
return exists
}

// Subscriber is the client interface exported from this package for receiving
// messages.
type Subscriber interface {
Expand All @@ -545,10 +578,12 @@ type Subscriber interface {
Stop()
WaitStopped() error
Terminate()
PartitionActive(int) bool
}

// NewSubscriber creates a new client for receiving messages.
func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc, region, subscriptionPath string, opts ...option.ClientOption) (Subscriber, error) {
func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc, reassignmentHandler ReassignmentHandlerFunc,
region, subscriptionPath string, opts ...option.ClientOption) (Subscriber, error) {
if err := ValidateRegion(region); err != nil {
return nil, err
}
Expand Down Expand Up @@ -588,5 +623,5 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa
return nil, err
}
allClients = append(allClients, partitionClient)
return newAssigningSubscriber(allClients, partitionClient, uuid.NewRandom, subFactory)
return newAssigningSubscriber(allClients, partitionClient, reassignmentHandler, uuid.NewRandom, subFactory)
}

0 comments on commit 393b0a3

Please sign in to comment.