Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): Single and multi partition subscribers #3221

Merged
merged 8 commits into from Nov 19, 2020
6 changes: 3 additions & 3 deletions pubsublite/internal/wire/flow_control.go
Expand Up @@ -32,7 +32,7 @@ type flowControlTokens struct {
Messages int64
}

// A TokenCounter stores the amount of outstanding byte and message flow control
// A tokenCounter stores the amount of outstanding byte and message flow control
// tokens that the client believes exists for the stream.
type tokenCounter struct {
Bytes int64
Expand Down Expand Up @@ -81,7 +81,7 @@ func (tc *tokenCounter) ToFlowControlRequest() *pb.FlowControlRequest {

// flowControlBatcher tracks flow control tokens and manages batching of flow
// control requests to avoid overwhelming the server. It is only accessed by
// the wireSubscriber.
// the subscribeStream.
type flowControlBatcher struct {
// The current amount of outstanding byte and message flow control tokens.
clientTokens tokenCounter
Expand Down Expand Up @@ -142,7 +142,7 @@ func (fc *flowControlBatcher) ShouldExpediteBatchRequest() bool {
}

// subscriberOffsetTracker tracks the expected offset of the next message
// received from the server. It is only accessed by the wireSubscriber.
// received from the server. It is only accessed by the subscribeStream.
type subscriberOffsetTracker struct {
minNextOffset int64
}
Expand Down
327 changes: 327 additions & 0 deletions pubsublite/internal/wire/subscriber.go
@@ -0,0 +1,327 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"errors"
"reflect"
"time"

"google.golang.org/grpc"

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var (
errServerNoMessages = errors.New("pubsublite: server delivered no messages")
errInvalidInitialSubscribeResponse = errors.New("pubsublite: first response from server was not an initial response for subscribe")
errInvalidSubscribeResponse = errors.New("pubsublite: received invalid subscribe response from server")
errNoInFlightSeek = errors.New("pubsublite: received seek response for no in-flight seek")
)

// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
// acknowledging the message.
type ReceivedMessage struct {
Msg *pb.SequencedMessage
Ack AckConsumer
}

// MessageReceiverFunc receives a batch of Pub/Sub messages from a topic
// partition.
type MessageReceiverFunc func([]*ReceivedMessage)

// The frequency of sending batch flow control requests.
const batchFlowControlPeriod = 100 * time.Millisecond

// subscribeStream directly wraps the subscribe client stream. It passes
// messages to the message receiver and manages flow control. Flow control
// tokens are batched and sent to the stream via a periodic background task,
// although it can be expedited if the user is rapidly acking messages.
//
// Client-initiated seek unsupported.
type subscribeStream struct {
// Immutable after creation.
subClient *vkit.SubscriberClient
settings ReceiveSettings
subscription subscriptionPartition
initialReq *pb.SubscribeRequest
receiver MessageReceiverFunc

// Fields below must be guarded with mutex.
stream *retryableStream
acks *ackTracker
offsetTracker subscriberOffsetTracker
flowControl flowControlBatcher
pollFlowControl *periodicTask
seekInFlight bool

abstractService
}

func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, settings ReceiveSettings,
receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *subscribeStream {

s := &subscribeStream{
subClient: subClient,
settings: settings,
subscription: subscription,
initialReq: &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
},
},
},
receiver: receiver,
acks: acks,
}
s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{}))

backgroundTask := s.sendBatchFlowControl
if disableTasks {
backgroundTask = func() {}
}
s.pollFlowControl = newPeriodicTask(batchFlowControlPeriod, backgroundTask)
return s
}

// Start establishes a subscribe stream connection and initializes flow control
// tokens from ReceiveSettings.
func (s *subscribeStream) Start() {
s.mu.Lock()
defer s.mu.Unlock()

if s.unsafeUpdateStatus(serviceStarting, nil) {
s.stream.Start()
s.pollFlowControl.Start()

s.flowControl.OnClientFlow(flowControlTokens{
Bytes: int64(s.settings.MaxOutstandingBytes),
Messages: int64(s.settings.MaxOutstandingMessages),
})
}
}

// Stop immediately terminates the subscribe stream.
func (s *subscribeStream) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
s.unsafeInitiateShutdown(serviceTerminating, nil)
}

func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, error) {
return s.subClient.Subscribe(addSubscriptionRoutingMetadata(ctx, s.subscription))
}

func (s *subscribeStream) initialRequest() (interface{}, bool) {
return s.initialReq, true
}

func (s *subscribeStream) validateInitialResponse(response interface{}) error {
subscribeResponse, _ := response.(*pb.SubscribeResponse)
if subscribeResponse.GetInitial() == nil {
return errInvalidInitialSubscribeResponse
}
return nil
}

