-
Notifications
You must be signed in to change notification settings - Fork 224
/
HistogramVector.scala
593 lines (507 loc) · 26.5 KB
/
HistogramVector.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
package filodb.memory.format.vectors
import java.nio.ByteBuffer
import com.typesafe.scalalogging.StrictLogging
import debox.Buffer
import org.agrona.{DirectBuffer, ExpandableArrayBuffer, MutableDirectBuffer}
import org.agrona.concurrent.UnsafeBuffer
import scalaxy.loops._
import filodb.memory.{BinaryRegion, MemFactory}
import filodb.memory.format._
import filodb.memory.format.BinaryVector.BinaryVectorPtr
import filodb.memory.format.MemoryReader._
/**
* BinaryHistogram is the binary format for a histogram binary blob included in BinaryRecords and sent over the wire.
* It fits the BinaryRegionMedium protocol.
* Format:
* +0000 u16 2-byte total length of this BinaryHistogram (excluding this length)
* +0002 u8 1-byte combined histogram buckets and values format code
* 0x00 Empty/null histogram
* 0x03 geometric + NibblePacked delta Long values
* 0x04 geometric_1 + NibblePacked delta Long values (see [[HistogramBuckets]])
* 0x05 custom LE/bucket values + NibblePacked delta Long values
*
* +0003 u16 2-byte length of Histogram bucket definition
* +0005 [u8] Histogram bucket definition, see [[HistogramBuckets]]
* First two bytes of definition is always the number of buckets, a u16
* +(5+n) remaining values according to format above
*
* NOTE: most of the methods below actually expect a pointer to the +2 hist bucket definition, not the length field
*/
object BinaryHistogram extends StrictLogging {
  /**
   * Wraps a buffer containing one serialized BinaryHistogram, INCLUDING the initial 2-byte
   * length prefix.  Value class - no allocations; every accessor decodes directly from the buffer.
   */
  case class BinHistogram(buf: DirectBuffer) extends AnyVal {
    // FIX: length/count fields are u16 on the wire, so they must be read unsigned.
    // A bare getShort(...).toInt is sign-extended and yields a negative number for
    // values in 32768..65535, which the writers below explicitly allow
    // (require(finalPos <= 65535)).  Mask with 0xFFFF to recover the unsigned value.
    def totalLength: Int = (buf.getShort(0) & 0xFFFF) + 2
    def numBuckets: Int = buf.getShort(5) & 0xFFFF
    def formatCode: Byte = buf.getByte(2)
    def bucketDefNumBytes: Int = buf.getShort(3) & 0xFFFF
    def bucketDefOffset: Long = buf.addressOffset + 5
    def valuesIndex: Int = 2 + 3 + bucketDefNumBytes // pointer to values bytes
    def valuesNumBytes: Int = totalLength - valuesIndex
    // Returns a thread-local DirectBuffer wrapped around just the packed values bytes.
    // NOTE: the returned buffer is shared per-thread; do not retain it across calls.
    def valuesByteSlice: DirectBuffer = {
      UnsafeUtils.wrapDirectBuf(buf.byteArray, buf.addressOffset + valuesIndex, valuesNumBytes, valuesBuf)
      valuesBuf
    }
    override def toString: String = s"<BinHistogram: ${toHistogram}>"

    def debugStr: String = s"totalLen=$totalLength numBuckets=$numBuckets formatCode=$formatCode " +
                           s"bucketDef=$bucketDefNumBytes bytes valuesIndex=$valuesIndex values=$valuesNumBytes bytes"

    /**
     * Converts this BinHistogram to a Histogram object.  May not be the most efficient.
     * Intended for slower paths such as high level (lower # samples) aggregation and HTTP/CLI materialization
     * by clients.  Materializes/deserializes everything.
     * Ingestion ingests BinHistograms directly without conversion to Histogram first.
     */
    def toHistogram: Histogram = formatCode match {
      case HistFormat_Geometric_Delta  =>
        val bucketDef = HistogramBuckets.geometric(buf.byteArray, bucketDefOffset, false)
        LongHistogram.fromPacked(bucketDef, valuesByteSlice).getOrElse(Histogram.empty)
      case HistFormat_Geometric1_Delta =>
        val bucketDef = HistogramBuckets.geometric(buf.byteArray, bucketDefOffset, true)
        LongHistogram.fromPacked(bucketDef, valuesByteSlice).getOrElse(Histogram.empty)
      case HistFormat_Custom_Delta     =>
        // -2 backs up to include the u16 bucket count preceding the custom bucket definition
        val bucketDef = HistogramBuckets.custom(buf.byteArray, bucketDefOffset - 2)
        LongHistogram.fromPacked(bucketDef, valuesByteSlice).getOrElse(Histogram.empty)
      case x =>
        logger.debug(s"Unrecognizable histogram format code $x, returning empty histogram")
        Histogram.empty
    }
  }

