Skip to content

Commit

Permalink
feat(pubsublite): subscriber client (#3442)
Browse files Browse the repository at this point in the history
SubscriberClient wraps Pub/Sub Lite's internal wire.Subscriber and emulates the pubsub.Subscription.Receive API.
  • Loading branch information
tmdiep committed Jan 6, 2021
1 parent 443884c commit 221bfba
Show file tree
Hide file tree
Showing 8 changed files with 881 additions and 11 deletions.
1 change: 1 addition & 0 deletions pubsublite/go.mod
Expand Up @@ -9,6 +9,7 @@ require (
github.com/google/go-cmp v0.5.4
github.com/google/uuid v1.1.4
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/tools v0.0.0-20210105210202-9ed45478a130 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/api v0.36.0
Expand Down
16 changes: 13 additions & 3 deletions pubsublite/internal/wire/committer.go
Expand Up @@ -98,6 +98,16 @@ func (c *committer) Stop() {
c.unsafeInitiateShutdown(serviceTerminating, nil)
}

// Terminate will discard outstanding acks and send the final commit offset to
// the server.
func (c *committer) Terminate() {
c.mu.Lock()
defer c.mu.Unlock()

c.acks.Release()
c.unsafeInitiateShutdown(serviceTerminating, nil)
}

func (c *committer) newStream(ctx context.Context) (grpc.ClientStream, error) {
return c.cursorClient.StreamingCommitCursor(ctx)
}
Expand Down Expand Up @@ -201,18 +211,18 @@ func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error

// Otherwise discard outstanding acks and immediately terminate the stream.
c.acks.Release()
c.unsafeTerminate()
c.unsafeOnTerminated()
}

func (c *committer) unsafeCheckDone() {
// The commit stream can be closed once the final commit offset has been
// confirmed and there are no outstanding acks.
if c.status == serviceTerminating && c.cursorTracker.UpToDate() && c.acks.Empty() {
c.unsafeTerminate()
c.unsafeOnTerminated()
}
}

func (c *committer) unsafeTerminate() {
func (c *committer) unsafeOnTerminated() {
c.pollCommits.Stop()
c.stream.Stop()
}
35 changes: 35 additions & 0 deletions pubsublite/internal/wire/committer_test.go
Expand Up @@ -48,6 +48,10 @@ func (tc *testCommitter) SendBatchCommit() {
tc.cmt.commitOffsetToStream()
}

func (tc *testCommitter) Terminate() {
tc.cmt.Terminate()
}

func TestCommitterStreamReconnect(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack1 := newAckConsumer(33, 0, nil)
Expand Down Expand Up @@ -123,6 +127,37 @@ func TestCommitterStopFlushesCommits(t *testing.T) {
}
}

func TestCommitterTerminateDiscardsOutstandingAcks(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack1 := newAckConsumer(33, 0, nil)
ack2 := newAckConsumer(55, 0, nil)
acks := newAckTracker()
acks.Push(ack1)
acks.Push(ack2)

verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()
cmt.Terminate() // Terminate should flush the first offset
ack2.Ack() // Acks after Terminate() are discarded
cmt.SendBatchCommit() // Should do nothing (server does not expect second commit)
if gotErr := cmt.FinalError(); gotErr != nil {
t.Errorf("Final err: (%v), want: <nil>", gotErr)
}
}

func TestCommitterPermanentStreamError(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
acks := newAckTracker()
Expand Down
33 changes: 33 additions & 0 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -353,6 +353,13 @@ type singlePartitionSubscriber struct {
compositeService
}

// Terminate shuts down the singlePartitionSubscriber without waiting for
// outstanding acks. Alternatively, Stop() will wait for outstanding acks.
func (s *singlePartitionSubscriber) Terminate() {
s.subscriber.Stop()
s.committer.Terminate()
}

type singlePartitionSubscriberFactory struct {
ctx context.Context
subClient *vkit.SubscriberClient
Expand Down Expand Up @@ -380,6 +387,8 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu
// multiPartitionSubscriber receives messages from a fixed set of topic
// partitions.
type multiPartitionSubscriber struct {
subscribers []*singlePartitionSubscriber

compositeService
}

Expand All @@ -390,10 +399,22 @@ func newMultiPartitionSubscriber(subFactory *singlePartitionSubscriberFactory) *
for _, partition := range subFactory.settings.Partitions {
subscriber := subFactory.New(partition)
ms.unsafeAddServices(subscriber)
ms.subscribers = append(ms.subscribers, subscriber)
}
return ms
}

// Terminate shuts down all singlePartitionSubscribers without waiting for
// outstanding acks. Alternatively, Stop() will wait for outstanding acks.
func (ms *multiPartitionSubscriber) Terminate() {
ms.mu.Lock()
defer ms.mu.Unlock()

for _, sub := range ms.subscribers {
sub.Terminate()
}
}

// assigningSubscriber uses the Pub/Sub Lite partition assignment service to
// listen to its assigned partition numbers and dynamically add/remove
// singlePartitionSubscribers.
Expand Down Expand Up @@ -453,13 +474,25 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
return nil
}

// Terminate shuts down all singlePartitionSubscribers without waiting for
// outstanding acks. Alternatively, Stop() will wait for outstanding acks.
func (as *assigningSubscriber) Terminate() {
as.mu.Lock()
defer as.mu.Unlock()

for _, sub := range as.subscribers {
sub.Terminate()
}
}

// Subscriber is the client interface exported from this package for receiving
// messages.
type Subscriber interface {
Start()
WaitStarted() error
Stop()
WaitStopped() error
Terminate()
}

// NewSubscriber creates a new client for receiving messages.
Expand Down
72 changes: 70 additions & 2 deletions pubsublite/ps/example_test.go
Expand Up @@ -24,7 +24,12 @@ import (

func ExamplePublisherClient_Publish() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
topic := pubsublite.TopicPath{
Project: "project-id",
Zone: "zone",
TopicID: "topic-id",
}
// NOTE: DefaultPublishSettings and empty PublishSettings{} are equivalent.
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
Expand All @@ -48,7 +53,11 @@ func ExamplePublisherClient_Publish() {

func ExamplePublisherClient_Error() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
topic := pubsublite.TopicPath{
Project: "project-id",
Zone: "zone",
TopicID: "topic-id",
}
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
Expand All @@ -72,3 +81,62 @@ func ExamplePublisherClient_Error() {
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}

func ExampleSubscriberClient_Receive() {
ctx := context.Background()
subscription := pubsublite.SubscriptionPath{
Project: "project-id",
Zone: "zone",
SubscriptionID: "subscription-id",
}
// NOTE: DefaultReceiveSettings and empty ReceiveSettings{} are equivalent.
subscriber, err := ps.NewSubscriberClient(ctx, ps.DefaultReceiveSettings, subscription)
if err != nil {
// TODO: Handle error.
}
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
// TODO: Handle message.
// NOTE: May be called concurrently; synchronize access to shared memory.
m.Ack()
})
if err != nil {
// TODO: Handle error.
}

// Call cancel from callback, or another goroutine.
cancel()
}

// This example shows how to throttle SubscriberClient.Receive, which aims for
// high throughput by default. By limiting the number of messages and/or bytes
// being processed at once, you can bound your program's resource consumption.
// Note that ReceiveSettings apply per partition, so keep in mind the number of
// partitions in the associated topic.
func ExampleSubscriberClient_Receive_maxOutstanding() {
ctx := context.Background()
subscription := pubsublite.SubscriptionPath{
Project: "project-id",
Zone: "zone",
SubscriptionID: "subscription-id",
}
settings := ps.DefaultReceiveSettings
settings.MaxOutstandingMessages = 5
settings.MaxOutstandingBytes = 10e6
subscriber, err := ps.NewSubscriberClient(ctx, settings, subscription)
if err != nil {
// TODO: Handle error.
}
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
// TODO: Handle message.
// NOTE: May be called concurrently; synchronize access to shared memory.
m.Ack()
})
if err != nil {
// TODO: Handle error.
}

// Call cancel from callback, or another goroutine.
cancel()
}
19 changes: 13 additions & 6 deletions pubsublite/ps/settings.go
Expand Up @@ -41,9 +41,9 @@ type KeyExtractorFunc func(*pubsub.Message) []byte
// terminate.
type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) error

