/
acks.go
247 lines (215 loc) · 7.18 KB
/
acks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package wire
import (
"container/list"
"fmt"
"sync"
)
// AckConsumer is the interface exported from this package for acking messages.
type AckConsumer interface {
Ack()
}
// ackedFunc is invoked when a message has been acked by the user. Note: if the
// ackedFunc implementation calls any ackConsumer methods, it needs to run in a
// goroutine to avoid a deadlock.
type ackedFunc func(*ackConsumer)
// ackConsumer is used for handling message acks. It is attached to a Message
// and also stored within the ackTracker until the message has been acked by the
// user.
type ackConsumer struct {
// The message offset.
Offset int64
// Bytes released to the flow controller once the message has been acked.
MsgBytes int64
// Guards access to fields below.
mu sync.Mutex
acked bool
onAck ackedFunc
}
func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer {
return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck}
}
func (ac *ackConsumer) Ack() {
ac.mu.Lock()
defer ac.mu.Unlock()
if ac.acked {
return
}
ac.acked = true
if ac.onAck != nil {
// Not invoked in a goroutine here for ease of testing.
ac.onAck(ac)
}
}
func (ac *ackConsumer) IsAcked() bool {
ac.mu.Lock()
defer ac.mu.Unlock()
return ac.acked
}
// Clear onAck when the ack can no longer be processed. The user's ack would be
// ignored.
func (ac *ackConsumer) Clear() {
ac.mu.Lock()
defer ac.mu.Unlock()
ac.onAck = nil
}
// Represents an uninitialized cursor offset. A sentinel value is used instead
// if an optional to simplify cursor comparisons (i.e. -1 works without the need
// to check for nil and then convert to int64).
const nilCursorOffset int64 = -1
// ackTracker manages outstanding message acks, i.e. messages that have been
// delivered to the user, but not yet acked. It is used by the committer and
// subscribeStream, so requires its own mutex.
type ackTracker struct {
// Guards access to fields below.
mu sync.Mutex
// All offsets before and including this prefix have been acked by the user.
ackedPrefixOffset int64
// Outstanding message acks, strictly ordered by increasing message offsets.
outstandingAcks *list.List // Value = *ackConsumer
}
func newAckTracker() *ackTracker {
return &ackTracker{
ackedPrefixOffset: nilCursorOffset,
outstandingAcks: list.New(),
}
}
// Push adds an outstanding ack to the tracker.
func (at *ackTracker) Push(ack *ackConsumer) error {
at.mu.Lock()
defer at.mu.Unlock()
// These errors should not occur unless there is a bug in the client library
// as message ordering should have been validated by subscriberOffsetTracker.
if ack.Offset <= at.ackedPrefixOffset {
return errOutOfOrderMessages
}
if elem := at.outstandingAcks.Back(); elem != nil {
lastOutstandingAck, _ := elem.Value.(*ackConsumer)
if ack.Offset <= lastOutstandingAck.Offset {
return errOutOfOrderMessages
}
}
at.outstandingAcks.PushBack(ack)
return nil
}
// CommitOffset returns the cursor offset that should be committed. May return
// nilCursorOffset if no messages have been acked thus far.
func (at *ackTracker) CommitOffset() int64 {
at.mu.Lock()
defer at.mu.Unlock()
// Process outstanding acks and update `ackedPrefixOffset` until an unacked
// message is found.
for {
elem := at.outstandingAcks.Front()
if elem == nil {
break
}
ack, _ := elem.Value.(*ackConsumer)
if !ack.IsAcked() {
break
}
at.ackedPrefixOffset = ack.Offset
at.outstandingAcks.Remove(elem)
ack.Clear()
}
if at.ackedPrefixOffset == nilCursorOffset {
return nilCursorOffset
}
// Convert from last acked to first unacked, which is the commit offset.
return at.ackedPrefixOffset + 1
}
// Release clears and invalidates any outstanding acks. This should be called
// when the subscriber terminates.
func (at *ackTracker) Release() {
at.mu.Lock()
defer at.mu.Unlock()
for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() {
ack, _ := elem.Value.(*ackConsumer)
ack.Clear()
}
at.outstandingAcks.Init()
}
// Empty when there are no outstanding acks.
func (at *ackTracker) Empty() bool {
at.mu.Lock()
defer at.mu.Unlock()
return at.outstandingAcks.Len() == 0
}
// commitCursorTracker tracks pending and last successful committed offsets.
// It is only accessed by the committer.
type commitCursorTracker struct {
// Used to obtain the desired commit offset based on messages acked by the
// user.
acks *ackTracker
// Last offset for which the server confirmed (acknowledged) the commit.
lastConfirmedOffset int64
// Queue of committed offsets awaiting confirmation from the server.
pendingOffsets *list.List // Value = int64
}
func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker {
return &commitCursorTracker{
acks: acks,
lastConfirmedOffset: nilCursorOffset,
pendingOffsets: list.New(),
}
}
func extractOffsetFromElem(elem *list.Element) int64 {
if elem == nil {
return nilCursorOffset
}
offset, _ := elem.Value.(int64)
return offset
}
// NextOffset is the commit offset to be sent to the stream. Returns
// nilCursorOffset if the commit offset does not need to be updated.
func (ct *commitCursorTracker) NextOffset() int64 {
desiredCommitOffset := ct.acks.CommitOffset()
if desiredCommitOffset <= ct.lastConfirmedOffset {
// The server has already confirmed the commit offset.
return nilCursorOffset
}
if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) {
// The commit offset has already been sent to the commit stream and is
// awaiting confirmation.
return nilCursorOffset
}
return desiredCommitOffset
}
// AddPending adds a sent, but not yet confirmed, committed offset.
func (ct *commitCursorTracker) AddPending(offset int64) {
ct.pendingOffsets.PushBack(offset)
}
// ClearPending discards old pending offsets. Should be called when the commit
// stream reconnects, as the server acknowledgments for these would not be
// received.
func (ct *commitCursorTracker) ClearPending() {
ct.pendingOffsets.Init()
}
// ConfirmOffsets processes the server's acknowledgment of the first
// `numConfirmed` pending offsets.
func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error {
if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed {
return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending)
}
for i := int64(0); i < numConfirmed; i++ {
front := ct.pendingOffsets.Front()
ct.lastConfirmedOffset = extractOffsetFromElem(front)
ct.pendingOffsets.Remove(front)
}
return nil
}
// Done when the server has confirmed the desired commit offset.
func (ct *commitCursorTracker) Done() bool {
return ct.acks.CommitOffset() <= ct.lastConfirmedOffset
}