Skip to content

Commit

Permalink
Merge branch 'master' into regen_gocloud
Browse files Browse the repository at this point in the history
  • Loading branch information
tbpg committed Dec 1, 2020
2 parents 66d8525 + 8d3efa9 commit 2db396b
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 27 deletions.
10 changes: 8 additions & 2 deletions pubsublite/internal/wire/periodic_task.go
Expand Up @@ -59,10 +59,16 @@ func (pt *periodicTask) Stop() {

func (pt *periodicTask) poll(ticker *time.Ticker, stop chan struct{}) {
for {
// stop has higher priority.
select {
case <-stop:
// Ends the goroutine.
return
return // Ends the goroutine.
default:
}

select {
case <-stop:
return // Ends the goroutine.
case <-ticker.C:
pt.task()
}
Expand Down
3 changes: 3 additions & 0 deletions pubsublite/internal/wire/streams.go
Expand Up @@ -47,6 +47,9 @@ const (
// streamHandler methods must not be called while holding retryableStream.mu in
// order to prevent the streamHandler calling back into the retryableStream and
// deadlocking.
//
// If any streamHandler method implementations block, this will block the
// retryableStream.connectStream goroutine processing the underlying stream.
type streamHandler interface {
// newStream implementations must create the client stream with the given
// (cancellable) context.
Expand Down
124 changes: 99 additions & 25 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"reflect"
"sync"
"time"

"google.golang.org/grpc"
Expand All @@ -43,6 +44,81 @@ type ReceivedMessage struct {
// partition.
type MessageReceiverFunc func([]*ReceivedMessage)

const maxMessagesBufferSize = 1000

// messageDeliveryQueue delivers received messages to the client-provided
// MessageReceiverFunc sequentially.
type messageDeliveryQueue struct {
receiver MessageReceiverFunc
messagesC chan []*ReceivedMessage
stopC chan struct{}

// Fields below must be guarded with mu.
mu sync.Mutex
status serviceStatus
}

func newMessageDeliveryQueue(receiver MessageReceiverFunc, bufferSize int) *messageDeliveryQueue {
// The buffer size is based on ReceiveSettings.MaxOutstandingMessages to
// handle the worst case of single messages. But ensure there's a reasonable
// limit as channel buffer capacity is allocated on creation.
if bufferSize > maxMessagesBufferSize {
bufferSize = maxMessagesBufferSize
}
return &messageDeliveryQueue{
receiver: receiver,
messagesC: make(chan []*ReceivedMessage, bufferSize),
stopC: make(chan struct{}),
}
}

func (mq *messageDeliveryQueue) Start() {
mq.mu.Lock()
defer mq.mu.Unlock()

if mq.status == serviceUninitialized {
go mq.deliverMessages()
mq.status = serviceActive
}
}

func (mq *messageDeliveryQueue) Stop() {
mq.mu.Lock()
defer mq.mu.Unlock()

if mq.status < serviceTerminated {
close(mq.stopC)
mq.status = serviceTerminated
}
}

func (mq *messageDeliveryQueue) Add(messages []*ReceivedMessage) {
mq.mu.Lock()
defer mq.mu.Unlock()

if mq.status == serviceActive {
mq.messagesC <- messages
}
}

func (mq *messageDeliveryQueue) deliverMessages() {
for {
// stopC has higher priority.
select {
case <-mq.stopC:
return // Ends the goroutine.
default:
}

select {
case <-mq.stopC:
return // Ends the goroutine.
case msgs := <-mq.messagesC:
mq.receiver(msgs)
}
}
}

// The frequency of sending batch flow control requests.
const batchFlowControlPeriod = 100 * time.Millisecond

Expand All @@ -58,9 +134,9 @@ type subscribeStream struct {
settings ReceiveSettings
subscription subscriptionPartition
initialReq *pb.SubscribeRequest
receiver MessageReceiverFunc
messageQueue *messageDeliveryQueue

// Fields below must be guarded with mutex.
// Fields below must be guarded with mu.
stream *retryableStream
acks *ackTracker
offsetTracker subscriberOffsetTracker
Expand All @@ -86,8 +162,8 @@ func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, s
},
},
},
receiver: receiver,
acks: acks,
messageQueue: newMessageDeliveryQueue(receiver, settings.MaxOutstandingMessages),
acks: acks,
}
s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{}))

Expand All @@ -108,6 +184,7 @@ func (s *subscribeStream) Start() {
if s.unsafeUpdateStatus(serviceStarting, nil) {
s.stream.Start()
s.pollFlowControl.Start()
s.messageQueue.Start()

s.flowControl.OnClientFlow(flowControlTokens{
Bytes: int64(s.settings.MaxOutstandingBytes),
Expand Down Expand Up @@ -150,13 +227,12 @@ func (s *subscribeStream) onStreamStatusChange(status streamStatus) {
// Reinitialize the offset and flow control tokens when a new subscribe
// stream instance is connected.
if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil {
// Note: If Send() returns false, the subscriber will either terminate or
// the stream will be reconnected.
if s.stream.Send(&pb.SubscribeRequest{
if !s.stream.Send(&pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Seek{Seek: seekReq},
}) {
s.seekInFlight = true
return
}
s.seekInFlight = true
}
s.unsafeSendFlowControl(s.flowControl.RequestForRestart())
s.pollFlowControl.Start()
Expand All @@ -171,30 +247,26 @@ func (s *subscribeStream) onStreamStatusChange(status streamStatus) {
}

func (s *subscribeStream) onResponse(response interface{}) {
var receivedMsgs []*ReceivedMessage
var err error
s.mu.Lock()
defer s.mu.Unlock()

if s.status >= serviceTerminating {
return
}

var err error
subscribeResponse, _ := response.(*pb.SubscribeResponse)
switch {
case subscribeResponse.GetMessages() != nil:
receivedMsgs, err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
case subscribeResponse.GetSeek() != nil:
err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek())
default:
err = errInvalidSubscribeResponse
}

if receivedMsgs != nil {
// Deliver messages without holding the mutex to prevent deadlocks.
s.mu.Unlock()
s.receiver(receivedMsgs)
return
}
if err != nil {
s.unsafeInitiateShutdown(serviceTerminated, err)
}
s.mu.Unlock()
}

func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error {
Expand All @@ -205,15 +277,15 @@ func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error
return nil
}

func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) ([]*ReceivedMessage, error) {
func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) error {
if len(response.Messages) == 0 {
return nil, errServerNoMessages
return errServerNoMessages
}
if err := s.offsetTracker.OnMessages(response.Messages); err != nil {
return nil, err
return err
}
if err := s.flowControl.OnMessages(response.Messages); err != nil {
return nil, err
return err
}

var receivedMsgs []*ReceivedMessage
Expand All @@ -222,11 +294,12 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse)
// `committer`.
ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck)
if err := s.acks.Push(ack); err != nil {
return nil, err
return err
}
receivedMsgs = append(receivedMsgs, &ReceivedMessage{Msg: msg, Ack: ack})
}
return receivedMsgs, nil
s.messageQueue.Add(receivedMsgs)
return nil
}

func (s *subscribeStream) onAck(ac *ackConsumer) {
Expand Down Expand Up @@ -276,6 +349,7 @@ func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err
}

// No data to send. Immediately terminate the stream.
s.messageQueue.Stop()
s.pollFlowControl.Stop()
s.stream.Stop()
}
Expand Down

0 comments on commit 2db396b

Please sign in to comment.