// PublishSettings configure the PublisherClient. These settings apply per
// partition. If BufferedByteLimit is being used to bound memory usage, keep in
// mind the number of partitions in the topic.
// PublishSettings configure the PublisherClient. Batching settings
// (DelayThreshold, CountThreshold, ByteThreshold, BufferedByteLimit) apply per
// partition.
//
// A zero PublishSettings will result in values equivalent to
// DefaultPublishSettings.
Expand Down Expand Up @@ -76,6 +76,10 @@ type PublishSettings struct {
// returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as
// DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0.
//
// Note that this setting applies per partition. If BufferedByteLimit is being
// used to bound memory usage, keep in mind the number of partitions in the
// topic.
//
// Note that Pub/Sub Lite topics are provisioned a publishing throughput
// capacity, per partition, shared by all publisher clients. Setting a large
// buffer size can mitigate transient publish spikes. However, consistently
Expand Down Expand Up @@ -146,9 +150,8 @@ type NackHandler func(*pubsub.Message) error
// will consider this a fatal error and terminate.
type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error

// ReceiveSettings configure the SubscriberClient. These settings apply per
// partition. If MaxOutstandingBytes is being used to bound memory usage, keep
// in mind the number of partitions in the associated topic.
// ReceiveSettings configure the SubscriberClient. Flow control settings
// (MaxOutstandingMessages, MaxOutstandingBytes) apply per partition.
//
// A zero ReceiveSettings will result in values equivalent to
// DefaultReceiveSettings.
Expand All @@ -161,6 +164,10 @@ type ReceiveSettings struct {
// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
// messages. If MaxOutstandingBytes is 0, it will be treated as
// DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0.
//
// Note that this setting applies per partition. If MaxOutstandingBytes is
// being used to bound memory usage, keep in mind the number of partitions in
// the associated topic.
MaxOutstandingBytes int

// The maximum time that the client will attempt to establish a subscribe
Expand Down

0 comments on commit 221bfba

Please sign in to comment.