Skip to content

Commit

Permalink
fix(pubsublite): ackTracker should discard new acks after committer t…
Browse files Browse the repository at this point in the history
…erminates (#3827)

Prevent new acks from being added to ackTracker after the committer terminates, which can occur due to races, as they are added in subscribeStream.
  • Loading branch information
tmdiep committed Mar 18, 2021
1 parent cb43066 commit bc49753
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
13 changes: 11 additions & 2 deletions pubsublite/internal/wire/acks.go
Expand Up @@ -91,12 +91,15 @@ type ackTracker struct {
ackedPrefixOffset int64
// Outstanding message acks, strictly ordered by increasing message offsets.
outstandingAcks *list.List // Value = *ackConsumer
// Whether new acks can be pushed.
enablePush bool
}

func newAckTracker() *ackTracker {
return &ackTracker{
ackedPrefixOffset: nilCursorOffset,
outstandingAcks: list.New(),
enablePush: true,
}
}

Expand All @@ -105,6 +108,11 @@ func (at *ackTracker) Push(ack *ackConsumer) error {
at.mu.Lock()
defer at.mu.Unlock()

if !at.enablePush {
ack.Clear()
return nil
}

// These errors should not occur unless there is a bug in the client library
// as message ordering should have been validated by subscriberOffsetTracker.
if ack.Offset <= at.ackedPrefixOffset {
Expand Down Expand Up @@ -136,12 +144,13 @@ func (at *ackTracker) CommitOffset() int64 {
return at.ackedPrefixOffset + 1
}

// Release clears and invalidates any outstanding acks. This should be called
// when the subscriber terminates.
// Release clears and invalidates any outstanding acks. Push will clear and
// discard new acks. This should be called when the committer terminates.
func (at *ackTracker) Release() {
at.mu.Lock()
defer at.mu.Unlock()

at.enablePush = false
at.unsafeProcessAcks()

for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() {
Expand Down
10 changes: 10 additions & 0 deletions pubsublite/internal/wire/acks_test.go
Expand Up @@ -117,6 +117,7 @@ func TestAckTrackerRelease(t *testing.T) {
ack1 := newAckConsumer(1, 0, emptyAckConsumer)
ack2 := newAckConsumer(2, 0, onAckAfterRelease)
ack3 := newAckConsumer(3, 0, onAckAfterRelease)
ack4 := newAckConsumer(4, 0, onAckAfterRelease)

if err := ackTracker.Push(ack1); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
Expand All @@ -136,6 +137,15 @@ func TestAckTrackerRelease(t *testing.T) {
ack2.Ack()
ack3.Ack()

// New acks should be cleared and discarded.
if err := ackTracker.Push(ack4); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if got, want := ackTracker.Empty(), true; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
ack4.Ack()

if got, want := ackTracker.CommitOffset(), int64(2); got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
Expand Down

0 comments on commit bc49753

Please sign in to comment.