func (s *subscribeStream) onStreamStatusChange(status streamStatus) {
s.mu.Lock()
defer s.mu.Unlock()

switch status {
case streamConnected:
s.unsafeUpdateStatus(serviceActive, nil)

// Reinitialize the offset and flow control tokens when a new subscribe
// stream instance is connected.
if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil {
// Note: If Send() returns false, the subscriber will either terminate or
// the stream will be reconnected.
if s.stream.Send(&pb.SubscribeRequest{
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
Request: &pb.SubscribeRequest_Seek{Seek: seekReq},
}) {
s.seekInFlight = true
}
}
s.unsafeSendFlowControl(s.flowControl.RequestForRestart())
s.pollFlowControl.Start()

case streamReconnecting:
s.seekInFlight = false
s.pollFlowControl.Stop()

case streamTerminated:
s.unsafeInitiateShutdown(serviceTerminated, s.stream.Error())
}
}

func (s *subscribeStream) onResponse(response interface{}) {
var receivedMsgs []*ReceivedMessage
var err error
s.mu.Lock()

subscribeResponse, _ := response.(*pb.SubscribeResponse)
switch {
case subscribeResponse.GetMessages() != nil:
receivedMsgs, err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
case subscribeResponse.GetSeek() != nil:
err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek())
default:
err = errInvalidSubscribeResponse
}

if receivedMsgs != nil {
// Deliver messages without holding the mutex to prevent deadlocks.
s.mu.Unlock()
s.receiver(receivedMsgs)
return
}
if err != nil {
s.unsafeInitiateShutdown(serviceTerminated, err)
}
s.mu.Unlock()
}

func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error {
if !s.seekInFlight {
return errNoInFlightSeek
}
s.seekInFlight = false
return nil
}

func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) ([]*ReceivedMessage, error) {
if len(response.Messages) == 0 {
return nil, errServerNoMessages
}
if err := s.offsetTracker.OnMessages(response.Messages); err != nil {
return nil, err
}
if err := s.flowControl.OnMessages(response.Messages); err != nil {
return nil, err
}

var receivedMsgs []*ReceivedMessage
for _, msg := range response.Messages {
// Register outstanding acks, which are primarily handled by the
// `committer`.
ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck)
if err := s.acks.Push(ack); err != nil {
return nil, err
}
receivedMsgs = append(receivedMsgs, &ReceivedMessage{Msg: msg, Ack: ack})
}
return receivedMsgs, nil
}

func (s *subscribeStream) onAck(ac *ackConsumer) {
// Don't block the user's goroutine with potentially expensive ack processing.
go s.onAckAsync(ac.MsgBytes)
}

func (s *subscribeStream) onAckAsync(msgBytes int64) {
s.mu.Lock()
defer s.mu.Unlock()

if s.status == serviceActive {
s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1})
}
}

// sendBatchFlowControl is called by the periodic background task.
func (s *subscribeStream) sendBatchFlowControl() {
s.mu.Lock()
defer s.mu.Unlock()
s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest())
}

func (s *subscribeStream) unsafeAllowFlow(allow flowControlTokens) {
s.flowControl.OnClientFlow(allow)
if s.flowControl.ShouldExpediteBatchRequest() {
s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest())
}
}

func (s *subscribeStream) unsafeSendFlowControl(req *pb.FlowControlRequest) {
if req == nil {
return
}

// Note: If Send() returns false, the stream will be reconnected and
// flowControlBatcher.RequestForRestart() will be sent when the stream
// reconnects. So its return value is ignored.
s.stream.Send(&pb.SubscribeRequest{
Request: &pb.SubscribeRequest_FlowControl{FlowControl: req},
})
}

func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !s.unsafeUpdateStatus(targetStatus, err) {
return
}

// No data to send. Immediately terminate the stream.
s.pollFlowControl.Stop()
s.stream.Stop()
}

// singlePartitionSubscriber receives messages from a single topic partition.
// It requires 2 child services:
// - subscribeStream to receive messages from the subscribe stream.
// - committer to commit cursor offsets to the streaming commit cursor stream.
type singlePartitionSubscriber struct {
compositeService
}

type singlePartitionSubscriberFactory struct {
ctx context.Context
subClient *vkit.SubscriberClient
cursorClient *vkit.CursorClient
settings ReceiveSettings
subscriptionPath string
receiver MessageReceiverFunc
disableTasks bool
}

func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSubscriber {
subscription := subscriptionPartition{Path: f.subscriptionPath, Partition: partition}
acks := newAckTracker()
commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks)
sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks)
ps := new(singlePartitionSubscriber)
ps.init()
ps.unsafeAddServices(sub, commit)
return ps
}

// multiPartitionSubscriber receives messages from a fixed set of topic
// partitions.
type multiPartitionSubscriber struct {
compositeService
}

func newMultiPartitionSubscriber(subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
ms := new(multiPartitionSubscriber)
ms.init()

for _, partition := range subFactory.settings.Partitions {
subscriber := subFactory.New(partition)
ms.unsafeAddServices(subscriber)
}
return ms
}