  // Thread local buffer used as read-only byte slice
  private val tlValuesBuf = new ThreadLocal[DirectBuffer]()
  def valuesBuf: DirectBuffer = tlValuesBuf.get match {
    case UnsafeUtils.ZeroPointer => val buf = new UnsafeBuffer(Array.empty[Byte])
                                    tlValuesBuf.set(buf)
                                    buf
    case b: DirectBuffer         => b
  }

  // Thread local buffer used as temp buffer for writing binary histograms
  private val tlHistBuf = new ThreadLocal[MutableDirectBuffer]()
  def histBuf: MutableDirectBuffer = tlHistBuf.get match {
    case UnsafeUtils.ZeroPointer => val buf = new ExpandableArrayBuffer(4096)
                                    tlHistBuf.set(buf)
                                    buf
    case b: MutableDirectBuffer  => b
  }

  // Sentinel sinks; replaced lazily by appenders once the actual bucket count is known
  val empty2DSink = NibblePack.DeltaDiffPackSink(Array[Long](), histBuf)
  val emptySectSink = UnsafeUtils.ZeroPointer.asInstanceOf[NibblePack.DeltaSectDiffPackSink]

  // Format codes for the u8 at offset +0002 -- see the BinaryHistogram format comment above
  val HistFormat_Null = 0x00.toByte
  val HistFormat_Geometric_Delta = 0x03.toByte
  val HistFormat_Geometric1_Delta = 0x04.toByte
  val HistFormat_Custom_Delta = 0x05.toByte

  def isValidFormatCode(code: Byte): Boolean =
    (code == HistFormat_Null) || (code == HistFormat_Geometric1_Delta) || (code == HistFormat_Geometric_Delta) ||
    (code == HistFormat_Custom_Delta)

  /**
   * Writes binary histogram with geometric bucket definition and data which is non-increasing, but will be
   * decoded as increasing.  Intended only for specific use cases when the source histogram are non increasing
   * buckets, ie each bucket has a count that is independent.
   * @param buf the buffer to write the histogram to.  Highly recommended this be an ExpandableArrayBuffer or equiv.
   *            so it can grow.
   * @return the number of bytes written, including the length prefix
   */
  def writeNonIncreasing(buckets: GeometricBuckets, values: Array[Long], buf: MutableDirectBuffer): Int = {
    require(buckets.numBuckets == values.size, s"Values array size of ${values.size} != ${buckets.numBuckets}")
    val formatCode = if (buckets.minusOne) HistFormat_Geometric1_Delta else HistFormat_Geometric_Delta
    buf.putByte(2, formatCode)
    val valuesIndex = buckets.serialize(buf, 3)
    val finalPos = NibblePack.packNonIncreasing(values, buf, valuesIndex)

    require(finalPos <= 65535, s"Histogram data is too large: $finalPos bytes needed")
    // Length prefix excludes itself; toShort wraps but readers mask with 0xFFFF
    buf.putShort(0, (finalPos - 2).toShort)
    finalPos
  }

  def writeDelta(buckets: HistogramBuckets, values: Array[Long]): Int =
    writeDelta(buckets, values, histBuf)

