-
Notifications
You must be signed in to change notification settings - Fork 2
/
iterator.go
267 lines (237 loc) · 6.29 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package statedb
import (
"bytes"
"fmt"
)
// Collect drains the iterator and returns every object it yielded, in
// iteration order. Revisions are discarded. The iterator is exhausted
// afterwards. The result is never nil: an iterator that yields nothing
// produces an empty slice.
func Collect[Obj any](iter Iterator[Obj]) []Obj {
	collected := make([]Obj, 0)
	for {
		obj, _, more := iter.Next()
		if !more {
			return collected
		}
		collected = append(collected, obj)
	}
}
// ProcessEach calls fn with every object and revision produced by the
// iterator, in iteration order. Iteration stops at the first non-nil error
// returned by fn and that error is returned as is. Nil is returned once the
// iterator is exhausted without fn failing.
func ProcessEach[Obj any, It Iterator[Obj]](iter It, fn func(Obj, Revision) error) (err error) {
	for {
		obj, rev, more := iter.Next()
		if !more {
			return nil
		}
		if err = fn(obj, rev); err != nil {
			return err
		}
	}
}
// iterator wraps an untyped index iterator and exposes the objects it yields
// as the concrete type Obj.
type iterator[Obj any] struct {
	// iter is the underlying iterator producing (key, object, ok) triples.
	iter interface{ Next() ([]byte, object, bool) }
}

// Next returns the next object converted to Obj along with its revision. The
// index key is discarded. Once the underlying iterator is exhausted, ok is
// false and the other results are zero values. The conversion is an unchecked
// type assertion, so it panics if the stored data is not of type Obj.
func (it *iterator[Obj]) Next() (obj Obj, revision uint64, ok bool) {
	_, entry, more := it.iter.Next()
	if !more {
		return obj, revision, false
	}
	return entry.data.(Obj), entry.revision, true
}
// Map returns an iterator that yields transform(obj) for every object
// produced by iter. Revisions are passed through unchanged. The
// transformation is applied lazily, once per successful call to Next.
func Map[In, Out any, It Iterator[In]](iter It, transform func(In) Out) Iterator[Out] {
	mapped := new(mapIterator[In, Out])
	mapped.iter = iter
	mapped.transform = transform
	return mapped
}

// mapIterator is the iterator implementation returned by Map.
type mapIterator[In, Out any] struct {
	// iter supplies the untransformed objects.
	iter Iterator[In]

	// transform converts each object before it is returned.
	transform func(In) Out
}

// Next fetches the next object from the wrapped iterator and returns its
// transformed value with the original revision. When the wrapped iterator is
// exhausted, ok is false and the other results are zero values.
func (it *mapIterator[In, Out]) Next() (out Out, revision Revision, ok bool) {
	in, rev, more := it.iter.Next()
	if !more {
		return out, revision, false
	}
	return it.transform(in), rev, true
}
// Filter returns an iterator that yields only those objects of iter for which
// pred returns true. Revisions are passed through unchanged. The predicate is
// evaluated lazily as Next is called.
func Filter[Obj any, It Iterator[Obj]](iter It, pred func(Obj) bool) Iterator[Obj] {
	filtered := new(filterIterator[Obj])
	filtered.iter = iter
	filtered.pred = pred
	return filtered
}

// filterIterator is the iterator implementation returned by Filter.
type filterIterator[Obj any] struct {
	// iter supplies the candidate objects.
	iter Iterator[Obj]

	// pred decides whether a candidate is returned (true) or skipped (false).
	pred func(Obj) bool
}

