From 26599a0995d9b108bbaaceca775457ffc331dcb2 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Thu, 5 Nov 2020 14:36:40 +1100 Subject: [PATCH] feat(pubsublite): Trackers for acks and commit cursor (#3137) These are similar to their Java counterparts with the following differences: - ackConsumer is similar to AckReplyConsumer. - ackTracker is equivalent to AckSetTrackerImpl. It maintains a single ordered queue of unacked offsets and stores the desired commit offset in ackedPrefixOffset. - commitCursorTracker is equivalent to CommitState. It maintains the last commit offset confirmed by the server and a queue of unacknowledged committed offsets. --- pubsublite/internal/wire/acks.go | 240 ++++++++++++++++++++ pubsublite/internal/wire/acks_test.go | 311 ++++++++++++++++++++++++++ 2 files changed, 551 insertions(+) create mode 100644 pubsublite/internal/wire/acks.go create mode 100644 pubsublite/internal/wire/acks_test.go diff --git a/pubsublite/internal/wire/acks.go b/pubsublite/internal/wire/acks.go new file mode 100644 index 00000000000..0a7c208e522 --- /dev/null +++ b/pubsublite/internal/wire/acks.go @@ -0,0 +1,240 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import ( + "container/list" + "fmt" + "sync" +) + +// AckConsumer is the interface exported from this package for acking messages. +type AckConsumer interface { + Ack() +} + +// ackedFunc is invoked when a message has been acked by the user. Note: if the +// ackedFunc implementation calls any ackConsumer methods, it needs to run in a +// goroutine to avoid a deadlock. +type ackedFunc func(*ackConsumer) + +// ackConsumer is used for handling message acks. It is attached to a Message +// and also stored within the ackTracker until the message has been acked by the +// user. +type ackConsumer struct { + // The message offset. + Offset int64 + // Bytes released to the flow controller once the message has been acked. + MsgBytes int64 + + // Guards access to fields below. + mu sync.Mutex + acked bool + onAck ackedFunc +} + +func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer { + return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck} +} + +func (ac *ackConsumer) Ack() { + ac.mu.Lock() + defer ac.mu.Unlock() + + if ac.acked { + return + } + ac.acked = true + if ac.onAck != nil { + // Not invoked in a goroutine here for ease of testing. + ac.onAck(ac) + } +} + +func (ac *ackConsumer) IsAcked() bool { + ac.mu.Lock() + defer ac.mu.Unlock() + return ac.acked +} + +// Clear onAck when the ack can no longer be processed. The user's ack would be +// ignored. +func (ac *ackConsumer) Clear() { + ac.mu.Lock() + defer ac.mu.Unlock() + ac.onAck = nil +} + +// Represents an uninitialized cursor offset. A sentinel value is used instead +// if an optional to simplify cursor comparisons (i.e. -1 works without the need +// to check for nil and then convert to int64). +const nilCursorOffset int64 = -1 + +// ackTracker manages outstanding message acks, i.e. messages that have been +// delivered to the user, but not yet acked. It is used by the committer and +// wireSubscriber, so requires its own mutex. +type ackTracker struct { + // Guards access to fields below. + mu sync.Mutex + // All offsets before and including this prefix have been acked by the user. + ackedPrefixOffset int64 + // Outstanding message acks, strictly ordered by increasing message offsets. + outstandingAcks *list.List // Value = *ackConsumer +} + +func newAckTracker() *ackTracker { + return &ackTracker{ + ackedPrefixOffset: nilCursorOffset, + outstandingAcks: list.New(), + } +} + +// Push adds an outstanding ack to the tracker. +func (at *ackTracker) Push(ack *ackConsumer) error { + at.mu.Lock() + defer at.mu.Unlock() + + // These errors should not occur unless there is a bug in the client library + // as message ordering should have been validated by subscriberOffsetTracker. + if ack.Offset <= at.ackedPrefixOffset { + return errOutOfOrderMessages + } + if elem := at.outstandingAcks.Back(); elem != nil { + lastOutstandingAck, _ := elem.Value.(*ackConsumer) + if ack.Offset <= lastOutstandingAck.Offset { + return errOutOfOrderMessages + } + } + + at.outstandingAcks.PushBack(ack) + return nil +} + +// CommitOffset returns the cursor offset that should be committed. May return +// nilCursorOffset if no messages have been acked thus far. +func (at *ackTracker) CommitOffset() int64 { + at.mu.Lock() + defer at.mu.Unlock() + + // Process outstanding acks and update `ackedPrefixOffset` until an unacked + // message is found. + for { + elem := at.outstandingAcks.Front() + if elem == nil { + break + } + ack, _ := elem.Value.(*ackConsumer) + if !ack.IsAcked() { + break + } + at.ackedPrefixOffset = ack.Offset + at.outstandingAcks.Remove(elem) + ack.Clear() + } + + if at.ackedPrefixOffset == nilCursorOffset { + return nilCursorOffset + } + // Convert from last acked to first unacked, which is the commit offset. + return at.ackedPrefixOffset + 1 +} + +// Release clears and invalidates any outstanding acks. This should be called +// when the subscriber terminates. +func (at *ackTracker) Release() { + at.mu.Lock() + defer at.mu.Unlock() + + for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() { + ack, _ := elem.Value.(*ackConsumer) + ack.Clear() + } + at.outstandingAcks.Init() +} + +// commitCursorTracker tracks pending and last successful committed offsets. +// It is only accessed by the committer. +type commitCursorTracker struct { + // Used to obtain the desired commit offset based on messages acked by the + // user. + acks *ackTracker + // Last offset for which the server confirmed (acknowledged) the commit. + lastConfirmedOffset int64 + // Queue of committed offsets awaiting confirmation from the server. + pendingOffsets *list.List // Value = int64 +} + +func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker { + return &commitCursorTracker{ + acks: acks, + lastConfirmedOffset: nilCursorOffset, + pendingOffsets: list.New(), + } +} + +func extractOffsetFromElem(elem *list.Element) int64 { + if elem == nil { + return nilCursorOffset + } + offset, _ := elem.Value.(int64) + return offset +} + +// NextOffset is the commit offset to be sent to the stream. Returns +// nilCursorOffset if the commit offset does not need to be updated. +func (ct *commitCursorTracker) NextOffset() int64 { + desiredCommitOffset := ct.acks.CommitOffset() + if desiredCommitOffset <= ct.lastConfirmedOffset { + // The server has already confirmed the commit offset. + return nilCursorOffset + } + if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) { + // The commit offset has already been sent to the commit stream and is + // awaiting confirmation. + return nilCursorOffset + } + return desiredCommitOffset +} + +// AddPending adds a sent, but not yet confirmed, committed offset. +func (ct *commitCursorTracker) AddPending(offset int64) { + ct.pendingOffsets.PushBack(offset) +} + +// ClearPending discards old pending offsets. Should be called when the commit +// stream reconnects, as the server acknowledgments for these would not be +// received. +func (ct *commitCursorTracker) ClearPending() { + ct.pendingOffsets.Init() +} + +// ConfirmOffsets processes the server's acknowledgment of the first +// `numConfirmed` pending offsets. +func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error { + if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed { + return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending) + } + + for i := int64(0); i < numConfirmed; i++ { + front := ct.pendingOffsets.Front() + ct.lastConfirmedOffset = extractOffsetFromElem(front) + ct.pendingOffsets.Remove(front) + } + return nil +} + +// Done when the server has confirmed the desired commit offset. +func (ct *commitCursorTracker) Done() bool { + return ct.acks.CommitOffset() <= ct.lastConfirmedOffset +} diff --git a/pubsublite/internal/wire/acks_test.go b/pubsublite/internal/wire/acks_test.go new file mode 100644 index 00000000000..b7204e361db --- /dev/null +++ b/pubsublite/internal/wire/acks_test.go @@ -0,0 +1,311 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import "testing" + +func TestAckConsumerAck(t *testing.T) { + numAcks := 0 + onAck := func(ac *ackConsumer) { + numAcks++ + } + ackConsumer := newAckConsumer(0, 0, onAck) + + // Test duplicate acks. + for i := 0; i < 3; i++ { + ackConsumer.Ack() + + if got, want := ackConsumer.IsAcked(), true; got != want { + t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want) + } + if got, want := numAcks, 1; got != want { + t.Errorf("onAck func called %v times, expected %v call", got, want) + } + } +} + +func TestAckConsumerClear(t *testing.T) { + onAck := func(ac *ackConsumer) { + t.Error("onAck func should not have been called") + } + ackConsumer := newAckConsumer(0, 0, onAck) + ackConsumer.Clear() + ackConsumer.Ack() + + if got, want := ackConsumer.IsAcked(), true; got != want { + t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want) + } +} + +func TestAckTrackerProcessing(t *testing.T) { + ackTracker := newAckTracker() + + // No messages received yet. + if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } + + onAck := func(ac *ackConsumer) { + // Nothing to do. + } + ack1 := newAckConsumer(1, 0, onAck) + ack2 := newAckConsumer(2, 0, onAck) + ack3 := newAckConsumer(3, 0, onAck) + if err := ackTracker.Push(ack1); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack2); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack3); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + + // All messages unacked. + if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } + + ack1.Ack() + if got, want := ackTracker.CommitOffset(), int64(2); got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } + + // Skipped ack2, so the commit offset should not have been updated. + ack3.Ack() + if got, want := ackTracker.CommitOffset(), int64(2); got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } + + // Both ack2 and ack3 should be removed from the outstanding acks queue. + ack2.Ack() + if got, want := ackTracker.CommitOffset(), int64(4); got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } + + // Newly received message. + ack4 := newAckConsumer(4, 0, onAck) + if err := ackTracker.Push(ack4); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + ack4.Ack() + if got, want := ackTracker.CommitOffset(), int64(5); got != want { + t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) + } +} + +func TestAckTrackerRelease(t *testing.T) { + ackTracker := newAckTracker() + onAck := func(ac *ackConsumer) { + t.Error("onAck should not be called") + } + ack1 := newAckConsumer(1, 0, onAck) + ack2 := newAckConsumer(2, 0, onAck) + ack3 := newAckConsumer(3, 0, onAck) + + if err := ackTracker.Push(ack1); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack2); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack3); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + + // After clearing outstanding acks, onAck should not be called. + ackTracker.Release() + ack1.Ack() + ack2.Ack() + ack3.Ack() +} + +func TestCommitCursorTrackerProcessing(t *testing.T) { + ackTracker := newAckTracker() + commitTracker := newCommitCursorTracker(ackTracker) + + // No messages received yet. + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + + onAck := func(ac *ackConsumer) { + // Nothing to do. + } + ack1 := newAckConsumer(1, 0, onAck) + ack2 := newAckConsumer(2, 0, onAck) + ack3 := newAckConsumer(3, 0, onAck) + if err := ackTracker.Push(ack1); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack2); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack3); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + + // All messages unacked. + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + + // Msg1 acked and commit sent to stream. + ack1.Ack() + if got, want := commitTracker.NextOffset(), int64(2); got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + commitTracker.AddPending(commitTracker.NextOffset()) + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + + // Msg 2 & 3 acked commit and sent to stream. + ack2.Ack() + if got, want := commitTracker.NextOffset(), int64(3); got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + ack3.Ack() + if got, want := commitTracker.NextOffset(), int64(4); got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + commitTracker.AddPending(commitTracker.NextOffset()) + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + if got, want := commitTracker.Done(), false; got != want { + t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want) + } + + // First 2 pending commits acknowledged. + if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) + } + if err := commitTracker.ConfirmOffsets(2); err != nil { + t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err) + } + if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want { + t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) + } + if got, want := commitTracker.Done(), true; got != want { + t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want) + } +} + +func TestCommitCursorTrackerStreamReconnects(t *testing.T) { + ackTracker := newAckTracker() + commitTracker := newCommitCursorTracker(ackTracker) + + onAck := func(ac *ackConsumer) { + // Nothing to do. + } + ack1 := newAckConsumer(1, 0, onAck) + ack2 := newAckConsumer(2, 0, onAck) + ack3 := newAckConsumer(3, 0, onAck) + if err := ackTracker.Push(ack1); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack2); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if err := ackTracker.Push(ack3); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + + // All messages unacked. + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + + // Msg1 acked and commit sent to stream. + ack1.Ack() + if got, want := commitTracker.NextOffset(), int64(2); got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + commitTracker.AddPending(commitTracker.NextOffset()) + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + + // Msg2 acked and commit sent to stream. + ack2.Ack() + if got, want := commitTracker.NextOffset(), int64(3); got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + commitTracker.AddPending(commitTracker.NextOffset()) + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + + // Stream breaks and pending offsets are cleared. + commitTracker.ClearPending() + if got, want := commitTracker.Done(), false; got != want { + t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want) + } + // When the stream reconnects the next offset should be 3 (offset 2 skipped). + if got, want := commitTracker.NextOffset(), int64(3); got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + commitTracker.AddPending(commitTracker.NextOffset()) + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + + // Msg2 acked and commit sent to stream. + ack3.Ack() + if got, want := commitTracker.NextOffset(), int64(4); got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + commitTracker.AddPending(commitTracker.NextOffset()) + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } + + // Only 1 pending commit confirmed. + if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) + } + if err := commitTracker.ConfirmOffsets(1); err != nil { + t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err) + } + if got, want := commitTracker.lastConfirmedOffset, int64(3); got != want { + t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) + } + if got, want := commitTracker.Done(), false; got != want { + t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want) + } + + // Final pending commit confirmed. + if err := commitTracker.ConfirmOffsets(1); err != nil { + t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err) + } + if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want { + t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) + } + if got, want := commitTracker.Done(), true; got != want { + t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want) + } + + // Note: Done() returns true even though there are unacked messages. + ack4 := newAckConsumer(4, 0, onAck) + if err := ackTracker.Push(ack4); err != nil { + t.Errorf("ackTracker.Push() got err %v", err) + } + if got, want := commitTracker.Done(), true; got != want { + t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want) + } + if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { + t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) + } +}