  /**
   * Encodes binary histogram with geometric bucket definition and data which is strictly increasing and positive.
   * All histograms after ingestion are expected to be increasing.
   * Delta encoding is applied for compression.
   * @param buf the buffer to write the histogram to.  Highly recommended this be an ExpandableArrayBuffer or equiv.
   *            so it can grow.
   * @return the number of bytes written, including the length prefix
   */
  def writeDelta(buckets: HistogramBuckets, values: Array[Long], buf: MutableDirectBuffer): Int = {
    require(buckets.numBuckets == values.size, s"Values array size of ${values.size} != ${buckets.numBuckets}")
    val formatCode = buckets match {
      case g: GeometricBuckets if g.minusOne => HistFormat_Geometric1_Delta
      case g: GeometricBuckets               => HistFormat_Geometric_Delta
      case c: CustomBuckets if c.numBuckets == 0 => HistFormat_Null
      case c: CustomBuckets                  => HistFormat_Custom_Delta
    }
    buf.putByte(2, formatCode)

    // Null (empty) histograms have no bucket def or values; total payload is just 3 header bytes
    val finalPos = if (formatCode == HistFormat_Null) { 3 }
    else {
      val valuesIndex = buckets.serialize(buf, 3)
      NibblePack.packDelta(values, buf, valuesIndex)
    }
    require(finalPos <= 65535, s"Histogram data is too large: $finalPos bytes needed")
    buf.putShort(0, (finalPos - 2).toShort)
    finalPos
  }
}
object HistogramVector {
  type HistIterator = Iterator[Histogram] with TypedIterator

  // Header offsets within a histogram vector.  (Bytes 0-3 are the i32 length prefix,
  // bytes 4-5 the vector major/sub type written by BinaryVector.writeMajorAndSubType.)
  val OffsetNumHistograms = 6
  val OffsetFormatCode = 8      // u8: BinHistogram format code/bucket type
  val OffsetBucketDefSize = 9   // # of bytes of bucket definition
  val OffsetBucketDef = 11      // Start of bucket definition
  val OffsetNumBuckets = 11     // first u16 of the bucket def is the bucket count
  // After the bucket area are regions for storing the counter values or pointers to them

  final def getNumBuckets(acc: MemoryReader, addr: Ptr.U8): Int = addr.add(OffsetNumBuckets).asU16.getU16(acc)

  final def getNumHistograms(acc: MemoryReader, addr: Ptr.U8): Int = addr.add(OffsetNumHistograms).asU16.getU16(acc)
  final def resetNumHistograms(acc: MemoryAccessor, addr: Ptr.U8): Unit =
    addr.add(OffsetNumHistograms).asU16.asMut.set(acc, 0)
  final def incrNumHistograms(acc: MemoryAccessor, addr: Ptr.U8): Unit =
    addr.add(OffsetNumHistograms).asU16.asMut.set(acc, getNumHistograms(acc, addr) + 1)

  // Note: the format code defines bucket definition format + format of each individual compressed histogram
  final def formatCode(acc: MemoryReader, addr: Ptr.U8): Byte = addr.add(OffsetFormatCode).getU8(acc).toByte

  final def afterBucketDefAddr(acc: MemoryReader, addr: Ptr.U8): Ptr.U8 =
    addr + OffsetBucketDef + bucketDefNumBytes(acc, addr)
  final def bucketDefNumBytes(acc: MemoryReader, addr: Ptr.U8): Int =
    addr.add(OffsetBucketDefSize).asU16.getU16(acc)
  final def bucketDefAddr(addr: Ptr.U8): Ptr.U8 = addr + OffsetBucketDef

  // Matches the bucket definition whose # bytes is at (base, offset): format code, length,
  // and raw bucket-definition bytes must all be identical.
  final def matchBucketDef(hist: BinaryHistogram.BinHistogram, acc: MemoryReader, addr: Ptr.U8): Boolean =
    (hist.formatCode == formatCode(acc, addr)) &&
    (hist.bucketDefNumBytes == bucketDefNumBytes(acc, addr)) && {
      UnsafeUtils.equate(acc.base, acc.baseOffset + bucketDefAddr(addr).addr,
                         hist.buf.byteArray, hist.bucketDefOffset, hist.bucketDefNumBytes)
    }

  def appending(factory: MemFactory, maxBytes: Int): AppendableHistogramVector = {
    val addr = factory.allocateOffheap(maxBytes)
    new AppendableHistogramVector(factory, Ptr.U8(addr), maxBytes)
  }

  def appending2D(factory: MemFactory, maxBytes: Int): AppendableHistogramVector = {
    val addr = factory.allocateOffheap(maxBytes)
    new Appendable2DDeltaHistVector(factory, Ptr.U8(addr), maxBytes)
  }