// Next advances the wrapped iterator until it yields an object accepted by
// the predicate and returns that object with its revision. When the wrapped
// iterator is exhausted, its final results are returned with ok set to false.
func (it *filterIterator[Obj]) Next() (out Obj, revision Revision, ok bool) {
	for {
		candidate, rev, more := it.iter.Next()
		if !more {
			return candidate, rev, false
		}
		if it.pred(candidate) {
			return candidate, rev, true
		}
	}
}
// uniqueIterator iterates over the objects of a unique index that match an
// exact key. The underlying node is located with a prefix search, so the
// wrapped iterator may also produce entries whose key merely starts with the
// searched key; those entries are skipped.
type uniqueIterator[Obj any] struct {
	// iter is the underlying iterator producing (key, object, ok) triples.
	iter interface{ Next() ([]byte, object, bool) }

	// key is the exact key an entry must have to be returned.
	key []byte
}

// Next returns the next object whose index key is byte-for-byte equal to the
// searched key, together with its revision. When the underlying iterator is
// exhausted, ok is false and the other results are zero values. The
// conversion to Obj is an unchecked type assertion.
func (it *uniqueIterator[Obj]) Next() (obj Obj, revision uint64, ok bool) {
	for {
		key, entry, more := it.iter.Next()
		switch {
		case !more:
			return obj, revision, false
		case bytes.Equal(key, it.key):
			return entry.data.(Obj), entry.revision, true
		}
		// Same prefix but a different (longer) key: keep looking.
	}
}
// nonUniqueIterator iterates over the objects of a non-unique index that
// match a secondary key. The search is done by prefix and indexers are not
// required to terminate their keys, so the wrapped iterator may also produce
// entries whose secondary key is longer than the searched one; those entries
// are skipped by comparing the secondary key lengths.
type nonUniqueIterator[Obj any] struct {
	// iter is the underlying iterator producing (key, object, ok) triples.
	iter interface{ Next() ([]byte, object, bool) }

	// key is the secondary key that was searched for.
	key []byte
}

// Next returns the next object whose decoded secondary key has the same
// length as the searched key, together with its revision. When the underlying
// iterator is exhausted, ok is false and the other results are zero values.
// The conversion to Obj is an unchecked type assertion.
func (it *nonUniqueIterator[Obj]) Next() (obj Obj, revision uint64, ok bool) {
	wantLen := len(it.key)
	for {
		key, entry, more := it.iter.Next()
		if !more {
			return obj, revision, false
		}
		// Every entry reached through the prefix search shares the searched
		// prefix, so an equal length implies an equal secondary key. Anything
		// else is a longer secondary key and is skipped.
		if _, secondary := decodeNonUniqueKey(key); len(secondary) != wantLen {
			continue
		}
		return entry.data.(Obj), entry.revision, true
	}
}
// NewDualIterator returns a DualIterator that merges the objects of the left
// and right iterators. Neither iterator is advanced until the first call to
// Next.
func NewDualIterator[Obj any](left, right Iterator[Obj]) *DualIterator[Obj] {
	dual := new(DualIterator[Obj])
	dual.left.iter = left
	dual.right.iter = right
	return dual
}

// iterState is one side of a DualIterator: the source iterator plus the
// object most recently read from it that has not been returned yet.
type iterState[Obj any] struct {
	// iter is the source iterator. It is set to nil once it reports
	// exhaustion so that it is not called again.
	iter Iterator[Obj]

	// obj and rev hold the buffered object and its revision.
	obj Obj
	rev Revision

	// ok reports whether obj and rev hold an object that Next has not
	// returned yet.
	ok bool
}

// DualIterator merges two iterators by always returning the buffered object
// with the lower revision. The output is in revision order provided both
// inputs are. Meant to be used for combined iteration of
// LowerBound(ByRevision) and Deleted().
type DualIterator[Obj any] struct {
	left  iterState[Obj]
	right iterState[Obj]
}

