Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(pubsublite): Trackers for acks and commit cursor (#3137)
These are similar to their Java counterparts with the following differences:
- ackConsumer is similar to AckReplyConsumer.
- ackTracker is equivalent to AckSetTrackerImpl. It maintains a single ordered queue of unacked offsets and stores the desired commit offset in ackedPrefixOffset.
- commitCursorTracker is equivalent to CommitState. It maintains the last commit offset confirmed by the server and a queue of unacknowledged committed offsets.
  • Loading branch information
tmdiep committed Nov 5, 2020
1 parent 8e3c306 commit 26599a0
Show file tree
Hide file tree
Showing 2 changed files with 551 additions and 0 deletions.
240 changes: 240 additions & 0 deletions pubsublite/internal/wire/acks.go
@@ -0,0 +1,240 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"container/list"
"fmt"
"sync"
)

// AckConsumer is the interface exported from this package for acking messages.
type AckConsumer interface {
Ack()
}

// ackedFunc is invoked when a message has been acked by the user. Note: if the
// ackedFunc implementation calls any ackConsumer methods, it needs to run in a
// goroutine to avoid a deadlock.
type ackedFunc func(*ackConsumer)

// ackConsumer is used for handling message acks. It is attached to a Message
// and also stored within the ackTracker until the message has been acked by the
// user.
type ackConsumer struct {
// The message offset.
Offset int64
// Bytes released to the flow controller once the message has been acked.
MsgBytes int64

// Guards access to fields below.
mu sync.Mutex
acked bool
onAck ackedFunc
}

func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer {
return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck}
}

func (ac *ackConsumer) Ack() {
ac.mu.Lock()
defer ac.mu.Unlock()

if ac.acked {
return
}
ac.acked = true
if ac.onAck != nil {
// Not invoked in a goroutine here for ease of testing.
ac.onAck(ac)
}
}

func (ac *ackConsumer) IsAcked() bool {
ac.mu.Lock()
defer ac.mu.Unlock()
return ac.acked
}

// Clear onAck when the ack can no longer be processed. The user's ack would be
// ignored.
func (ac *ackConsumer) Clear() {
ac.mu.Lock()
defer ac.mu.Unlock()
ac.onAck = nil
}

// Represents an uninitialized cursor offset. A sentinel value is used instead
// if an optional to simplify cursor comparisons (i.e. -1 works without the need
// to check for nil and then convert to int64).
const nilCursorOffset int64 = -1

// ackTracker manages outstanding message acks, i.e. messages that have been
// delivered to the user, but not yet acked. It is used by the committer and
// wireSubscriber, so requires its own mutex.
type ackTracker struct {
// Guards access to fields below.
mu sync.Mutex
// All offsets before and including this prefix have been acked by the user.
ackedPrefixOffset int64
// Outstanding message acks, strictly ordered by increasing message offsets.
outstandingAcks *list.List // Value = *ackConsumer
}

func newAckTracker() *ackTracker {
return &ackTracker{
ackedPrefixOffset: nilCursorOffset,
outstandingAcks: list.New(),
}
}

// Push adds an outstanding ack to the tracker.
func (at *ackTracker) Push(ack *ackConsumer) error {
at.mu.Lock()
defer at.mu.Unlock()

// These errors should not occur unless there is a bug in the client library
// as message ordering should have been validated by subscriberOffsetTracker.
if ack.Offset <= at.ackedPrefixOffset {
return errOutOfOrderMessages
}
if elem := at.outstandingAcks.Back(); elem != nil {
lastOutstandingAck, _ := elem.Value.(*ackConsumer)
if ack.Offset <= lastOutstandingAck.Offset {
return errOutOfOrderMessages
}
}

at.outstandingAcks.PushBack(ack)
return nil
}

// CommitOffset returns the cursor offset that should be committed. May return
// nilCursorOffset if no messages have been acked thus far.
func (at *ackTracker) CommitOffset() int64 {
at.mu.Lock()
defer at.mu.Unlock()

// Process outstanding acks and update `ackedPrefixOffset` until an unacked
// message is found.
for {
elem := at.outstandingAcks.Front()
if elem == nil {
break
}
ack, _ := elem.Value.(*ackConsumer)
if !ack.IsAcked() {
break
}
at.ackedPrefixOffset = ack.Offset
at.outstandingAcks.Remove(elem)
ack.Clear()
}

if at.ackedPrefixOffset == nilCursorOffset {
return nilCursorOffset
}
// Convert from last acked to first unacked, which is the commit offset.
return at.ackedPrefixOffset + 1
}

// Release clears and invalidates any outstanding acks. This should be called
// when the subscriber terminates.
func (at *ackTracker) Release() {
at.mu.Lock()
defer at.mu.Unlock()

for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() {
ack, _ := elem.Value.(*ackConsumer)
ack.Clear()
}
at.outstandingAcks.Init()
}

// commitCursorTracker tracks pending and last successful committed offsets.
// It is only accessed by the committer.
type commitCursorTracker struct {
// Used to obtain the desired commit offset based on messages acked by the
// user.
acks *ackTracker
// Last offset for which the server confirmed (acknowledged) the commit.
lastConfirmedOffset int64
// Queue of committed offsets awaiting confirmation from the server.
pendingOffsets *list.List // Value = int64
}

func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker {
return &commitCursorTracker{
acks: acks,
lastConfirmedOffset: nilCursorOffset,
pendingOffsets: list.New(),
}
}

func extractOffsetFromElem(elem *list.Element) int64 {
if elem == nil {
return nilCursorOffset
}
offset, _ := elem.Value.(int64)
return offset
}

// NextOffset is the commit offset to be sent to the stream. Returns
// nilCursorOffset if the commit offset does not need to be updated.
func (ct *commitCursorTracker) NextOffset() int64 {
desiredCommitOffset := ct.acks.CommitOffset()
if desiredCommitOffset <= ct.lastConfirmedOffset {
// The server has already confirmed the commit offset.
return nilCursorOffset
}
if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) {
// The commit offset has already been sent to the commit stream and is
// awaiting confirmation.
return nilCursorOffset
}
return desiredCommitOffset
}

// AddPending adds a sent, but not yet confirmed, committed offset.
func (ct *commitCursorTracker) AddPending(offset int64) {
ct.pendingOffsets.PushBack(offset)
}

// ClearPending discards old pending offsets. Should be called when the commit
// stream reconnects, as the server acknowledgments for these would not be
// received.
func (ct *commitCursorTracker) ClearPending() {
ct.pendingOffsets.Init()
}

// ConfirmOffsets processes the server's acknowledgment of the first
// `numConfirmed` pending offsets.
func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error {
if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed {
return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending)
}

for i := int64(0); i < numConfirmed; i++ {
front := ct.pendingOffsets.Front()
ct.lastConfirmedOffset = extractOffsetFromElem(front)
ct.pendingOffsets.Remove(front)
}
return nil
}

// Done when the server has confirmed the desired commit offset.
func (ct *commitCursorTracker) Done() bool {
return ct.acks.CommitOffset() <= ct.lastConfirmedOffset
}

0 comments on commit 26599a0

Please sign in to comment.