  def appendingSect(factory: MemFactory, maxBytes: Int): AppendableHistogramVector = {
    val addr = factory.allocateOffheap(maxBytes)
    new AppendableSectDeltaHistVector(factory, Ptr.U8(addr), maxBytes)
  }

  def apply(buffer: ByteBuffer): HistogramReader = apply(MemoryReader.fromByteBuffer(buffer), 0)

  import WireFormat._

  /**
   * Returns the appropriate HistogramReader for the type of the pointed-to vector.
   * FIX: the match was non-exhaustive, so an unrecognized vector type (e.g. SUBTYPE_H_2DDELTA,
   * which has no reader implementation here) surfaced as an opaque scala.MatchError.
   * Now throws an IllegalArgumentException with a descriptive message instead.
   */
  def apply(acc: MemoryReader, p: BinaryVectorPtr): HistogramReader = BinaryVector.vectorType(acc, p) match {
    case x if x == WireFormat(VECTORTYPE_HISTOGRAM, SUBTYPE_H_SIMPLE)    => new RowHistogramReader(acc, Ptr.U8(p))
    case x if x == WireFormat(VECTORTYPE_HISTOGRAM, SUBTYPE_H_SECTDELTA) => new SectDeltaHistogramReader(acc, Ptr.U8(p))
    case x =>
      throw new IllegalArgumentException(s"No HistogramReader available for vector type $x")
  }

  // Thread local buffer used as temp buffer for histogram vector encoding ONLY
  private val tlEncodingBuf = new ThreadLocal[MutableDirectBuffer]()
  private[memory] def encodingBuf: MutableDirectBuffer = tlEncodingBuf.get match {
    case UnsafeUtils.ZeroPointer => val buf = new ExpandableArrayBuffer(4096)
                                    tlEncodingBuf.set(buf)
                                    buf
    case b: MutableDirectBuffer  => b
  }
}
/**
 * A HistogramVector appender storing compressed histogram values for less storage space.
 * This is a Section-based vector - sections of up to 64 histograms are stored at a time.
 * It stores histograms up to a maximum allowed size (since histograms are variable length)
 * Note that the bucket schema is not set until getting the first item.
 * This one stores the compressed histograms as-is, with no other transformation.
 *
 * Read/Write/Lock semantics: everything is gated by the number of elements.
 * When it is 0, nothing is initialized so the reader guards against that.
 * When it is > 0, then all structures are initialized.
 */
