/
OptimizedObservedRemoveSetV2.cs
392 lines (345 loc) · 16.1 KB
/
OptimizedObservedRemoveSetV2.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using Nyris.Crdt.Interfaces;
using Nyris.Crdt.Model;
using Range = Nyris.Crdt.Model.Range;
namespace Nyris.Crdt.Sets
{
/// <summary>
/// This is a performance-optimized version of <see cref="OptimizedObservedRemoveSet{TActorId,TItem}"/>
/// It is thread safe with supported concurrent reads.
/// </summary>
/// <typeparam name="TActorId"></typeparam>
/// <typeparam name="TItem"></typeparam>
[Obsolete("Please use OptimizedObservedRemoveSetV3 instead", false)]
public class OptimizedObservedRemoveSetV2<TActorId, TItem>
: SetChangesNotifier<TItem>,
ICRDT<ObservedRemoveDtos<TActorId, TItem>.Dto>,
IDeltaCrdt<ObservedRemoveDtos<TActorId, TItem>.DeltaDto, ObservedRemoveDtos<TActorId, TItem>.CausalTimestamp>,
IDisposable
where TItem : IEquatable<TItem>
where TActorId : IEquatable<TActorId>, IComparable<TActorId>
{
private readonly ConcurrentDictionary<TActorId, VersionedItemList<TItem>> _items = new();
private readonly VersionContext<TActorId> _versionContext = new();
// lock is used for operations, that rely on _items and _versionContext being in sync
private readonly ReaderWriterLockSlim _lock = new();
public HashSet<TItem> Values
{
get
{
var values = new HashSet<TItem>();
foreach (var batch in _items.Values.SelectMany(dottedList => dottedList))
{
var span = batch.Span;
for (var i = 0; i < span.Length; ++i)
{
values.Add(span[i].Item);
}
}
return values;
}
}
// TODO: maybe optimize this to be updated-on-write and just a constant lookup on read
public ulong StorageSize => _items.Values.Aggregate(_versionContext.StorageSize, (current, dottedList) => current + dottedList.StorageSize);
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposing) return;
_lock.Dispose();
}
public bool Contains(TItem item)
{
foreach (var dottedList in _items.Values)
{
if (dottedList.Contains(item)) return true;
}
return false;
}
#region Mutations
public ImmutableArray<ObservedRemoveDtos<TActorId,TItem>.DeltaDto> Add(TItem item, TActorId actor)
{
Debug.Assert(item is not null);
Debug.Assert(actor is not null);
_lock.EnterWriteLock();
try
{
var itemAlreadyPresent = Contains(item);
// we add item even if already present, cause of potential concurrent complications
// For example, think of a case when current item was added long time ago,
// AND there was a concurrent removal that was not yet propagated.
// In this case if we do not add an item, once removal is propagated, item will be gone.
// Checking if item is present in the set before operation is needed only to notify
// observers about state change
var version = _versionContext.Increment(actor);
if (!_items.TryGetValue(actor, out var actorsDottedItems))
{
_items[actor] = actorsDottedItems = VersionedItemList<TItem>.New();
}
actorsDottedItems.TryAdd(item, version, out var removedVersion, true);
if (!itemAlreadyPresent)
{
NotifyAdded(item);
}
return removedVersion.HasValue
? ImmutableArray.Create(ObservedRemoveDtos<TActorId,TItem>.DeltaDto.Added(item, actor, version),
ObservedRemoveDtos<TActorId,TItem>.DeltaDto.Removed(actor, removedVersion.Value))
: ImmutableArray.Create(ObservedRemoveDtos<TActorId,TItem>.DeltaDto.Added(item, actor, version));
}
finally
{
_lock.ExitWriteLock();
}
}
public ImmutableArray<ObservedRemoveDtos<TActorId,TItem>.DeltaDto> Remove(TItem item)
{
Debug.Assert(item is not null);
_lock.EnterWriteLock(); // although _versionContext is not updated, we need to update multiple dottedLists atomically
try
{
var dtos = ImmutableArray.CreateBuilder<ObservedRemoveDtos<TActorId,TItem>.DeltaDto>(_items.Count);
foreach (var (actorId, actorsDottedItems) in _items)
{
if (actorsDottedItems.TryRemove(item, out var removedDot))
{
dtos.Add(ObservedRemoveDtos<TActorId,TItem>.DeltaDto.Removed(actorId, removedDot));
}
}
if (dtos.Count > 0)
{
NotifyRemoved(item);
}
return dtos.ToImmutable();
}
finally
{
_lock.ExitWriteLock();
}
}
#endregion
#region Crdt
public MergeResult Merge(ObservedRemoveDtos<TActorId, TItem>.Dto other)
{
_lock.EnterWriteLock();
try
{
foreach (var actorId in other.VersionContext.Keys)
{
var otherItems = other.Items[actorId];
// if we have not seen this actor, just take everything for this actor from other
if (!_versionContext.TryGetValue(actorId, out var myRange))
{
var newItems = new List<TItem>();
foreach (var item in otherItems.Keys)
{
if(!Contains(item)) newItems.Add(item);
}
_items[actorId] = new VersionedItemList<TItem>(otherItems);
foreach (var range in other.VersionContext[actorId])
{
_versionContext.Merge(actorId, range);
}
foreach (var item in newItems)
{
NotifyAdded(item);
}
continue;
}
var myItems = _items[actorId];
// check which items to add or which dots to update
foreach (var (item, otherDot) in otherItems)
{
// if 'other' has items which we have not seen (i.e. their dot is bigger then value from our version vector), take that item
if (!myRange.Contains(otherDot))
{
var itemInSet = Contains(item); // check if item anywhere in set
myItems.TryAdd(item, otherDot);
if(!itemInSet) NotifyAdded(item);
}
// if 'other' has items that we seen and our dot is lower, update the dot
else if (myItems.TryGetValue(item, out var myDot) && myDot < otherDot)
{
myItems.TryUpdate(item, otherDot, myDot);
}
// if 'other' has items with dot lower then our dot, then we don't need to update - ours is newer
// if 'other' has items with a dot lower then our version, but we don't have them - it means we already observed their deletion
}
var otherRangeList = other.VersionContext[actorId];
// check which items to remove
using var otherRanges = new ConcurrentVersionRanges(otherRangeList);
foreach (var batch in myItems)
{
var span = batch.Span;
for (var i = 0; i < batch.Length; ++i)
{
var item = span[i].Item;
if (otherRanges.Contains(span[i].Dot) && !otherItems.ContainsKey(item))
{
myItems.TryRemove(item, out _);
if(!Contains(item)) NotifyRemoved(item);
}
}
}
foreach (var range in other.VersionContext[actorId])
{
_versionContext.Merge(actorId, range);
}
}
}
finally
{
_lock.ExitWriteLock();
}
return MergeResult.ConflictSolved;
}
public ObservedRemoveDtos<TActorId, TItem>.Dto ToDto()
{
_lock.EnterReadLock();
try
{
var versionVector = _versionContext.ToDictionary(pair => pair.Value.ToImmutable());
var items = _items.ToImmutableDictionary(
pair => pair.Key,
pair => pair.Value
.SelectMany(batch => batch.ToArray())
.ToImmutableDictionary(p => p.Item, p => p.Dot));
return new ObservedRemoveDtos<TActorId, TItem>.Dto(versionVector, items);
}
finally
{
_lock.ExitReadLock();
}
}
#endregion
#region DeltaCrdt
public ObservedRemoveDtos<TActorId,TItem>.CausalTimestamp GetLastKnownTimestamp()
{
return new ObservedRemoveDtos<TActorId,TItem>.CausalTimestamp(_versionContext.ToDictionary(pair =>
{
var array = pair.Value.ToArray();
return Unsafe.As<Range[], ImmutableArray<Range>>(ref array);
}));
}
public IEnumerable<ObservedRemoveDtos<TActorId,TItem>.DeltaDto> EnumerateDeltaDtos(ObservedRemoveDtos<TActorId,TItem>.CausalTimestamp? timestamp = default)
{
var since = timestamp?.Since ?? ImmutableDictionary<TActorId, ImmutableArray<Range>>.Empty;
foreach (var actorId in _versionContext.Actors)
{
if (!_versionContext.TryGetValue(actorId, out var myDotRanges)
|| !_items.TryGetValue(actorId, out var myItems)) continue;
if (!since.TryGetValue(actorId, out var except)) except = ImmutableArray<Range>.Empty;
// every new item, that was added since provided version
foreach (var batch in myItems.GetItemsOutsideRanges(except))
{
var innerArray = batch.Array!; // faster indexing
for (var i = batch.Offset; i < batch.Count; ++i)
{
var (item, version) = innerArray[i];
yield return ObservedRemoveDtos<TActorId,TItem>.DeltaDto.Added(item, actorId, version);
}
}
// every item, that was removed (we don't know the items, but we can restore dots)
var emptyRanges = GetEmptyRanges(myItems, myDotRanges);
for (var i = 0; i < emptyRanges.Length; ++i)
{
yield return ObservedRemoveDtos<TActorId,TItem>.DeltaDto.Removed(actorId, emptyRanges[i]);
}
}
}
public DeltaMergeResult Merge(ObservedRemoveDtos<TActorId,TItem>.DeltaDto delta)
{
_lock.EnterWriteLock();
try
{
return MergeInternal(delta);
}
finally
{
_lock.ExitWriteLock();
}
}
public DeltaMergeResult Merge(ImmutableArray<ObservedRemoveDtos<TActorId,TItem>.DeltaDto> deltas)
{
_lock.EnterWriteLock();
try
{
var stateUpdated = false;
for (var i = 0; i < deltas.Length; ++i)
{
stateUpdated |= DeltaMergeResult.StateUpdated == MergeInternal(deltas[i]);
}
return stateUpdated ? DeltaMergeResult.StateUpdated : DeltaMergeResult.StateNotChanged;
}
finally
{
_lock.ExitWriteLock();
}
}
private DeltaMergeResult MergeInternal(ObservedRemoveDtos<TActorId,TItem>.DeltaDto delta)
{
var actorId = delta.Actor;
var actorsDottedItems = _items.GetOrAdd(actorId, _ => VersionedItemList<TItem>.New());
var observedRanges = _versionContext.GetOrAdd(actorId);
switch (delta)
{
case ObservedRemoveDtos<TActorId,TItem>.DeltaDtoAddition(var item, _, var version):
// first check if version is new. If already observed by context, ignore
if (observedRanges.Contains(version)) return DeltaMergeResult.StateNotChanged;
// item may be also "observed" by another actor - this is important for notifications
var itemAlreadyPresent = Contains(item);
actorsDottedItems.TryAdd(item, version);
_versionContext.Merge(actorId, version);
if(itemAlreadyPresent) NotifyAdded(item);
break;
case ObservedRemoveDtos<TActorId,TItem>.DeltaDtoDeletedDot(_, var version):
// state is updated if we removed something, but also if removal failed because we never observed the addition
// (in which case VersionContext is updated and Merge returns true)
var versionContextUpdated = observedRanges.Merge(version);
var itemRemoved = actorsDottedItems.TryRemove(version, out var removedItem);
// if item is removed from actorsDottedItems, check if it's present in another actor
// if not, then notify observers
if (itemRemoved && !Contains(removedItem!)) NotifyRemoved(removedItem!);
var stateUpdated = versionContextUpdated || itemRemoved;
return stateUpdated ? DeltaMergeResult.StateUpdated : DeltaMergeResult.StateNotChanged;
case ObservedRemoveDtos<TActorId,TItem>.DeltaDtoDeletedRange(_, var range):
stateUpdated = actorsDottedItems.TryRemove(range, out var removedItems) | observedRanges.Merge(range);
// this piece is probably the best argument in favor of item-centered design of the set
// (in contrast to actor centered used here)
if (removedItems is not null)
{
foreach (var item in removedItems)
{
if(!Contains(item)) NotifyRemoved(item);
}
}
return stateUpdated ? DeltaMergeResult.StateUpdated : DeltaMergeResult.StateNotChanged;
default:
throw new ArgumentOutOfRangeException(nameof(delta));
}
return DeltaMergeResult.StateUpdated;
}
private ImmutableArray<Range> GetEmptyRanges(VersionedItemList<TItem> versionedItemList, ConcurrentVersionRanges ranges)
{
_lock.EnterReadLock(); // needs to be locked, as reads from both VersionContext and items
try
{
return versionedItemList.GetEmptyRanges(ranges.ToImmutable());
}
finally
{
_lock.ExitReadLock();
}
}
#endregion
}
}