Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(pubsublite): flush and reset committer (#4143)
- Adds ackTracker.Reset() and commitCursorTracker.Reset() to reset these trackers to their initial state.
- Adds committer.BlockingReset(), which flushes pending commits to the server and then resets the state of the committer.
  • Loading branch information
tmdiep committed May 26, 2021
1 parent ff5f8c9 commit 0ecd732
Show file tree
Hide file tree
Showing 5 changed files with 452 additions and 19 deletions.
62 changes: 62 additions & 0 deletions pubsublite/internal/test/condition.go
@@ -0,0 +1,62 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package test

import (
"testing"
"time"
)

// Condition allows tests to wait for some event to occur, or check that it has
// not occurred.
type Condition struct {
name string
done chan struct{}
}

// NewCondition creates a new condition.
func NewCondition(name string) *Condition {
return &Condition{
name: name,
done: make(chan struct{}),
}
}

// SetDone marks the condition as done.
func (c *Condition) SetDone() {
close(c.done)
}

// WaitUntilDone waits up to the specified duration for the condition to be
// marked done.
func (c *Condition) WaitUntilDone(t *testing.T, duration time.Duration) {
t.Helper()

select {
case <-time.After(duration):
t.Errorf("Condition(%q): timed out after waiting %v", c.name, duration)
case <-c.done:
}
}

// VerifyNotDone checks that the condition is not done.
func (c *Condition) VerifyNotDone(t *testing.T) {
t.Helper()

select {
case <-c.done:
t.Errorf("Condition(%q): is done, expected not done", c.name)
default:
}
}
26 changes: 24 additions & 2 deletions pubsublite/internal/wire/acks.go
Expand Up @@ -152,7 +152,21 @@ func (at *ackTracker) Release() {

at.enablePush = false
at.unsafeProcessAcks()
at.unsafeClearAcks()
}

// Reset the state of the tracker. Clears and invalidates any outstanding acks.
func (at *ackTracker) Reset() {
at.mu.Lock()
defer at.mu.Unlock()

at.unsafeClearAcks()
at.ackedPrefixOffset = nilCursorOffset
at.enablePush = true
}

// Clears and invalidates any outstanding acks.
func (at *ackTracker) unsafeClearAcks() {
for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() {
ack, _ := elem.Value.(*ackConsumer)
ack.Clear()
Expand Down Expand Up @@ -213,6 +227,13 @@ func extractOffsetFromElem(elem *list.Element) int64 {
return offset
}

// Reset the state of the tracker.
func (ct *commitCursorTracker) Reset() {
ct.acks.Reset()
ct.lastConfirmedOffset = nilCursorOffset
ct.pendingOffsets.Init()
}

// NextOffset is the commit offset to be sent to the stream. Returns
// nilCursorOffset if the commit offset does not need to be updated.
func (ct *commitCursorTracker) NextOffset() int64 {
Expand Down Expand Up @@ -256,7 +277,8 @@ func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error {
return nil
}

// UpToDate when the server has confirmed the desired commit offset.
// UpToDate when the server has confirmed the desired commit offset and there
// are no pending acks.
func (ct *commitCursorTracker) UpToDate() bool {
return ct.acks.CommitOffset() <= ct.lastConfirmedOffset
return ct.acks.CommitOffset() <= ct.lastConfirmedOffset && ct.acks.Empty()
}

0 comments on commit 0ecd732

Please sign in to comment.