class AppendableHistogramVector(factory: MemFactory,
                                vectPtr: Ptr.U8,
                                val maxBytes: Int) extends BinaryAppendableVector[DirectBuffer] with SectionWriter {
  import HistogramVector._
  import BinaryHistogram._

  protected def vectSubType: Int = WireFormat.SUBTYPE_H_SIMPLE

  // Initialize header: write major/sub type then zero out counts via reset()
  BinaryVector.writeMajorAndSubType(MemoryAccessor.nativePtrAccessor,
                                    addr, WireFormat.VECTORTYPE_HISTOGRAM, vectSubType)
  reset()

  final def addr: BinaryVectorPtr = vectPtr.addr

  // Cap of 64 histograms per section; subclasses may lower this for better compression
  def maxElementsPerSection: IntU8 = IntU8(64)

  val dispose = () => {
    // free our own memory
    factory.freeMemory(addr)
  }

  // Total bytes used so far, including the 4-byte i32 length prefix itself
  final def numBytes: Int = vectPtr.asI32.getI32(nativePtrReader) + 4
  final def length: Int = getNumHistograms(nativePtrReader, vectPtr)
  final def isAvailable(index: Int): Boolean = true
  final def isAllNA: Boolean = (length == 0)
  final def noNAs: Boolean = (length > 0)

  // Stores the length EXCLUDING the 4-byte prefix (mirrors numBytes above)
  private def setNumBytes(len: Int): Unit = {
    require(len >= 0)
    vectPtr.asI32.asMut.set(MemoryAccessor.nativePtrAccessor, len)
  }

  // NOTE: to eliminate allocations, re-use the DirectBuffer and keep passing the same instance to addData
  /**
   * Appends one serialized BinHistogram.  On the very first add, copies the bucket
   * definition into the vector header and initializes the first section; on later adds,
   * requires the incoming bucket schema to match the stored one.
   * Returns Ack on success, or InvalidHistogram / BucketSchemaMismatch / an appendBlob failure.
   */
  final def addData(buf: DirectBuffer): AddResponse = {
    val h = BinHistogram(buf)
    // Validate it's a valid bin histogram (long enough header, known non-null format code)
    if (buf.capacity < 5 || !isValidFormatCode(h.formatCode) ||
        h.formatCode == HistFormat_Null) {
      return InvalidHistogram
    }
    // Bucket def longer than the whole blob would mean a corrupt length field
    if (h.bucketDefNumBytes > h.totalLength) return InvalidHistogram

    val numItems = getNumHistograms(nativePtrReader, vectPtr)
    if (numItems == 0) {
      // Copy the bucket definition and set the bucket def size
      UnsafeUtils.unsafe.copyMemory(buf.byteArray, h.bucketDefOffset,
                                    UnsafeUtils.ZeroPointer, bucketDefAddr(vectPtr).addr,
                                    h.bucketDefNumBytes)
      UnsafeUtils.setShort(addr + OffsetBucketDefSize, h.bucketDefNumBytes.toShort)
      UnsafeUtils.setByte(addr + OffsetFormatCode, h.formatCode)

      // Initialize the first section: it gets all the space remaining after the bucket def
      val firstSectPtr = afterBucketDefAddr(nativePtrReader, vectPtr)
      initSectionWriter(firstSectPtr, ((vectPtr + maxBytes).addr - firstSectPtr.addr).toInt)
    } else {
      // check the bucket schema is identical.  If not, return BucketSchemaMismatch
      if (!matchBucketDef(h, nativePtrReader, vectPtr)) return BucketSchemaMismatch
    }

    val res = appendHist(buf, h, numItems)
    if (res == Ack) {
      // set new number of bytes first.  Remember to exclude initial 4 byte length prefix
      setNumBytes(maxBytes - bytesLeft - 4)
      // Finally, increase # histograms which is the ultimate safe gate for access by readers
      incrNumHistograms(MemoryAccessor.nativePtrAccessor, vectPtr)
    }
    res
  }

  // Inner method to add the histogram to this vector; subclasses override to re-encode
  protected def appendHist(buf: DirectBuffer, h: BinHistogram, numItems: Int): AddResponse = {
    appendBlob(buf.byteArray, buf.addressOffset + h.valuesIndex, h.valuesNumBytes)
  }

  final def addNA(): AddResponse = Ack   // TODO: Add a 0 to every appender

  def addFromReaderNoNA(reader: RowReader, col: Int): AddResponse = addData(reader.blobAsBuffer(col))
  def copyToBuffer: Buffer[DirectBuffer] = ???
  def apply(index: Int): DirectBuffer = ???

  def finishCompaction(newAddress: BinaryRegion.NativePointer): BinaryVectorPtr = newAddress

  // NOTE: do not access reader below unless this vect is nonempty.  TODO: fix this, or don't if we don't use this class
  lazy val reader: VectorDataReader = new RowHistogramReader(nativePtrReader, vectPtr)

  def reset(): Unit = {
    resetNumHistograms(MemoryAccessor.nativePtrAccessor, vectPtr)
    setNumBytes(OffsetNumBuckets + 2)
  }

  // We don't optimize -- for now.  Histograms are already stored compressed.
  // In future, play with other optimization strategies, such as delta encoding.
}
/**
 * An appender for Prom-style histograms that increase over time.
 * It stores deltas between successive histograms to save space, but the histograms are assumed to be always
 * increasing.  If they do not increase, then that is considered a "reset" and recorded as such for
 * counter correction during queries.
 * Great for compression but recovering original value means summing up all the diffs :(
 */
