diff --git a/pubsublite/internal/wire/acks.go b/pubsublite/internal/wire/acks.go index 2e55fd7a688..99647243ed8 100644 --- a/pubsublite/internal/wire/acks.go +++ b/pubsublite/internal/wire/acks.go @@ -91,12 +91,15 @@ type ackTracker struct { ackedPrefixOffset int64 // Outstanding message acks, strictly ordered by increasing message offsets. outstandingAcks *list.List // Value = *ackConsumer + // Whether new acks can be pushed. + enablePush bool } func newAckTracker() *ackTracker { return &ackTracker{ ackedPrefixOffset: nilCursorOffset, outstandingAcks: list.New(), + enablePush: true, } } @@ -105,6 +108,11 @@ func (at *ackTracker) Push(ack *ackConsumer) error { at.mu.Lock() defer at.mu.Unlock() + if !at.enablePush { + ack.Clear() + return nil + } + // These errors should not occur unless there is a bug in the client library // as message ordering should have been validated by subscriberOffsetTracker. if ack.Offset <= at.ackedPrefixOffset { @@ -136,12 +144,13 @@ func (at *ackTracker) CommitOffset() int64 { return at.ackedPrefixOffset + 1 } -// Release clears and invalidates any outstanding acks. This should be called -// when the subscriber terminates. +// Release clears and invalidates any outstanding acks. Push will clear and +// discard new acks. This should be called when the committer terminates. func (at *ackTracker) Release() { at.mu.Lock() defer at.mu.Unlock() + at.enablePush = false at.unsafeProcessAcks() for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() { diff --git a/pubsublite/internal/wire/acks_test.go b/pubsublite/internal/wire/acks_test.go index ea9f56bf342..4e94a5b4958 100644 --- a/pubsublite/internal/wire/acks_test.go +++ b/pubsublite/internal/wire/acks_test.go @@ -117,6 +117,7 @@ func TestAckTrackerRelease(t *testing.T) { ack1 := newAckConsumer(1, 0, emptyAckConsumer) ack2 := newAckConsumer(2, 0, onAckAfterRelease) ack3 := newAckConsumer(3, 0, onAckAfterRelease) + ack4 := newAckConsumer(4, 0, onAckAfterRelease) if err := ackTracker.Push(ack1); err != nil { t.Errorf("ackTracker.Push() got err %v", err) @@ -136,6 +137,15 @@ func TestAckTrackerRelease(t *testing.T) { ack2.Ack() ack3.Ack() + // New acks should be cleared and discarded. + if err := ackTracker.Push(ack4); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if got, want := ackTracker.Empty(), true; got != want { + t.Errorf("ackTracker.Empty() got %v, want %v", got, want) + } + ack4.Ack() + if got, want := ackTracker.CommitOffset(), int64(2); got != want { t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) }