feat(pubsublite): support out of band seeks #4208

Merged
merged 4 commits on Jun 15, 2021
Changes from 3 commits
1 change: 1 addition & 0 deletions pubsublite/internal/wire/committer.go
@@ -123,6 +123,7 @@ func (c *committer) BlockingReset() error {
c.mu.Lock()
defer c.mu.Unlock()

c.acks.Release() // Discard outstanding acks
for !c.cursorTracker.UpToDate() && c.status < serviceTerminating {
c.unsafeCommitOffsetToStream()
c.flushPending.Wait()
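With the added acks.Release() call, BlockingReset discards outstanding (unacked) messages up front instead of waiting for them, then blocks only until the already-acked offsets are flushed to the stream. A caller-side sketch of the resulting contract (cmt is a started committer; the goroutine is illustrative, mirroring the tests below):

go func() {
	// Blocks until every previously acked offset has been committed, or
	// returns ErrServiceStopped if the committer stops/terminates first.
	if err := cmt.BlockingReset(); err != nil {
		return // reset aborted; the subscriber shuts down
	}
	// The ack tracker is now empty; it is safe to rewind the subscriber
	// and restart message delivery.
}()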
43 changes: 17 additions & 26 deletions pubsublite/internal/wire/committer_test.go
@@ -328,8 +328,7 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
barrier := stream.PushWithBarrier(commitReq(56), commitResp(1), nil)
barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
@@ -340,6 +339,8 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()

complete := test.NewCondition("blocking reset complete")
go func() {
if err := cmt.BlockingReset(); err != nil {
@@ -350,14 +351,8 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
}()
complete.VerifyNotDone(t)

ack1.Ack()
cmt.SendBatchCommit()
complete.VerifyNotDone(t)
ack2.Ack()
cmt.SendBatchCommit()

// Until the final commit response is received, committer.BlockingReset should
// not return.
// Until the commit response is received, committer.BlockingReset should not
// return.
barrier.ReleaseAfter(func() {
complete.VerifyNotDone(t)
})
@@ -371,6 +366,9 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}

// This ack should have been discarded.
ack2.Ack()

// Calling committer.BlockingReset again should immediately return.
if err := cmt.BlockingReset(); err != nil {
t.Errorf("BlockingReset() got err: (%v), want: <nil>", err)
@@ -390,7 +388,7 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
@@ -401,6 +399,8 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()

complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
@@ -410,18 +410,11 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
}()
complete.VerifyNotDone(t)

ack1.Ack()
cmt.SendBatchCommit()
complete.VerifyNotDone(t)

// committer.BlockingReset should return when the committer is stopped.
cmt.Stop()
complete.WaitUntilDone(t, serviceTestWaitTimeout)

// Ack tracker should not be reset.
if got, want := acks.Empty(), false; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
barrier.ReleaseAfter(func() {
cmt.Stop()
complete.WaitUntilDone(t, serviceTestWaitTimeout)
})

cmt.Terminate()
if gotErr := cmt.FinalError(); gotErr != nil {
@@ -452,17 +445,15 @@ func TestCommitterBlockingResetFatalError(t *testing.T) {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()

complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
t.Errorf("BlockingReset() got: (%v), want: (%v)", got, want)
}
complete.SetDone()
}()
complete.VerifyNotDone(t)

ack1.Ack()
cmt.SendBatchCommit()

// committer.BlockingReset should return when the committer terminates due to
// fatal server error.
9 changes: 6 additions & 3 deletions pubsublite/internal/wire/flow_control.go
@@ -160,11 +160,14 @@ func (ot *subscriberOffsetTracker) Reset() {
}

// RequestForRestart returns the seek request to send when a new subscribe
// stream reconnects. Returns nil if the subscriber has just started, in which
// case the server returns the offset of the last committed cursor.
// stream reconnects.
func (ot *subscriberOffsetTracker) RequestForRestart() *pb.SeekRequest {
if ot.minNextOffset <= 0 {
return nil
return &pb.SeekRequest{
Target: &pb.SeekRequest_NamedTarget_{
NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
},
}
}
return &pb.SeekRequest{
Target: &pb.SeekRequest_Cursor{
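After this change RequestForRestart never returns nil: a tracker that has not yet received messages yields an explicit seek to the committed cursor, so the result can always be embedded in InitialSubscribeRequest.InitialLocation (see subscriber.go below). A minimal sketch of both cases (offset 57 is illustrative; the second case relies on the cursor branch truncated above):

// Fresh tracker: named seek to the last committed cursor.
req := (&subscriberOffsetTracker{}).RequestForRestart()
// req.GetNamedTarget() == pb.SeekRequest_COMMITTED_CURSOR

// Tracker that has received messages up to offset 56: seek to the next
// unreceived offset (assumed from the truncated branch above).
req = (&subscriberOffsetTracker{minNextOffset: 57}).RequestForRestart()
// req.GetCursor().GetOffset() == 57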
6 changes: 5 additions & 1 deletion pubsublite/internal/wire/flow_control_test.go
@@ -253,7 +253,11 @@ func TestOffsetTrackerRequestForRestart(t *testing.T) {
{
desc: "Uninitialized tracker",
tracker: subscriberOffsetTracker{},
want: nil,
want: &pb.SeekRequest{
Target: &pb.SeekRequest_NamedTarget_{
NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
},
},
},
{
desc: "Next offset positive",
35 changes: 22 additions & 13 deletions pubsublite/internal/wire/requests_test.go
@@ -134,33 +134,42 @@ func msgPubResp(cursor int64) *pb.PublishResponse {

// SubscriberService

func initSubReq(subscription subscriptionPartition) *pb.SubscribeRequest {
func initSubReqCommit(subscription subscriptionPartition) *pb.SubscribeRequest {
return &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
InitialLocation: &pb.SeekRequest{
Target: &pb.SeekRequest_NamedTarget_{
NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
},
},
},
},
}
}

func initSubResp() *pb.SubscribeResponse {
return &pb.SubscribeResponse{
Response: &pb.SubscribeResponse_Initial{
Initial: &pb.InitialSubscribeResponse{},
func initSubReqCursor(subscription subscriptionPartition, offset int64) *pb.SubscribeRequest {
return &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
InitialLocation: &pb.SeekRequest{
Target: &pb.SeekRequest_Cursor{
Cursor: &pb.Cursor{Offset: offset},
},
},
},
},
}
}

func seekReq(offset int64) *pb.SubscribeRequest {
return &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Seek{
Seek: &pb.SeekRequest{
Target: &pb.SeekRequest_Cursor{
Cursor: &pb.Cursor{Offset: offset},
},
},
func initSubResp() *pb.SubscribeResponse {
return &pb.SubscribeResponse{
Response: &pb.SubscribeResponse_Initial{
Initial: &pb.InitialSubscribeResponse{},
},
}
}
85 changes: 49 additions & 36 deletions pubsublite/internal/wire/subscriber.go
@@ -31,8 +31,7 @@ import (
var (
errServerNoMessages = errors.New("pubsublite: server delivered no messages")
errInvalidInitialSubscribeResponse = errors.New("pubsublite: first response from server was not an initial response for subscribe")
errInvalidSubscribeResponse = errors.New("pubsublite: received invalid subscribe response from server")
errNoInFlightSeek = errors.New("pubsublite: received seek response for no in-flight seek")
errInvalidSubscribeResponse = errors.New("pubsublite: received unexpected subscribe response from server")
)

// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
@@ -126,6 +125,10 @@ func (mq *messageDeliveryQueue) deliverMessages(messagesC chan *ReceivedMessage,
// The frequency of sending batch flow control requests.
const batchFlowControlPeriod = 100 * time.Millisecond

// Handles subscriber reset actions that are external to the subscribeStream
// (e.g. wait for the committer to flush commits).
type subscriberResetHandler func() error

// subscribeStream directly wraps the subscribe client stream. It passes
// messages to the message receiver and manages flow control. Flow control
// tokens are batched and sent to the stream via a periodic background task,
@@ -137,7 +140,7 @@ type subscribeStream struct {
subClient *vkit.SubscriberClient
settings ReceiveSettings
subscription subscriptionPartition
initialReq *pb.SubscribeRequest
handleReset subscriberResetHandler
metadata pubsubMetadata

// Fields below must be guarded with mu.
@@ -146,27 +149,20 @@ type subscribeStream struct {
offsetTracker subscriberOffsetTracker
flowControl flowControlBatcher
pollFlowControl *periodicTask
seekInFlight bool
enableBatchFlowControl bool

abstractService
}

func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, settings ReceiveSettings,
receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *subscribeStream {
receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker,
handleReset subscriberResetHandler, disableTasks bool) *subscribeStream {

s := &subscribeStream{
subClient: subClient,
settings: settings,
subscription: subscription,
initialReq: &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
},
},
},
handleReset: handleReset,
messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages),
metadata: newPubsubMetadata(),
}
@@ -212,7 +208,18 @@ func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, error) {
}

func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) {
return s.initialReq, initialResponseRequired(true)
s.mu.Lock()
defer s.mu.Unlock()
initReq := &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: s.subscription.Path,
Partition: int64(s.subscription.Partition),
InitialLocation: s.offsetTracker.RequestForRestart(),
},
},
}
return initReq, initialResponseRequired(true)
}

func (s *subscribeStream) validateInitialResponse(response interface{}) error {
@@ -231,27 +238,43 @@ func (s *subscribeStream) onStreamStatusChange(status streamStatus) {
case streamConnected:
s.unsafeUpdateStatus(serviceActive, nil)

// Reinitialize the offset and flow control tokens when a new subscribe
// stream instance is connected.
if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil {
if !s.stream.Send(&pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Seek{Seek: seekReq},
}) {
return
}
s.seekInFlight = true
}
// Reinitialize the flow control tokens when a new subscribe stream instance
// is connected.
s.unsafeSendFlowControl(s.flowControl.RequestForRestart())
s.enableBatchFlowControl = true
s.pollFlowControl.Start()

case streamReconnecting:
s.seekInFlight = false
// Ensure no batch flow control tokens are sent until the RequestForRestart
// is sent above when a new subscribe stream is initialized.
s.enableBatchFlowControl = false
s.pollFlowControl.Stop()

case streamResetState:
// Handle out-of-band seek notifications from the server. Committer and
// subscriber state are reset.

s.messageQueue.Stop()

// Wait for all message receiver callbacks to finish and the committer to
// flush pending commits and reset its state. Release the mutex while
// waiting.
s.mu.Unlock()
s.messageQueue.Wait()
err := s.handleReset()
s.mu.Lock()

if err != nil {
s.unsafeInitiateShutdown(serviceTerminating, nil)
return
}
s.messageQueue.Start()
s.offsetTracker.Reset()
s.flowControl.Reset(flowControlTokens{
Bytes: int64(s.settings.MaxOutstandingBytes),
Messages: int64(s.settings.MaxOutstandingMessages),
})

case streamTerminated:
s.unsafeInitiateShutdown(serviceTerminated, s.stream.Error())
}
@@ -270,8 +293,6 @@ func (s *subscribeStream) onResponse(response interface{}) {
switch {
case subscribeResponse.GetMessages() != nil:
err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
case subscribeResponse.GetSeek() != nil:
err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek())
default:
err = errInvalidSubscribeResponse
}
Expand All @@ -280,14 +301,6 @@ func (s *subscribeStream) onResponse(response interface{}) {
}
}

func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error {
if !s.seekInFlight {
return errNoInFlightSeek
}
s.seekInFlight = false
return nil
}

func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) error {
if len(response.Messages) == 0 {
return errServerNoMessages
@@ -388,7 +401,7 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSubscriber {
subscription := subscriptionPartition{Path: f.subscriptionPath, Partition: partition}
acks := newAckTracker()
commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks)
sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks)
sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, commit.BlockingReset, f.disableTasks)
ps := &singlePartitionSubscriber{
subscriber: sub,
committer: commit,
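For orientation, the new streamResetState case in onStreamStatusChange is the core of out-of-band seek handling. The same sequence, condensed into a hypothetical standalone helper (the real code inlines it, as shown above):

// handleOutOfBandSeek summarizes the reset sequence; s.mu is held on entry.
func (s *subscribeStream) handleOutOfBandSeek() {
	s.messageQueue.Stop() // halt delivery of queued messages
	s.mu.Unlock()
	s.messageQueue.Wait()  // drain in-flight receiver callbacks
	err := s.handleReset() // committer.BlockingReset: flush acked commits
	s.mu.Lock()
	if err != nil {
		// The committer stopped; terminate the subscriber.
		s.unsafeInitiateShutdown(serviceTerminating, nil)
		return
	}
	// Fresh state: the reconnected stream's initial request will seek to
	// COMMITTED_CURSOR (offsetTracker.Reset) with full flow control tokens.
	s.messageQueue.Start()
	s.offsetTracker.Reset()
	s.flowControl.Reset(flowControlTokens{
		Bytes:    int64(s.settings.MaxOutstandingBytes),
		Messages: int64(s.settings.MaxOutstandingMessages),
	})
}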