class Appendable2DDeltaHistVector(factory: MemFactory,
                                  vectPtr: Ptr.U8,
                                  maxBytes: Int) extends AppendableHistogramVector(factory, vectPtr, maxBytes) {
  import BinaryHistogram._
  import HistogramVector._

  override def vectSubType: Int = WireFormat.SUBTYPE_H_2DDELTA

  // Starts as the shared empty sentinel; replaced with a properly-sized sink on first append
  private var repackSink = BinaryHistogram.empty2DSink

  // TODO: handle corrections correctly. :D
  override def appendHist(buf: DirectBuffer, h: BinHistogram, numItems: Int): AddResponse = {
    // Must initialize sink correctly at beg once the actual # buckets are known
    // Also, we need to write repacked diff histogram to a temporary buffer, as appendBlob needs to know the size
    // before writing.
    if (repackSink == BinaryHistogram.empty2DSink)
      repackSink = NibblePack.DeltaDiffPackSink(new Array[Long](h.numBuckets), encodingBuf)

    // Recompress hist based on delta from last hist, write to temp storage.  Note that no matter what
    // we HAVE to feed each incoming hist through the sink, to properly seed the last hist values.
    repackSink.writePos = 0
    NibblePack.unpackToSink(h.valuesByteSlice, repackSink, h.numBuckets)

    // See if we are at beginning of section.  If so, write the original histogram.  If not, repack and write diff
    if (repackSink.valueDropped) {
      // A bucket decreased (counter reset): start a new drop-type section with the raw histogram
      repackSink.reset()
      newSectionWithBlob(buf.byteArray, buf.addressOffset + h.valuesIndex, h.valuesNumBytes, Section.TypeDrop)
    } else if (numItems == 0 || needNewSection(h.valuesNumBytes)) {
      // First element or section full: store the raw (non-diffed) histogram
      repackSink.reset()
      appendBlob(buf.byteArray, buf.addressOffset + h.valuesIndex, h.valuesNumBytes)
    } else {
      // Normal case: store the repacked diff from encodingBuf
      val repackedLen = repackSink.writePos
      repackSink.reset()
      appendBlob(encodingBuf.byteArray, encodingBuf.addressOffset, repackedLen)
    }
  }
}
/**
 * Appender for Prom-style increasing counter histograms of fixed buckets.
 * Unlike 2DDelta, it stores deltas from the first (original) histogram of a section, so that the original
 * histogram can easily be recovered just by one add.
 */
class AppendableSectDeltaHistVector(factory: MemFactory,
                                    vectPtr: Ptr.U8,
                                    maxBytes: Int) extends AppendableHistogramVector(factory, vectPtr, maxBytes) {
  import BinaryHistogram._
  import HistogramVector._

  override def vectSubType: Int = WireFormat.SUBTYPE_H_SECTDELTA

  // Starts as the shared sentinel; replaced with a properly-sized sink on first append
  private var repackSink = BinaryHistogram.emptySectSink

  // Default to smaller section sizes to maximize compression
  override final def maxElementsPerSection: IntU8 = IntU8(16)

  override def appendHist(buf: DirectBuffer, h: BinHistogram, numItems: Int): AddResponse = {
    // Initial histogram: set up new sink with first=true flag / just init with # of buckets
    if (repackSink == BinaryHistogram.emptySectSink)
      repackSink = new NibblePack.DeltaSectDiffPackSink(h.numBuckets, encodingBuf)

    // Recompress hist based on original delta.  Do this for ALL histograms so drop detection works correctly
    repackSink.writePos = 0
    NibblePack.unpackToSink(h.valuesByteSlice, repackSink, h.numBuckets)

    // If need new section, append blob.  Reset state for originalDeltas as needed.
    if (repackSink.valueDropped) {
      // Counter reset detected: the raw histogram becomes the base of a new drop section
      repackSink.reset()
      repackSink.setOriginal()
      newSectionWithBlob(buf.byteArray, buf.addressOffset + h.valuesIndex, h.valuesNumBytes, Section.TypeDrop)
    } else if (numItems == 0 || needNewSection(h.valuesNumBytes)) {
      // First element or section full: raw histogram becomes the new section's base
      repackSink.reset()
      repackSink.setOriginal()
      appendBlob(buf.byteArray, buf.addressOffset + h.valuesIndex, h.valuesNumBytes)
    } else {
      // Normal case: store the delta from the section's base histogram
      val repackedLen = repackSink.writePos
      repackSink.reset()
      appendBlob(encodingBuf.byteArray, encodingBuf.addressOffset, repackedLen)
    }
  }

  override lazy val reader: VectorDataReader = new SectDeltaHistogramReader(nativePtrReader, vectPtr)
}
/** Base interface for readers of histogram vectors. */
trait HistogramReader extends VectorDataReader {
  /** The accessor used to read the vector's underlying memory */
  def acc: MemoryReader
  /** The bucket definition shared by every histogram in this vector */
  def buckets: HistogramBuckets
  /** Materializes the histogram at row index */
  def apply(index: Int): HistogramWithBuckets
  /** Sums the histograms over inclusive row range [start, end] bucket-by-bucket */
  def sum(start: Int, end: Int): MutableHistogram
}
/**
 * A reader for row-based Histogram vectors.  Mostly contains logic to skip around the vector to find the right
 * record pointer.
 */
