feat(pubsublite): subscriber client #3442

Merged
merged 10 commits on Jan 6, 2021
1 change: 1 addition & 0 deletions pubsublite/go.mod
@@ -9,6 +9,7 @@ require (
	github.com/google/go-cmp v0.5.4
	github.com/google/uuid v1.1.2
	github.com/googleapis/gax-go/v2 v2.0.5
	golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
	golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
	google.golang.org/api v0.36.0
	google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a
16 changes: 13 additions & 3 deletions pubsublite/internal/wire/committer.go
@@ -98,6 +98,16 @@ func (c *committer) Stop() {
	c.unsafeInitiateShutdown(serviceTerminating, nil)
}

// Terminate will discard outstanding acks and send the final commit offset to
// the server.
func (c *committer) Terminate() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.acks.Release()
	c.unsafeInitiateShutdown(serviceTerminating, nil)
}

func (c *committer) newStream(ctx context.Context) (grpc.ClientStream, error) {
	return c.cursorClient.StreamingCommitCursor(ctx)
}
@@ -201,18 +211,18 @@ func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error

	// Otherwise discard outstanding acks and immediately terminate the stream.
	c.acks.Release()
-	c.unsafeTerminate()
+	c.unsafeOnTerminated()
}

func (c *committer) unsafeCheckDone() {
	// The commit stream can be closed once the final commit offset has been
	// confirmed and there are no outstanding acks.
	if c.status == serviceTerminating && c.cursorTracker.UpToDate() && c.acks.Empty() {
-		c.unsafeTerminate()
+		c.unsafeOnTerminated()
	}
}

-func (c *committer) unsafeTerminate() {
+func (c *committer) unsafeOnTerminated() {
	c.pollCommits.Stop()
	c.stream.Stop()
}
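
To restate the shutdown condition checked in unsafeCheckDone as a self-contained toy (the real code uses serviceStatus, cursorTracker, and ackTracker; the simplified fields below are hypothetical stand-ins):

package main

import "fmt"

// committerState is a toy model of the fields consulted by unsafeCheckDone.
type committerState struct {
	terminating     bool // Stop() or Terminate() has been called
	cursorUpToDate  bool // the final commit offset has been confirmed
	outstandingAcks int  // acks not yet received (zero after acks.Release())
}

// canCloseStream mirrors the check above: the commit stream may only be
// closed once the final offset is confirmed and no acks remain outstanding.
func canCloseStream(s committerState) bool {
	return s.terminating && s.cursorUpToDate && s.outstandingAcks == 0
}

func main() {
	fmt.Println(canCloseStream(committerState{true, true, 0}))  // true: safe to close
	fmt.Println(canCloseStream(committerState{true, false, 2})) // false: commit still in flight
}

Terminate() reaches the closeable state sooner than Stop() because acks.Release() empties the ack tracker immediately instead of waiting for acks to arrive.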
35 changes: 35 additions & 0 deletions pubsublite/internal/wire/committer_test.go
@@ -48,6 +48,10 @@ func (tc *testCommitter) SendBatchCommit() {
	tc.cmt.commitOffsetToStream()
}

func (tc *testCommitter) Terminate() {
	tc.cmt.Terminate()
}

func TestCommitterStreamReconnect(t *testing.T) {
	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
	ack1 := newAckConsumer(33, 0, nil)
@@ -123,6 +127,37 @@ func TestCommitterStopFlushesCommits(t *testing.T) {
	}
}

func TestCommitterTerminateDiscardsOutstandingAcks(t *testing.T) {
	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
	ack1 := newAckConsumer(33, 0, nil)
	ack2 := newAckConsumer(55, 0, nil)
	acks := newAckTracker()
	acks.Push(ack1)
	acks.Push(ack2)

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	stream.Push(initCommitReq(subscription), initCommitResp(), nil)
	stream.Push(commitReq(34), commitResp(1), nil)
	verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	cmt := newTestCommitter(t, subscription, acks)
	if gotErr := cmt.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}

	ack1.Ack()
	cmt.Terminate()       // Terminate should flush the first offset
	ack2.Ack()            // Acks after Terminate() are discarded
	cmt.SendBatchCommit() // Should do nothing (server does not expect second commit)
	if gotErr := cmt.FinalError(); gotErr != nil {
		t.Errorf("Final err: (%v), want: <nil>", gotErr)
	}
}

func TestCommitterPermanentStreamError(t *testing.T) {
	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
	acks := newAckTracker()
33 changes: 33 additions & 0 deletions pubsublite/internal/wire/subscriber.go
@@ -353,6 +353,13 @@ type singlePartitionSubscriber struct {
	compositeService
}

// Terminate shuts down the singlePartitionSubscriber without waiting for
// outstanding acks. Alternatively, Stop() will wait for outstanding acks.
func (s *singlePartitionSubscriber) Terminate() {
	s.subscriber.Stop()
	s.committer.Terminate()
}

type singlePartitionSubscriberFactory struct {
	ctx       context.Context
	subClient *vkit.SubscriberClient
@@ -380,6 +387,8 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSubscriber {
// multiPartitionSubscriber receives messages from a fixed set of topic
// partitions.
type multiPartitionSubscriber struct {
	subscribers []*singlePartitionSubscriber

	compositeService
}

@@ -390,10 +399,22 @@ func newMultiPartitionSubscriber(subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
	for _, partition := range subFactory.settings.Partitions {
		subscriber := subFactory.New(partition)
		ms.unsafeAddServices(subscriber)
		ms.subscribers = append(ms.subscribers, subscriber)
	}
	return ms
}

// Terminate shuts down all singlePartitionSubscribers without waiting for
// outstanding acks. Alternatively, Stop() will wait for outstanding acks.
func (ms *multiPartitionSubscriber) Terminate() {
	ms.mu.Lock()
	defer ms.mu.Unlock()

	for _, sub := range ms.subscribers {
		sub.Terminate()
	}
}

// assigningSubscriber uses the Pub/Sub Lite partition assignment service to
// listen to its assigned partition numbers and dynamically add/remove
// singlePartitionSubscribers.
@@ -453,13 +474,25 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
	return nil
}

// Terminate shuts down all singlePartitionSubscribers without waiting for
// outstanding acks. Alternatively, Stop() will wait for outstanding acks.
func (as *assigningSubscriber) Terminate() {
	as.mu.Lock()
	defer as.mu.Unlock()

	for _, sub := range as.subscribers {
		sub.Terminate()
	}
}

// Subscriber is the client interface exported from this package for receiving
// messages.
type Subscriber interface {
	Start()
	WaitStarted() error
	Stop()
	WaitStopped() error
	Terminate()
}

// NewSubscriber creates a new client for receiving messages.
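
For reviewers, a minimal sketch of how a caller inside this module might drive the new interface method (the shutdown helper and its graceful flag are hypothetical, not part of this PR; internal/wire is only importable from within the cloud.google.com/go/pubsublite module):

package ps // hypothetical caller within the pubsublite module

import "cloud.google.com/go/pubsublite/internal/wire"

// shutdown is illustrative only. It contrasts the two shutdown paths on the
// Subscriber interface: Stop() flushes commits after all outstanding acks
// arrive, while Terminate() discards outstanding acks and sends the final
// commit offset immediately.
func shutdown(sub wire.Subscriber, graceful bool) error {
	if graceful {
		sub.Stop()
	} else {
		sub.Terminate()
	}
	return sub.WaitStopped()
}

End users would presumably reach this behavior indirectly through the ps.SubscriberClient rather than by calling the wire layer themselves.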
70 changes: 68 additions & 2 deletions pubsublite/ps/example_test.go
@@ -24,7 +24,11 @@ import (

func ExamplePublisherClient_Publish() {
	ctx := context.Background()
-	topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
+	topic := pubsublite.TopicPath{
+		Project: "project-id",
+		Zone:    "zone",
+		TopicID: "topic-id",
+	}
	publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
	if err != nil {
		// TODO: Handle error.
@@ -48,7 +52,11 @@ func ExamplePublisherClient_Publish() {

func ExamplePublisherClient_Error() {
	ctx := context.Background()
-	topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
+	topic := pubsublite.TopicPath{
+		Project: "project-id",
+		Zone:    "zone",
+		TopicID: "topic-id",
+	}
	publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
	if err != nil {
		// TODO: Handle error.
@@ -72,3 +80,61 @@ func ExamplePublisherClient_Error() {
		fmt.Printf("Published a message with a message ID: %s\n", id)
	}
}

func ExampleSubscriberClient_Receive() {
	ctx := context.Background()
	subscription := pubsublite.SubscriptionPath{
		Project:        "project-id",
		Zone:           "zone",
		SubscriptionID: "subscription-id",
	}
	subscriber, err := ps.NewSubscriberClient(ctx, ps.DefaultReceiveSettings, subscription)
	if err != nil {
		// TODO: Handle error.
	}
	cctx, cancel := context.WithCancel(ctx)
	err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != nil {
		// TODO: Handle error.
	}

	// Call cancel from callback, or another goroutine.
	cancel()
}

// This example shows how to throttle SubscriberClient.Receive, which aims for
// high throughput by default. By limiting the number of messages and/or bytes
// being processed at once, you can bound your program's resource consumption.
// Note that ReceiveSettings apply per partition, so keep in mind the number of
// partitions in the associated topic.
func ExampleSubscriberClient_Receive_maxOutstanding() {
	ctx := context.Background()
	subscription := pubsublite.SubscriptionPath{
		Project:        "project-id",
		Zone:           "zone",
		SubscriptionID: "subscription-id",
	}
	settings := ps.DefaultReceiveSettings
	settings.MaxOutstandingMessages = 5
	settings.MaxOutstandingBytes = 10e6
	subscriber, err := ps.NewSubscriberClient(ctx, settings, subscription)
	if err != nil {
		// TODO: Handle error.
	}
	cctx, cancel := context.WithCancel(ctx)
	err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != nil {
		// TODO: Handle error.
	}

	// Call cancel from callback, or another goroutine.
	cancel()
}
19 changes: 13 additions & 6 deletions pubsublite/ps/settings.go
@@ -41,9 +41,9 @@ type KeyExtractorFunc func(*pubsub.Message) []byte
// terminate.
type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) error

-// PublishSettings configure the PublisherClient. These settings apply per
-// partition. If BufferedByteLimit is being used to bound memory usage, keep in
-// mind the number of partitions in the topic.
+// PublishSettings configure the PublisherClient. Batching settings
+// (DelayThreshold, CountThreshold, ByteThreshold, BufferedByteLimit) apply per
+// partition.
//
// A zero PublishSettings will result in values equivalent to
// DefaultPublishSettings.
@@ -76,6 +76,10 @@ type PublishSettings struct {
	// returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as
	// DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0.
	//
	// Note that this setting applies per partition. If BufferedByteLimit is being
	// used to bound memory usage, keep in mind the number of partitions in the
	// topic.
	//
	// Note that Pub/Sub Lite topics are provisioned a publishing throughput
	// capacity, per partition, shared by all publisher clients. Setting a large
	// buffer size can mitigate transient publish spikes. However, consistently
@@ -146,9 +150,8 @@ type NackHandler func(*pubsub.Message) error
// will consider this a fatal error and terminate.
type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error

-// ReceiveSettings configure the SubscriberClient. These settings apply per
-// partition. If MaxOutstandingBytes is being used to bound memory usage, keep
-// in mind the number of partitions in the associated topic.
+// ReceiveSettings configure the SubscriberClient. Flow control settings
+// (MaxOutstandingMessages, MaxOutstandingBytes) apply per partition.
//
// A zero ReceiveSettings will result in values equivalent to
// DefaultReceiveSettings.
@@ -161,6 +164,10 @@ type ReceiveSettings struct {
	// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
	// messages. If MaxOutstandingBytes is 0, it will be treated as
	// DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0.
	//
	// Note that this setting applies per partition. If MaxOutstandingBytes is
	// being used to bound memory usage, keep in mind the number of partitions in
	// the associated topic.
	MaxOutstandingBytes int

	// The maximum time that the client will attempt to establish a subscribe
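
To make the per-partition caveat concrete, a hedged sketch of the worst-case memory math (the 4-partition topic is hypothetical; the same reasoning applies to PublishSettings.BufferedByteLimit on the publish side):

package main

import (
	"fmt"

	"cloud.google.com/go/pubsublite/ps"
)

func main() {
	// Flow control settings apply per partition, so the worst-case bytes held
	// in unacknowledged messages scale with the topic's partition count.
	settings := ps.DefaultReceiveSettings
	settings.MaxOutstandingBytes = 10e6 // 10 MB per partition

	const partitions = 4 // hypothetical partition count of the topic
	fmt.Printf("worst-case outstanding bytes: %d\n", partitions*settings.MaxOutstandingBytes)
}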