Skip to content

Commit

Permalink
fix(pubsublite): fix committer races (#3810)
Browse files Browse the repository at this point in the history
Fixed 2 races in the committer and simplified TestAssigningSubscriberIgnoreOutstandingAcks.
  • Loading branch information
tmdiep committed Mar 16, 2021
1 parent 4188b73 commit d8689f1
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 28 deletions.
8 changes: 8 additions & 0 deletions pubsublite/internal/wire/committer.go
Expand Up @@ -51,6 +51,7 @@ type committer struct {
acks *ackTracker
cursorTracker *commitCursorTracker
pollCommits *periodicTask
enableCommits bool

abstractService
}
Expand Down Expand Up @@ -135,6 +136,7 @@ func (c *committer) onStreamStatusChange(status streamStatus) {

switch status {
case streamConnected:
c.enableCommits = true
c.unsafeUpdateStatus(serviceActive, nil)
// Once the stream connects, clear unconfirmed commits and immediately send
// the latest desired commit offset.
Expand All @@ -143,6 +145,8 @@ func (c *committer) onStreamStatusChange(status streamStatus) {
c.pollCommits.Start()

case streamReconnecting:
// Ensure there are no commits until streamConnected has been handled above.
c.enableCommits = false
c.pollCommits.Stop()

case streamTerminated:
Expand Down Expand Up @@ -185,6 +189,9 @@ func (c *committer) commitOffsetToStream() {
}

func (c *committer) unsafeCommitOffsetToStream() {
if !c.enableCommits {
return
}
nextOffset := c.cursorTracker.NextOffset()
if nextOffset == nilCursorOffset {
return
Expand All @@ -204,6 +211,7 @@ func (c *committer) unsafeCommitOffsetToStream() {

func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !c.unsafeUpdateStatus(targetStatus, wrapError("committer", c.subscription.String(), err)) {
c.unsafeCheckDone()
return
}

Expand Down
27 changes: 27 additions & 0 deletions pubsublite/internal/wire/committer_test.go
Expand Up @@ -158,6 +158,33 @@ func TestCommitterTerminateDiscardsOutstandingAcks(t *testing.T) {
}
}

func TestCommitterStopThenTerminateDiscardsOutstandingAcks(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack := newAckConsumer(33, 0, nil)
acks := newAckTracker()
acks.Push(ack)

verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
// No commits expected.
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}

cmt.Stop() // Stop waits for outstanding acks
cmt.Terminate() // Terminate should discard all outstanding acks
if gotErr := cmt.FinalError(); gotErr != nil {
t.Errorf("Final err: (%v), want: <nil>", gotErr)
}
}

func TestCommitterPermanentStreamError(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
acks := newAckTracker()
Expand Down
42 changes: 14 additions & 28 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -983,31 +983,20 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
// Assignment stream
asnStream := test.NewRPCVerifier(t)
asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil)
assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil)
assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{}), nil)
assignmentBarrier2 := asnStream.PushWithBarrier(assignmentAckReq(), nil, nil)
verifiers.AddAssignmentStream(subscription, asnStream)

// Partition 1
subStream1 := test.NewRPCVerifier(t)
subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream1)

cmtStream1 := test.NewRPCVerifier(t)
commitBarrier := cmtStream1.PushWithBarrier(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
// Note: no commit expected.
verifiers.AddCommitStream(subscription, 1, cmtStream1)

// Partition 2
subStream2 := test.NewRPCVerifier(t)
subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription, 2, subStream2)
subStream := test.NewRPCVerifier(t)
subStream.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream)

cmtStream2 := test.NewRPCVerifier(t)
cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil)
cmtStream2.Push(commitReq(23), commitResp(1), nil)
verifiers.AddCommitStream(subscription, 2, cmtStream2)
cmtStream := test.NewRPCVerifier(t)
cmtStream.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
cmtStream.Push(commitReq(12), commitResp(1), nil)
verifiers.AddCommitStream(subscription, 1, cmtStream)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
Expand All @@ -1018,18 +1007,15 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
}

// Partition assignments are initially {1}.
ack1 := receiver.ValidateMsg(msg1)
receiver.ValidateMsg(msg1).Ack()
ack2 := receiver.ValidateMsg(msg2)

// Partition assignments will now be {2}.
// Partition assignments will now be {}.
assignmentBarrier1.Release()
receiver.ValidateMsg(msg2).Ack()
assignmentBarrier2.Release() // Wait for ack to ensure the test is deterministic

// These barriers ensure that this test is deterministic by ensuring that the
// server has received expected requests before proceeding.
commitBarrier.Release()
assignmentBarrier2.Release()
// Partition 1 has already been unassigned, so this ack is discarded.
ack1.Ack()
ack2.Ack()

sub.Stop()
if gotErr := sub.WaitStopped(); gotErr != nil {
Expand Down

0 comments on commit d8689f1

Please sign in to comment.