feat(pubsublite): support out of band seeks (#4208)
- Supports admin/out of band seeks pushed from the server.
- Sets the new InitialSubscribeRequest.initial_location field for subscribe streams.
tmdiep committed Jun 15, 2021
1 parent 0ad3f16 commit 1432e67
Showing 7 changed files with 338 additions and 136 deletions.
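The new InitialSubscribeRequest.initial_location field lets the subscriber tell the server where to resume as part of the stream's initial request, instead of sending a separate SeekRequest after the stream connects. Below is a minimal, illustrative sketch (not part of this commit) of the two shapes the field takes, mirroring the proto types aliased as pb in the wire package; the genproto import path and the subscription path are assumptions.

package main

import (
	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

func main() {
	// Placeholder subscription path for illustration only.
	const subscription = "projects/my-project/locations/us-central1-a/subscriptions/my-sub"

	// First connection, or no messages received yet: resume from the last
	// committed cursor (what RequestForRestart returns for an uninitialized
	// offset tracker).
	fromCommit := &pb.SubscribeRequest{
		Request: &pb.SubscribeRequest_Initial{
			Initial: &pb.InitialSubscribeRequest{
				Subscription: subscription,
				Partition:    0,
				InitialLocation: &pb.SeekRequest{
					Target: &pb.SeekRequest_NamedTarget_{
						NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
					},
				},
			},
		},
	}

	// Reconnection after messages were received: resume from the next
	// expected offset.
	fromOffset := &pb.SubscribeRequest{
		Request: &pb.SubscribeRequest_Initial{
			Initial: &pb.InitialSubscribeRequest{
				Subscription: subscription,
				Partition:    0,
				InitialLocation: &pb.SeekRequest{
					Target: &pb.SeekRequest_Cursor{
						Cursor: &pb.Cursor{Offset: 57},
					},
				},
			},
		},
	}

	_, _ = fromCommit, fromOffset
}

The diff below produces exactly these two forms: subscriberOffsetTracker.RequestForRestart yields the COMMITTED_CURSOR target until messages arrive and an explicit cursor afterwards, as exercised in flow_control_test.go.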
1 change: 1 addition & 0 deletions pubsublite/internal/wire/committer.go
@@ -123,6 +123,7 @@ func (c *committer) BlockingReset() error {
c.mu.Lock()
defer c.mu.Unlock()

c.acks.Release() // Discard outstanding acks
for !c.cursorTracker.UpToDate() && c.status < serviceTerminating {
c.unsafeCommitOffsetToStream()
c.flushPending.Wait()
43 changes: 17 additions & 26 deletions pubsublite/internal/wire/committer_test.go
@@ -328,8 +328,7 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
barrier := stream.PushWithBarrier(commitReq(56), commitResp(1), nil)
barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
@@ -340,6 +339,8 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()

complete := test.NewCondition("blocking reset complete")
go func() {
if err := cmt.BlockingReset(); err != nil {
@@ -350,14 +351,8 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
}()
complete.VerifyNotDone(t)

ack1.Ack()
cmt.SendBatchCommit()
complete.VerifyNotDone(t)
ack2.Ack()
cmt.SendBatchCommit()

// Until the final commit response is received, committer.BlockingReset should
// not return.
// Until the commit response is received, committer.BlockingReset should not
// return.
barrier.ReleaseAfter(func() {
complete.VerifyNotDone(t)
})
@@ -371,6 +366,9 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}

// This ack should have been discarded.
ack2.Ack()

// Calling committer.BlockingReset again should immediately return.
if err := cmt.BlockingReset(); err != nil {
t.Errorf("BlockingReset() got err: (%v), want: <nil>", err)
@@ -390,7 +388,7 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
@@ -401,6 +399,8 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()

complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
@@ -410,18 +410,11 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
}()
complete.VerifyNotDone(t)

ack1.Ack()
cmt.SendBatchCommit()
complete.VerifyNotDone(t)

// committer.BlockingReset should return when the committer is stopped.
cmt.Stop()
complete.WaitUntilDone(t, serviceTestWaitTimeout)

// Ack tracker should not be reset.
if got, want := acks.Empty(), false; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
barrier.ReleaseAfter(func() {
cmt.Stop()
complete.WaitUntilDone(t, serviceTestWaitTimeout)
})

cmt.Terminate()
if gotErr := cmt.FinalError(); gotErr != nil {
@@ -452,17 +445,15 @@ func TestCommitterBlockingResetFatalError(t *testing.T) {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()

complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
t.Errorf("BlockingReset() got: (%v), want: (%v)", got, want)
}
complete.SetDone()
}()
complete.VerifyNotDone(t)

ack1.Ack()
cmt.SendBatchCommit()

// committer.BlockingReset should return when the committer terminates due to
// fatal server error.
9 changes: 6 additions & 3 deletions pubsublite/internal/wire/flow_control.go
@@ -160,11 +160,14 @@ func (ot *subscriberOffsetTracker) Reset() {
}

// RequestForRestart returns the seek request to send when a new subscribe
// stream reconnects. Returns nil if the subscriber has just started, in which
// case the server returns the offset of the last committed cursor.
// stream reconnects.
func (ot *subscriberOffsetTracker) RequestForRestart() *pb.SeekRequest {
if ot.minNextOffset <= 0 {
return nil
return &pb.SeekRequest{
Target: &pb.SeekRequest_NamedTarget_{
NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
},
}
}
return &pb.SeekRequest{
Target: &pb.SeekRequest_Cursor{
6 changes: 5 additions & 1 deletion pubsublite/internal/wire/flow_control_test.go
@@ -253,7 +253,11 @@ func TestOffsetTrackerRequestForRestart(t *testing.T) {
{
desc: "Uninitialized tracker",
tracker: subscriberOffsetTracker{},
want: nil,
want: &pb.SeekRequest{
Target: &pb.SeekRequest_NamedTarget_{
NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
},
},
},
{
desc: "Next offset positive",
35 changes: 22 additions & 13 deletions pubsublite/internal/wire/requests_test.go
@@ -134,33 +134,42 @@ func msgPubResp(cursor int64) *pb.PublishResponse {

// SubscriberService

func initSubReq(subscription subscriptionPartition) *pb.SubscribeRequest {
func initSubReqCommit(subscription subscriptionPartition) *pb.SubscribeRequest {
return &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
InitialLocation: &pb.SeekRequest{
Target: &pb.SeekRequest_NamedTarget_{
NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
},
},
},
},
}
}

func initSubResp() *pb.SubscribeResponse {
return &pb.SubscribeResponse{
Response: &pb.SubscribeResponse_Initial{
Initial: &pb.InitialSubscribeResponse{},
func initSubReqCursor(subscription subscriptionPartition, offset int64) *pb.SubscribeRequest {
return &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
InitialLocation: &pb.SeekRequest{
Target: &pb.SeekRequest_Cursor{
Cursor: &pb.Cursor{Offset: offset},
},
},
},
},
}
}

func seekReq(offset int64) *pb.SubscribeRequest {
return &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Seek{
Seek: &pb.SeekRequest{
Target: &pb.SeekRequest_Cursor{
Cursor: &pb.Cursor{Offset: offset},
},
},
func initSubResp() *pb.SubscribeResponse {
return &pb.SubscribeResponse{
Response: &pb.SubscribeResponse_Initial{
Initial: &pb.InitialSubscribeResponse{},
},
}
}
85 changes: 49 additions & 36 deletions pubsublite/internal/wire/subscriber.go
@@ -31,8 +31,7 @@ import (
var (
errServerNoMessages = errors.New("pubsublite: server delivered no messages")
errInvalidInitialSubscribeResponse = errors.New("pubsublite: first response from server was not an initial response for subscribe")
errInvalidSubscribeResponse = errors.New("pubsublite: received invalid subscribe response from server")
errNoInFlightSeek = errors.New("pubsublite: received seek response for no in-flight seek")
errInvalidSubscribeResponse = errors.New("pubsublite: received unexpected subscribe response from server")
)

// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
@@ -126,6 +125,10 @@ func (mq *messageDeliveryQueue) deliverMessages(messagesC chan *ReceivedMessage,
// The frequency of sending batch flow control requests.
const batchFlowControlPeriod = 100 * time.Millisecond

// Handles subscriber reset actions that are external to the subscribeStream
// (e.g. wait for the committer to flush commits).
type subscriberResetHandler func() error

// subscribeStream directly wraps the subscribe client stream. It passes
// messages to the message receiver and manages flow control. Flow control
// tokens are batched and sent to the stream via a periodic background task,
@@ -137,7 +140,7 @@ type subscribeStream struct {
subClient *vkit.SubscriberClient
settings ReceiveSettings
subscription subscriptionPartition
initialReq *pb.SubscribeRequest
handleReset subscriberResetHandler
metadata pubsubMetadata

// Fields below must be guarded with mu.
@@ -146,27 +149,20 @@ type subscribeStream struct {
offsetTracker subscriberOffsetTracker
flowControl flowControlBatcher
pollFlowControl *periodicTask
seekInFlight bool
enableBatchFlowControl bool

abstractService
}

func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, settings ReceiveSettings,
receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *subscribeStream {
receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker,
handleReset subscriberResetHandler, disableTasks bool) *subscribeStream {

s := &subscribeStream{
subClient: subClient,
settings: settings,
subscription: subscription,
initialReq: &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
},
},
},
handleReset: handleReset,
messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages),
metadata: newPubsubMetadata(),
}
@@ -212,7 +208,18 @@ func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, err
}

func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) {
return s.initialReq, initialResponseRequired(true)
s.mu.Lock()
defer s.mu.Unlock()
initReq := &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: s.subscription.Path,
Partition: int64(s.subscription.Partition),
InitialLocation: s.offsetTracker.RequestForRestart(),
},
},
}
return initReq, initialResponseRequired(true)
}

func (s *subscribeStream) validateInitialResponse(response interface{}) error {
Expand All @@ -231,27 +238,43 @@ func (s *subscribeStream) onStreamStatusChange(status streamStatus) {
case streamConnected:
s.unsafeUpdateStatus(serviceActive, nil)

// Reinitialize the offset and flow control tokens when a new subscribe
// stream instance is connected.
if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil {
if !s.stream.Send(&pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Seek{Seek: seekReq},
}) {
return
}
s.seekInFlight = true
}
// Reinitialize the flow control tokens when a new subscribe stream instance
// is connected.
s.unsafeSendFlowControl(s.flowControl.RequestForRestart())
s.enableBatchFlowControl = true
s.pollFlowControl.Start()

case streamReconnecting:
s.seekInFlight = false
// Ensure no batch flow control tokens are sent until the RequestForRestart
// is sent above when a new subscribe stream is initialized.
s.enableBatchFlowControl = false
s.pollFlowControl.Stop()

case streamResetState:
// Handle out-of-band seek notifications from the server. Committer and
// subscriber state are reset.

s.messageQueue.Stop()

// Wait for all message receiver callbacks to finish and the committer to
// flush pending commits and reset its state. Release the mutex while
// waiting.
s.mu.Unlock()
s.messageQueue.Wait()
err := s.handleReset()
s.mu.Lock()

if err != nil {
s.unsafeInitiateShutdown(serviceTerminating, nil)
return
}
s.messageQueue.Start()
s.offsetTracker.Reset()
s.flowControl.Reset(flowControlTokens{
Bytes: int64(s.settings.MaxOutstandingBytes),
Messages: int64(s.settings.MaxOutstandingMessages),
})

case streamTerminated:
s.unsafeInitiateShutdown(serviceTerminated, s.stream.Error())
}
@@ -270,8 +293,6 @@ func (s *subscribeStream) onResponse(response interface{}) {
switch {
case subscribeResponse.GetMessages() != nil:
err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
case subscribeResponse.GetSeek() != nil:
err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek())
default:
err = errInvalidSubscribeResponse
}
Expand All @@ -280,14 +301,6 @@ func (s *subscribeStream) onResponse(response interface{}) {
}
}

func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error {
if !s.seekInFlight {
return errNoInFlightSeek
}
s.seekInFlight = false
return nil
}

func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) error {
if len(response.Messages) == 0 {
return errServerNoMessages
@@ -388,7 +401,7 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu
subscription := subscriptionPartition{Path: f.subscriptionPath, Partition: partition}
acks := newAckTracker()
commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks)
sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks)
sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, commit.BlockingReset, f.disableTasks)
ps := &singlePartitionSubscriber{
subscriber: sub,
committer: commit,
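For context on the new subscriberResetHandler hook: when the server pushes an out-of-band seek (streamResetState), the subscribe stream drains message delivery, invokes the handler (wired to committer.BlockingReset in singlePartitionSubscriberFactory.New), and resumes from the committed cursor only if the handler returns nil. The following self-contained toy sketch, not part of this commit, illustrates that contract; toySubscriber and onServerReset are hypothetical stand-ins for the real wire types.

package main

import (
	"errors"
	"fmt"
)

// subscriberResetHandler mirrors the hook added in subscriber.go: it performs
// reset actions external to the subscribe stream. The real wiring passes
// committer.BlockingReset, which discards outstanding acks and flushes
// pending commits.
type subscriberResetHandler func() error

// toySubscriber is a hypothetical stand-in for subscribeStream; locking,
// the message queue, and flow control are omitted.
type toySubscriber struct {
	handleReset subscriberResetHandler
}

// onServerReset sketches the streamResetState branch: run the reset handler
// and resume only if it succeeds.
func (s *toySubscriber) onServerReset() error {
	if err := s.handleReset(); err != nil {
		// Real code: unsafeInitiateShutdown(serviceTerminating, nil).
		return err
	}
	// Real code: messageQueue.Start(), offsetTracker.Reset(), and
	// flowControl.Reset(...), so the next InitialSubscribeRequest seeks to
	// COMMITTED_CURSOR.
	return nil
}

func main() {
	ok := &toySubscriber{handleReset: func() error { return nil }}
	stopped := &toySubscriber{handleReset: func() error {
		return errors.New("service stopped") // stand-in for ErrServiceStopped
	}}
	fmt.Println(ok.onServerReset(), stopped.onServerReset())
}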