class RowHistogramReader(val acc: MemoryReader, histVect: Ptr.U8) extends HistogramReader with SectionReader {
  import HistogramVector._

  final def length: Int = getNumHistograms(acc, histVect)
  // Computed once at construction; guard against the empty (uninitialized) vector case
  val numBuckets = if (length > 0) getNumBuckets(acc, histVect) else 0
  // -2 backs up to the u16 bucket count that precedes the bucket definition bytes
  val buckets = HistogramBuckets(acc, bucketDefAddr(histVect).add(-2), formatCode(acc, histVect))
  // Single shared instance re-populated by every apply() call -- see warning below
  val returnHist = LongHistogram(buckets, new Array[Long](buckets.numBuckets))
  // One past the last used byte: vector base + stored length + 4-byte prefix
  val endAddr = histVect + histVect.asI32.getI32(acc) + 4

  def firstSectionAddr: Ptr.U8 = afterBucketDefAddr(acc, histVect)

  /**
   * Iterates through each histogram.  Note this is expensive due to materializing the Histogram object
   * every time.  Using higher level functions such as sum is going to be a much better bet usually.
   */
  def iterate(accNotUsed: MemoryReader, vectorNotUsed: BinaryVectorPtr, startElement: Int): TypedIterator =
    new Iterator[Histogram] with TypedIterator {
      var elem = startElement
      def hasNext: Boolean = elem < getNumHistograms(acc, histVect)
      def next: Histogram = {
        val h = apply(elem)
        elem += 1
        h
      }
    }

  def length(accNotUsed: MemoryReader, vectorNotUsed: BinaryVectorPtr): Int = length

  protected val dSink = NibblePack.DeltaSink(returnHist.values)

  // WARNING: histogram returned is shared between calls, do not reuse!
  def apply(index: Int): HistogramWithBuckets = {
    require(length > 0)
    val histPtr = locate(index)
    // Each stored histogram is a u16 length followed by NibblePacked delta values
    val histLen = histPtr.asU16.getU16(acc)
    val buf = BinaryHistogram.valuesBuf
    acc.wrapInto(buf, histPtr.add(2).addr, histLen)
    dSink.reset()
    NibblePack.unpackToSink(buf, dSink, numBuckets)
    returnHist
  }

  // sum_over_time returning a Histogram with sums for each bucket.  Start and end are inclusive row numbers
  // NOTE: for now this is just a dumb implementation that decompresses each histogram fully
  final def sum(start: Int, end: Int): MutableHistogram = {
    require(length > 0 && start >= 0 && end < length)
    val summedHist = MutableHistogram.empty(buckets)
    for { i <- start to end optimized } {
      summedHist.addNoCorrection(apply(i))
    }
    summedHist
  }
}
// Correction metadata for counter histograms: the last histogram value seen, plus the
// accumulated correction to add back on top of raw values after counter resets (drops).
final case class HistogramCorrection(lastValue: LongHistogram, correction: LongHistogram) extends CorrectionMeta
/** A HistogramReader that can also apply counter-reset (drop) corrections to read values. */
trait CounterHistogramReader extends HistogramReader with CounterVectorReader {
  /** Returns the value at row n with drop corrections (this vector's and carried-over meta's) applied */
  def correctedValue(n: Int, meta: CorrectionMeta): HistogramWithBuckets
}
/**
 * A reader for SectDelta encoded histograms, including correction/drop functionality
 */