// Next returns the object with the lowest revision among the heads of the two
// iterators. fromLeft tells which iterator the object came from. When both
// heads carry the same revision the left one is returned first. ok is false,
// and the other results are zero values, once both iterators are exhausted.
func (it *DualIterator[Obj]) Next() (obj Obj, revision uint64, fromLeft, ok bool) {
	// Refill whichever side has no buffered object, left side first.
	for _, side := range [2]*iterState[Obj]{&it.left, &it.right} {
		if side.ok || side.iter == nil {
			continue
		}
		side.obj, side.rev, side.ok = side.iter.Next()
		if !side.ok {
			side.iter = nil
		}
	}

	// Hand out the buffered object with the lowest revision and mark its
	// side as needing a refill on the next call.
	l, r := &it.left, &it.right
	switch {
	case !l.ok && !r.ok:
		return obj, revision, fromLeft, false
	case l.ok && (!r.ok || l.rev <= r.rev):
		l.ok = false
		return l.obj, l.rev, true, true
	case r.ok && (!l.ok || r.rev <= l.rev):
		r.ok = false
		return r.obj, r.rev, false, true
	default:
		panic(fmt.Sprintf("BUG: Unhandled case: %+v", it))
	}
}
// changeIterator yields the changes of a table as Change events by walking a
// DualIterator. It remembers the last revision it returned so that Watch can
// query for newer changes once the current iteration is exhausted.
type changeIterator[Obj any] struct {
	// table is queried by Watch for objects with a revision above revision.
	table Table[Obj]

	// revision is the revision of the event most recently returned by Next.
	revision Revision

	// dt is the delete tracker. Next passes the revision of every deleted
	// object to its mark method.
	dt *deleteTracker[Obj]

	// iter is the iteration in progress. It is nil before the first query
	// and after the current iteration has been exhausted.
	iter *DualIterator[Obj]

	// watch is the watch channel handed out by Watch.
	watch <-chan struct{}
}

// Next returns the next change event and its revision. ok is false when there
// is no iteration in progress or the current one is exhausted; in the latter
// case the iteration is dropped. Objects coming from the left side of the
// dual iterator are reported as deleted, and their revision is passed to the
// delete tracker's mark method.
func (it *changeIterator[Obj]) Next() (ev Change[Obj], revision uint64, ok bool) {
	pending := it.iter
	if pending == nil {
		return ev, revision, false
	}

	ev.Object, revision, ev.Deleted, ok = pending.Next()
	if ok {
		ev.Revision = revision
		it.revision = revision
		if ev.Deleted {
			it.dt.mark(revision)
		}
		return ev, revision, true
	}

	// Exhausted: drop the iteration so that Watch knows it must requery.
	it.iter = nil
	return ev, revision, false
}
// Watch returns the channel to wait on before calling Next again.
//
// While an iteration is in progress the current watch channel is returned.
// Once the iteration has been exhausted and the current watch channel has
// been closed, the table and the delete tracker are queried with txn for
// everything after the last returned revision. A closed channel is then
// returned so that the caller iterates immediately. If the watch channel has
// not been closed yet, the same channel is returned and nothing is queried.
func (it *changeIterator[Obj]) Watch(txn ReadTxn) <-chan struct{} {
	if it.iter != nil {
		// Iteration still in progress, nothing to refresh.
		return it.watch
	}

	// Iterator has been exhausted, check if we need to requery
	// or whether we need to wait for changes first.
	select {
	case <-it.watch:
		// Closed: there are changes to pick up.
	default:
		// Watch channel not closed yet, so return the same watch
		// channel.
		return it.watch
	}

	from := it.revision + 1
	updateIter, watch := it.table.LowerBound(txn, ByRevision[Obj](from))
	deleteIter := it.dt.deleted(txn, from)
	// Deletions go on the left side: Next reports left-side objects as
	// deleted.
	it.iter = NewDualIterator(deleteIter, updateIter)

	// It is enough to watch the revision index and not the graveyard since
	// any object that is inserted into the graveyard will be deleted from
	// the revision index.
	it.watch = watch

	// Return a closed watch channel to immediately trigger iteration.
	return closedWatchChannel
}
// Close closes the delete tracker, if there is one, and resets the iterator
// to its zero value. After the reset the tracker field is nil, so a repeated
// Close does not close the tracker again.
func (it *changeIterator[Obj]) Close() {
	if tracker := it.dt; tracker != nil {
		tracker.close()
	}
	var zero changeIterator[Obj]
	*it = zero
}