From 0ecd732e3f57928e7999ae4e78871be070c184d9 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Thu, 27 May 2021 09:55:13 +1000 Subject: [PATCH] feat(pubsublite): flush and reset committer (#4143) - Adds ackTracker.Reset() and commitCursorTracker.Reset() to reset these trackers to their initial state. - Adds committer.BlockingReset(), which flushes pending commits to the server and then resets the state of the committer. --- pubsublite/internal/test/condition.go | 62 +++++++ pubsublite/internal/wire/acks.go | 26 ++- pubsublite/internal/wire/acks_test.go | 188 +++++++++++++++++++-- pubsublite/internal/wire/committer.go | 35 +++- pubsublite/internal/wire/committer_test.go | 160 ++++++++++++++++++ 5 files changed, 452 insertions(+), 19 deletions(-) create mode 100644 pubsublite/internal/test/condition.go diff --git a/pubsublite/internal/test/condition.go b/pubsublite/internal/test/condition.go new file mode 100644 index 00000000000..4dd521ebec3 --- /dev/null +++ b/pubsublite/internal/test/condition.go @@ -0,0 +1,62 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package test + +import ( + "testing" + "time" +) + +// Condition allows tests to wait for some event to occur, or check that it has +// not occurred. +type Condition struct { + name string + done chan struct{} +} + +// NewCondition creates a new condition. +func NewCondition(name string) *Condition { + return &Condition{ + name: name, + done: make(chan struct{}), + } +} + +// SetDone marks the condition as done. +func (c *Condition) SetDone() { + close(c.done) +} + +// WaitUntilDone waits up to the specified duration for the condition to be +// marked done. +func (c *Condition) WaitUntilDone(t *testing.T, duration time.Duration) { + t.Helper() + + select { + case <-time.After(duration): + t.Errorf("Condition(%q): timed out after waiting %v", c.name, duration) + case <-c.done: + } +} + +// VerifyNotDone checks that the condition is not done. +func (c *Condition) VerifyNotDone(t *testing.T) { + t.Helper() + + select { + case <-c.done: + t.Errorf("Condition(%q): is done, expected not done", c.name) + default: + } +} diff --git a/pubsublite/internal/wire/acks.go b/pubsublite/internal/wire/acks.go index 99647243ed8..dcc18138df2 100644 --- a/pubsublite/internal/wire/acks.go +++ b/pubsublite/internal/wire/acks.go @@ -152,7 +152,21 @@ func (at *ackTracker) Release() { at.enablePush = false at.unsafeProcessAcks() + at.unsafeClearAcks() +} +// Reset the state of the tracker. Clears and invalidates any outstanding acks. +func (at *ackTracker) Reset() { + at.mu.Lock() + defer at.mu.Unlock() + + at.unsafeClearAcks() + at.ackedPrefixOffset = nilCursorOffset + at.enablePush = true +} + +// Clears and invalidates any outstanding acks. +func (at *ackTracker) unsafeClearAcks() { for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() { ack, _ := elem.Value.(*ackConsumer) ack.Clear() @@ -213,6 +227,13 @@ func extractOffsetFromElem(elem *list.Element) int64 { return offset } +// Reset the state of the tracker. +func (ct *commitCursorTracker) Reset() { + ct.acks.Reset() + ct.lastConfirmedOffset = nilCursorOffset + ct.pendingOffsets.Init() +} + // NextOffset is the commit offset to be sent to the stream. Returns // nilCursorOffset if the commit offset does not need to be updated. func (ct *commitCursorTracker) NextOffset() int64 { @@ -256,7 +277,8 @@ func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error { return nil } -// UpToDate when the server has confirmed the desired commit offset. +// UpToDate when the server has confirmed the desired commit offset and there +// are no pending acks. func (ct *commitCursorTracker) UpToDate() bool { - return ct.acks.CommitOffset() <= ct.lastConfirmedOffset + return ct.acks.CommitOffset() <= ct.lastConfirmedOffset && ct.acks.Empty() } diff --git a/pubsublite/internal/wire/acks_test.go b/pubsublite/internal/wire/acks_test.go index 4e94a5b4958..623127962ff 100644 --- a/pubsublite/internal/wire/acks_test.go +++ b/pubsublite/internal/wire/acks_test.go @@ -13,7 +13,11 @@ package wire -import "testing" +import ( + "testing" + + "cloud.google.com/go/pubsublite/internal/test" +) func emptyAckConsumer(_ *ackConsumer) { // Nothing to do. @@ -85,18 +89,27 @@ func TestAckTrackerProcessing(t *testing.T) { if got, want := ackTracker.CommitOffset(), int64(2); got != want { t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) } + if got, want := ackTracker.Empty(), false; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } // Skipped ack2, so the commit offset should not have been updated. ack3.Ack() if got, want := ackTracker.CommitOffset(), int64(2); got != want { t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) } + if got, want := ackTracker.Empty(), false; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } // Both ack2 and ack3 should be removed from the outstanding acks queue. ack2.Ack() if got, want := ackTracker.CommitOffset(), int64(4); got != want { t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) } + if got, want := ackTracker.Empty(), true; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } // Newly received message. ack4 := newAckConsumer(4, 0, emptyAckConsumer) @@ -104,9 +117,15 @@ func TestAckTrackerProcessing(t *testing.T) { t.Errorf("ackTracker.Push() got err %v", err) } ack4.Ack() + if got, want := ackTracker.Empty(), false; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } if got, want := ackTracker.CommitOffset(), int64(5); got != want { t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) } + if got, want := ackTracker.Empty(), true; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } } func TestAckTrackerRelease(t *testing.T) { @@ -136,16 +155,79 @@ func TestAckTrackerRelease(t *testing.T) { ackTracker.Release() ack2.Ack() ack3.Ack() + if got, want := ackTracker.Empty(), true; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } // New acks should be cleared and discarded. if err := ackTracker.Push(ack4); err != nil { t.Errorf("ackTracker.Push() got err %v", err) } + ack4.Ack() + if got, want := ackTracker.Empty(), true; got != want { t.Errorf("ackTracker.Empty() got %v, want %v", got, want) } - ack4.Ack() + if got, want := ackTracker.CommitOffset(), int64(2); got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } +} + +func TestAckTrackerReset(t *testing.T) { + ackTracker := newAckTracker() + onAckAfterReset := func(ac *ackConsumer) { + t.Error("onAck should not be called") + } + ack1 := newAckConsumer(1, 0, emptyAckConsumer) + ack2 := newAckConsumer(2, 0, emptyAckConsumer) + ack3 := newAckConsumer(3, 0, onAckAfterReset) + + if err := ackTracker.Push(ack1); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack2); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack3); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + + // Ack tracker should not allow duplicate msg1. + if got, want := ackTracker.Push(ack1), errOutOfOrderMessages; !test.ErrorEqual(got, want) { + t.Errorf("ackTracker.Push() got err %v, want err %v", got, want) + } + + // Ack 2 messages to advance the commit offset. + ack1.Ack() + ack2.Ack() + if got, want := ackTracker.CommitOffset(), int64(3); got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } + if got, want := ackTracker.Empty(), false; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } + // Reset should clear the outstanding acks and reset the desired commit + // offset. + ackTracker.Reset() + // Outstanding ack3 should be invalidated. + ack3.Ack() + if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } + if got, want := ackTracker.Empty(), true; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } + + // After reset, msg1 should be accepted and processed. + ack1 = newAckConsumer(1, 0, emptyAckConsumer) + if err := ackTracker.Push(ack1); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if got, want := ackTracker.Empty(), false; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } + ack1.Ack() if got, want := ackTracker.CommitOffset(), int64(2); got != want { t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) } @@ -159,6 +241,9 @@ func TestCommitCursorTrackerProcessing(t *testing.T) { if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) } + if got, want := commitTracker.UpToDate(), true; got != want { + t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) + } ack1 := newAckConsumer(1, 0, emptyAckConsumer) ack2 := newAckConsumer(2, 0, emptyAckConsumer) @@ -177,6 +262,9 @@ func TestCommitCursorTrackerProcessing(t *testing.T) { if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) } + if got, want := commitTracker.UpToDate(), false; got != want { + t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) + } // Msg1 acked and commit sent to stream. ack1.Ack() @@ -187,6 +275,9 @@ func TestCommitCursorTrackerProcessing(t *testing.T) { if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) } + if got, want := commitTracker.UpToDate(), false; got != want { + t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) + } // Msg 2 & 3 acked commit and sent to stream. ack2.Ack() @@ -220,6 +311,70 @@ func TestCommitCursorTrackerProcessing(t *testing.T) { } } +func TestCommitCursorTrackerReset(t *testing.T) { + ackTracker := newAckTracker() + commitTracker := newCommitCursorTracker(ackTracker) + + onAckAfterReset := func(ac *ackConsumer) { + t.Error("onAck should not be called") + } + ack1 := newAckConsumer(1, 0, emptyAckConsumer) + ack2 := newAckConsumer(2, 0, emptyAckConsumer) + ack3 := newAckConsumer(3, 0, onAckAfterReset) + if err := ackTracker.Push(ack1); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack2); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack3); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + + // Ack and commit 2 messages. + ack1.Ack() + ack2.Ack() + if got, want := commitTracker.NextOffset(), int64(3); got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + commitTracker.AddPending(commitTracker.NextOffset()) + if err := commitTracker.ConfirmOffsets(1); err != nil { + t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err) + } + + // Reset should clear the ack tracker and commit tracker to their initial + // states. + commitTracker.Reset() + // Outstanding ack3 should be invalidated. + ack3.Ack() + if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } + if got, want := ackTracker.Empty(), true; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + if got, want := commitTracker.UpToDate(), true; got != want { + t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) + } + + // After reset, msg1 should be accepted and processed. + ack1 = newAckConsumer(1, 0, emptyAckConsumer) + if err := ackTracker.Push(ack1); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + ack1.Ack() + if got, want := commitTracker.NextOffset(), int64(2); got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + commitTracker.AddPending(commitTracker.NextOffset()) + if err := commitTracker.ConfirmOffsets(1); err != nil { + t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err) + } +} + func TestCommitCursorTrackerStreamReconnects(t *testing.T) { ackTracker := newAckTracker() commitTracker := newCommitCursorTracker(ackTracker) @@ -241,6 +396,9 @@ func TestCommitCursorTrackerStreamReconnects(t *testing.T) { if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) } + if got, want := commitTracker.UpToDate(), false; got != want { + t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) + } // Msg1 acked and commit sent to stream. ack1.Ack() @@ -251,6 +409,9 @@ func TestCommitCursorTrackerStreamReconnects(t *testing.T) { if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) } + if got, want := commitTracker.UpToDate(), false; got != want { + t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) + } // Msg2 acked and commit sent to stream. ack2.Ack() @@ -261,6 +422,9 @@ func TestCommitCursorTrackerStreamReconnects(t *testing.T) { if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) } + if got, want := commitTracker.UpToDate(), false; got != want { + t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) + } // Stream breaks and pending offsets are cleared. commitTracker.ClearPending() @@ -275,8 +439,11 @@ func TestCommitCursorTrackerStreamReconnects(t *testing.T) { if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) } + if got, want := commitTracker.UpToDate(), false; got != want { + t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) + } - // Msg2 acked and commit sent to stream. + // Msg3 acked and commit sent to stream. ack3.Ack() if got, want := commitTracker.NextOffset(), int64(4); got != want { t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) @@ -285,6 +452,9 @@ func TestCommitCursorTrackerStreamReconnects(t *testing.T) { if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) } + if got, want := commitTracker.UpToDate(), false; got != want { + t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) + } // Only 1 pending commit confirmed. if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want { @@ -310,16 +480,4 @@ func TestCommitCursorTrackerStreamReconnects(t *testing.T) { if got, want := commitTracker.UpToDate(), true; got != want { t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) } - - // Note: UpToDate() returns true even though there are unacked messages. - ack4 := newAckConsumer(4, 0, emptyAckConsumer) - if err := ackTracker.Push(ack4); err != nil { - t.Errorf("ackTracker.Push() got err %v", err) - } - if got, want := commitTracker.UpToDate(), true; got != want { - t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) - } - if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { - t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) - } } diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go index c447dad6f46..5344e45edff 100644 --- a/pubsublite/internal/wire/committer.go +++ b/pubsublite/internal/wire/committer.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "reflect" + "sync" "time" "google.golang.org/grpc" @@ -46,11 +47,12 @@ type committer struct { initialReq *pb.StreamingCommitCursorRequest metadata pubsubMetadata - // Fields below must be guarded with mutex. + // Fields below must be guarded with mu. stream *retryableStream acks *ackTracker cursorTracker *commitCursorTracker pollCommits *periodicTask + flushPending *sync.Cond enableCommits bool abstractService @@ -82,6 +84,7 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei backgroundTask = func() {} } c.pollCommits = newPeriodicTask(commitCursorPeriod, backgroundTask) + c.flushPending = sync.NewCond(&c.mu) return c } @@ -114,6 +117,23 @@ func (c *committer) Terminate() { c.unsafeInitiateShutdown(serviceTerminating, nil) } +// BlockingReset flushes any pending commits to the server, waits until the +// server has confirmed the commit, and resets the state of the committer. +func (c *committer) BlockingReset() error { + c.mu.Lock() + defer c.mu.Unlock() + + for !c.cursorTracker.UpToDate() && c.status < serviceTerminating { + c.unsafeCommitOffsetToStream() + c.flushPending.Wait() + } + if c.status >= serviceTerminating { + return ErrServiceStopped + } + c.cursorTracker.Reset() + return nil +} + func (c *committer) newStream(ctx context.Context) (grpc.ClientStream, error) { return c.cursorClient.StreamingCommitCursor(c.metadata.AddToContext(ctx)) } @@ -215,6 +235,9 @@ func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error return } + // Notify any waiting threads of the termination. + c.flushPending.Broadcast() + // If it's a graceful shutdown, expedite sending final commits to the stream. if targetStatus == serviceTerminating { c.unsafeCommitOffsetToStream() @@ -227,10 +250,18 @@ func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error c.unsafeOnTerminated() } +// Performs actions when the cursor tracker is up to date. func (c *committer) unsafeCheckDone() { + if !c.cursorTracker.UpToDate() { + return + } + + // Notify any waiting threads that flushing pending commits is complete. + c.flushPending.Broadcast() + // The commit stream can be closed once the final commit offset has been // confirmed and there are no outstanding acks. - if c.status == serviceTerminating && c.cursorTracker.UpToDate() && c.acks.Empty() { + if c.status == serviceTerminating { c.unsafeOnTerminated() } } diff --git a/pubsublite/internal/wire/committer_test.go b/pubsublite/internal/wire/committer_test.go index 7fe864b9820..304b676626d 100644 --- a/pubsublite/internal/wire/committer_test.go +++ b/pubsublite/internal/wire/committer_test.go @@ -52,6 +52,10 @@ func (tc *testCommitter) Terminate() { tc.cmt.Terminate() } +func (tc *testCommitter) BlockingReset() error { + return tc.cmt.BlockingReset() +} + func TestCommitterStreamReconnect(t *testing.T) { subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0} ack1 := newAckConsumer(33, 0, nil) @@ -312,3 +316,159 @@ func TestCommitterZeroConfirmedOffsets(t *testing.T) { t.Errorf("Final err: (%v), want msg: (%v)", gotErr, wantMsg) } } + +func TestCommitterBlockingResetNormalCompletion(t *testing.T) { + subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0} + ack1 := newAckConsumer(33, 0, nil) + ack2 := newAckConsumer(55, 0, nil) + acks := newAckTracker() + acks.Push(ack1) + acks.Push(ack2) + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initCommitReq(subscription), initCommitResp(), nil) + stream.Push(commitReq(34), commitResp(1), nil) + barrier := stream.PushWithBarrier(commitReq(56), commitResp(1), nil) + verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + cmt := newTestCommitter(t, subscription, acks) + if gotErr := cmt.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + complete := test.NewCondition("blocking reset complete") + go func() { + if err := cmt.BlockingReset(); err != nil { + t.Errorf("BlockingReset() got err: (%v), want: ", err) + } + cmt.BlockingReset() + complete.SetDone() + }() + complete.VerifyNotDone(t) + + ack1.Ack() + cmt.SendBatchCommit() + complete.VerifyNotDone(t) + ack2.Ack() + cmt.SendBatchCommit() + + // Until the final commit response is received, committer.BlockingReset should + // not return. + barrier.ReleaseAfter(func() { + complete.VerifyNotDone(t) + }) + complete.WaitUntilDone(t, serviceTestWaitTimeout) + + // Ack tracker should be reset. + if got, want := acks.CommitOffset(), nilCursorOffset; got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } + if got, want := acks.Empty(), true; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } + + // Calling committer.BlockingReset again should immediately return. + if err := cmt.BlockingReset(); err != nil { + t.Errorf("BlockingReset() got err: (%v), want: ", err) + } + + cmt.StopVerifyNoError() +} + +func TestCommitterBlockingResetCommitterStopped(t *testing.T) { + subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0} + ack1 := newAckConsumer(33, 0, nil) + ack2 := newAckConsumer(55, 0, nil) + acks := newAckTracker() + acks.Push(ack1) + acks.Push(ack2) + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initCommitReq(subscription), initCommitResp(), nil) + stream.Push(commitReq(34), commitResp(1), nil) + verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + cmt := newTestCommitter(t, subscription, acks) + if gotErr := cmt.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + complete := test.NewCondition("blocking reset complete") + go func() { + if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) { + t.Errorf("BlockingReset() got: (%v), want: (%v)", got, want) + } + complete.SetDone() + }() + complete.VerifyNotDone(t) + + ack1.Ack() + cmt.SendBatchCommit() + complete.VerifyNotDone(t) + + // committer.BlockingReset should return when the committer is stopped. + cmt.Stop() + complete.WaitUntilDone(t, serviceTestWaitTimeout) + + // Ack tracker should not be reset. + if got, want := acks.Empty(), false; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } + + cmt.Terminate() + if gotErr := cmt.FinalError(); gotErr != nil { + t.Errorf("Final err: (%v), want: ", gotErr) + } +} + +func TestCommitterBlockingResetFatalError(t *testing.T) { + subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0} + ack1 := newAckConsumer(33, 0, nil) + ack2 := newAckConsumer(55, 0, nil) + acks := newAckTracker() + acks.Push(ack1) + acks.Push(ack2) + serverErr := status.Error(codes.FailedPrecondition, "failed") + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initCommitReq(subscription), initCommitResp(), nil) + stream.Push(commitReq(34), nil, serverErr) + verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + cmt := newTestCommitter(t, subscription, acks) + if gotErr := cmt.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + complete := test.NewCondition("blocking reset complete") + go func() { + if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) { + t.Errorf("BlockingReset() got: (%v), want: (%v)", got, want) + } + complete.SetDone() + }() + complete.VerifyNotDone(t) + + ack1.Ack() + cmt.SendBatchCommit() + + // committer.BlockingReset should return when the committer terminates due to + // fatal server error. + complete.WaitUntilDone(t, serviceTestWaitTimeout) + + if gotErr := cmt.FinalError(); !test.ErrorEqual(gotErr, serverErr) { + t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr) + } +}