Skip to content

Commit

Permalink
Discard outstanding acks
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Jun 4, 2021
1 parent c3abece commit afd0868
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 30 deletions.
1 change: 1 addition & 0 deletions pubsublite/internal/wire/committer.go
Expand Up @@ -123,6 +123,7 @@ func (c *committer) BlockingReset() error {
c.mu.Lock()
defer c.mu.Unlock()

c.acks.Release() // Discard outstanding acks
for !c.cursorTracker.UpToDate() && c.status < serviceTerminating {
c.unsafeCommitOffsetToStream()
c.flushPending.Wait()
Expand Down
43 changes: 17 additions & 26 deletions pubsublite/internal/wire/committer_test.go
Expand Up @@ -328,8 +328,7 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
barrier := stream.PushWithBarrier(commitReq(56), commitResp(1), nil)
barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
Expand All @@ -340,6 +339,8 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()

complete := test.NewCondition("blocking reset complete")
go func() {
if err := cmt.BlockingReset(); err != nil {
Expand All @@ -350,14 +351,8 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
}()
complete.VerifyNotDone(t)

ack1.Ack()
cmt.SendBatchCommit()
complete.VerifyNotDone(t)
ack2.Ack()
cmt.SendBatchCommit()

// Until the final commit response is received, committer.BlockingReset should
// not return.
// Until the commit response is received, committer.BlockingReset should not
// return.
barrier.ReleaseAfter(func() {
complete.VerifyNotDone(t)
})
Expand All @@ -371,6 +366,9 @@ func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}

// This ack should have been discarded.
ack2.Ack()

// Calling committer.BlockingReset again should immediately return.
if err := cmt.BlockingReset(); err != nil {
t.Errorf("BlockingReset() got err: (%v), want: <nil>", err)
Expand All @@ -390,7 +388,7 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
Expand All @@ -401,6 +399,8 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()

complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
Expand All @@ -410,18 +410,11 @@ func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
}()
complete.VerifyNotDone(t)

ack1.Ack()
cmt.SendBatchCommit()
complete.VerifyNotDone(t)

// committer.BlockingReset should return when the committer is stopped.
cmt.Stop()
complete.WaitUntilDone(t, serviceTestWaitTimeout)

// Ack tracker should not be reset.
if got, want := acks.Empty(), false; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
barrier.ReleaseAfter(func() {
cmt.Stop()
complete.WaitUntilDone(t, serviceTestWaitTimeout)
})

cmt.Terminate()
if gotErr := cmt.FinalError(); gotErr != nil {
Expand Down Expand Up @@ -452,17 +445,15 @@ func TestCommitterBlockingResetFatalError(t *testing.T) {
t.Errorf("Start() got err: (%v)", gotErr)
}

ack1.Ack()

complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
t.Errorf("BlockingReset() got: (%v), want: (%v)", got, want)
}
complete.SetDone()
}()
complete.VerifyNotDone(t)

ack1.Ack()
cmt.SendBatchCommit()

// committer.BlockingReset should return when the committer terminates due to
// fatal server error.
Expand Down
13 changes: 9 additions & 4 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -784,12 +784,13 @@ func TestSinglePartitionSubscriberAdminSeekWhileConnected(t *testing.T) {
receiver := newTestMessageReceiver(t)
msg1 := seqMsgWithOffsetAndSize(1, 100)
msg2 := seqMsgWithOffsetAndSize(2, 100)
msg3 := seqMsgWithOffsetAndSize(3, 100)

verifiers := test.NewVerifiers(t)

subStream1 := test.NewRPCVerifier(t)
subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2, msg3), nil)
// Server disconnects the stream with the RESET signal.
barrier := subStream1.PushWithBarrier(nil, nil, makeStreamResetSignal())
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1)
Expand All @@ -804,7 +805,7 @@ func TestSinglePartitionSubscriberAdminSeekWhileConnected(t *testing.T) {

cmtStream := test.NewRPCVerifier(t)
cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
cmtStream.Push(commitReq(3), commitResp(1), nil)
cmtStream.Push(commitReq(4), commitResp(1), nil)
cmtStream.Push(commitReq(2), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)

Expand All @@ -818,6 +819,7 @@ func TestSinglePartitionSubscriberAdminSeekWhileConnected(t *testing.T) {

receiver.ValidateMsg(msg1).Ack()
receiver.ValidateMsg(msg2).Ack()
receiver.ValidateMsg(msg3).Ack()
barrier.Release()
receiver.ValidateMsg(msg1).Ack()

Expand All @@ -832,19 +834,20 @@ func TestSinglePartitionSubscriberAdminSeekWhileReconnecting(t *testing.T) {
receiver := newTestMessageReceiver(t)
msg1 := seqMsgWithOffsetAndSize(1, 100)
msg2 := seqMsgWithOffsetAndSize(2, 100)
msg3 := seqMsgWithOffsetAndSize(3, 100)

verifiers := test.NewVerifiers(t)

subStream1 := test.NewRPCVerifier(t)
subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2, msg3), nil)
// Normal stream breakage.
barrier := subStream1.PushWithBarrier(nil, nil, status.Error(codes.DeadlineExceeded, ""))
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1)

subStream2 := test.NewRPCVerifier(t)
// The server sends the RESET signal during stream initialization.
subStream2.Push(initSubReqCursor(subscription, 3), nil, makeStreamResetSignal())
subStream2.Push(initSubReqCursor(subscription, 4), nil, makeStreamResetSignal())
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2)

subStream3 := test.NewRPCVerifier(t)
Expand All @@ -871,8 +874,10 @@ func TestSinglePartitionSubscriberAdminSeekWhileReconnecting(t *testing.T) {

receiver.ValidateMsg(msg1).Ack()
receiver.ValidateMsg(msg2).Ack()
ack := receiver.ValidateMsg(msg3) // Unacked message discarded
barrier.Release()
receiver.ValidateMsg(msg1).Ack()
ack.Ack() // Should be ignored

sub.Stop()
if gotErr := sub.WaitStopped(); gotErr != nil {
Expand Down

0 comments on commit afd0868

Please sign in to comment.