class SectDeltaHistogramReader(acc2: MemoryReader, histVect: Ptr.U8)
extends RowHistogramReader(acc2, histVect) with CounterHistogramReader {
  // baseHist is section base histogram; summedHist used to compute base + delta or other sums
  private val summedHist = LongHistogram.empty(buckets)
  private val baseHist = summedHist.copy
  private val baseSink = NibblePack.DeltaSink(baseHist.values)

  // override setSection: also set the base histogram for getting real value
  override def setSection(sectAddr: Ptr.U8, newElemNo: Int = 0): Unit = {
    super.setSection(sectAddr, newElemNo)
    // unpack to baseHist: the first element of a section is stored raw (u16 length + packed values)
    baseSink.reset()
    val buf = BinaryHistogram.valuesBuf
    acc.wrapInto(buf, curHist.add(2).addr, curHist.asU16.getU16(acc))
    NibblePack.unpackToSink(buf, baseSink, numBuckets)
  }

  // WARNING: like the superclass, the returned histogram may be a shared mutable instance
  override def apply(index: Int): HistogramWithBuckets = {
    require(length > 0)
    val histPtr = locate(index)

    // Just return the base histogram if we are at start of section
    if (index == sectStartingElemNo) baseHist
    else {
      val histLen = histPtr.asU16.getU16(acc)
      val buf = BinaryHistogram.valuesBuf
      acc.wrapInto(buf, histPtr.add(2).addr, histLen)
      dSink.reset()
      NibblePack.unpackToSink(buf, dSink, numBuckets)
      // Stored value is a delta from the section base: reconstruct as base + delta
      summedHist.populateFrom(baseHist)
      summedHist.add(returnHist)
      summedHist
    }
  }

  // TODO: optimized summing.  It's wasteful to apply the base + delta math so many times ...
  // instead add delta + base * n if possible.  However, do we care about sum performance on increasing histograms?

  /**
   * Detects whether a drop occurred between the previous chunk (meta.lastValue) and this
   * vector's first value; if so, folds the previous last value into the running correction.
   */
  def detectDropAndCorrection(accNotUsed: MemoryReader, vectorNotUsed: BinaryVectorPtr,
                              meta: CorrectionMeta): CorrectionMeta = meta match {
    case NoCorrection =>  meta    // No last value, cannot compare.  Just pass it on.
    case h @ HistogramCorrection(lastValue, correction) =>
      val firstValue = apply(0)
      // Last value is the new delta correction.  Also assume the correction is already a cloned independent thing
      if (firstValue < lastValue) {
        correction.add(lastValue)
        h
      } else { meta }
  }

  // code to go through and build a list of corrections and corresponding index values.. (dropIndex, correction)
  private lazy val corrections = {
    var index = 0
    // Step 1: build an iterator of (starting-index, section) for each section,
    // then keep only non-initial drop-type sections, pairing each with a copy of the
    // histogram value just before the drop (the amount to correct by).
    iterateSections.map { case (s) => val o = (index, s); index += s.numElements(acc); o }.collect {
      case (i, s) if i > 0 && s.sectionType(acc) == Section.TypeDrop =>
        (i, apply(i - 1).asInstanceOf[LongHistogram].copy)
    }.toBuffer
  }

  /** Returns the row indices at which drops (counter resets) occurred within this vector */
  def dropPositions(accNotUsed: MemoryReader, vectorNotUsed: BinaryVectorPtr): debox.Buffer[Int] = {
    val res = debox.Buffer.empty[Int]
    corrections.foreach { case (dropPos, hist) =>
      res += dropPos
    }
    res
  }

  /**
   * Rolls this vector's drop corrections into the incoming meta, returning new correction
   * metadata whose lastValue is a copy of this vector's final histogram.
   */
  def updateCorrection(accNotUsed: MemoryReader, vectorNotUsed: BinaryVectorPtr,
                       meta: CorrectionMeta): CorrectionMeta = {
    val correction = meta match {
      case NoCorrection                 => LongHistogram.empty(buckets)
      case HistogramCorrection(_, corr) => corr
    }
    // Go through and add corrections
    corrections.foreach { case (_, corr) => correction.add(corr) }
    HistogramCorrection(apply(length - 1).asInstanceOf[LongHistogram].copy, correction)
  }

  def correctedValue(n: Int, meta: CorrectionMeta): HistogramWithBuckets = {
    // Get the raw histogram value -- COPY it as we need to modify it, and also
    // calling corrections below might modify the temporary value
    val h = apply(n).asInstanceOf[LongHistogram].copy
    // Apply any necessary corrections (only drops at or before row n)
    corrections.foreach { case (ci, corr) =>
      if (ci <= n) h.add(corr)
    }

    // Plus any carryover previous corrections
    meta match {
      case NoCorrection                 => h
      case HistogramCorrection(_, corr) => h.add(corr)
                                           h
    }
  }
}