-
Notifications
You must be signed in to change notification settings - Fork 224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(query): Quantile query implementation using DDSketch #817
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,13 @@ | ||
package filodb.memory.format.vectors | ||
|
||
import java.nio.ByteOrder.LITTLE_ENDIAN | ||
import java.util | ||
import java.util.NoSuchElementException | ||
|
||
import org.agrona.{DirectBuffer, MutableDirectBuffer} | ||
import com.datadoghq.sketch.ddsketch.mapping.CubicallyInterpolatedMapping | ||
import com.datadoghq.sketch.ddsketch.store.Bin | ||
import scalaxy.loops._ | ||
|
||
import filodb.memory.format._ | ||
|
||
/** | ||
|
@@ -324,6 +327,91 @@ final case class MaxHistogram(innerHist: MutableHistogram, max: Double) extends | |
} | ||
} | ||
|
||
final case class DDSHistogram(relativeAccuracy: Double) extends HistogramWithBuckets { | ||
def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = ??? | ||
|
||
val logGamma: Double = Math.log((1 + relativeAccuracy) / (1 - relativeAccuracy)) | ||
val maxIndexedValue = maxIndexableValue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to be a self-reference. I see later there is a method named the same, but for clarify you should rename the method, or since this is just initialized once, you could probably move the init code to up here. |
||
val minIndexedValue = minIndexableValue | ||
var zeroCount = 0 | ||
var indexMapping = new CubicallyInterpolatedMapping(relativeAccuracy) | ||
var store = new UnboundedSizeDenseStore | ||
final def bucketValue(no: Int): Double = 0 | ||
def buckets: HistogramBuckets = ??? | ||
|
||
def getCount: Long = zeroCount + store.getTotalCount | ||
|
||
override def quantile(q: Double): Double = { | ||
getValueAtQuantile(q, getCount) | ||
} | ||
|
||
def getValueAtQuantile(quantile: Double, count: Long): Double = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose you had to implement this due to some reason you could not use the underlying DDSketch's own code? Please add comment |
||
if (quantile < 0 || quantile > 1) throw new IllegalArgumentException("The quantile must be between 0 and 1.") | ||
if (count == 0) throw new NoSuchElementException | ||
val rank: Long = (quantile * (count - 1)).toLong | ||
if (rank < zeroCount) return 0 | ||
var bin: Bin = null | ||
if (quantile <= 0.5) { | ||
val binIterator: util.Iterator[Bin] = store.getAscendingIterator | ||
var n: Long = zeroCount | ||
do { | ||
bin = binIterator.next | ||
n += bin.getCount | ||
} while ( { | ||
n <= rank && binIterator.hasNext | ||
}) | ||
} | ||
else { | ||
val binIterator: util.Iterator[Bin] = store.getDescendingIterator | ||
var n: Long = count | ||
do { | ||
bin = binIterator.next | ||
n -= bin.getCount | ||
} while ( { | ||
n > rank && binIterator.hasNext | ||
}) | ||
} | ||
indexMapping.value(bin.getIndex) | ||
} | ||
|
||
def compare(other: DDSHistogram): Int = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to only compare logGamma and not the buckets? Not sure of the purpose, and why we return an Int (as opposed to Boolean) |
||
if (logGamma == other.logGamma) return 1 | ||
return 0 | ||
} | ||
|
||
def accept(value: Double): Unit = { | ||
checkValueTrackable(value) | ||
if (value < minIndexedValue) zeroCount += 1 | ||
else store.add(indexMapping.index(value)) | ||
} | ||
|
||
def mergeWith(other: DDSHistogram) : Unit = { | ||
this.store.mergeWith(other.store) | ||
} | ||
|
||
private def checkValueTrackable(value: Double): Unit = { | ||
if (value < 0 || value > maxIndexedValue) throw new IllegalArgumentException("The input value is outside the range that is tracked by the sketch.") | ||
} | ||
|
||
def minIndexableValue: Double = { | ||
Math.max(Math.exp((Integer.MIN_VALUE + 1) * logGamma), // so that index >= Integer.MIN_VALUE | ||
Double.MinValue * Math.exp(logGamma)) | ||
} // so that Math.exp(index * logGamma) >= Double.MIN_NORMAL) | ||
|
||
def maxIndexableValue: Double = Math.min(Math.exp(Integer.MAX_VALUE * logGamma), // so that index <= Integer.MAX_VALUE | ||
Double.MaxValue / (1 + relativeAccuracy)) // so that value >= Double.MAX_VALUE) | ||
|
||
def getMinValue: Double = if (zeroCount > 0) 0 | ||
else indexMapping.value(store.getMinIndex) | ||
|
||
def getMaxValue: Double = if (zeroCount > 0 && store.isEmpty) 0 | ||
else indexMapping.value(store.getMaxIndex) | ||
} | ||
|
||
object DDSHistogram { | ||
def empty(relativeAccuracy:Double): DDSHistogram = DDSHistogram(relativeAccuracy) | ||
} | ||
|
||
/** | ||
* A scheme for buckets in a histogram. Since these are Prometheus-style histograms, | ||
* each bucket definition consists of occurrences of numbers which are less than or equal to the bucketTop | ||
|
@@ -424,6 +512,9 @@ object HistogramBuckets { | |
val binaryBuckets64 = GeometricBuckets(2.0d, 2.0d, 64, minusOne = true) | ||
|
||
val emptyBuckets = GeometricBuckets(2.0d, 2.0d, 0) | ||
|
||
val emptyDDSBuckets = DDSBuckets(0.5, 0) | ||
|
||
} | ||
|
||
/** | ||
|
@@ -476,3 +567,13 @@ final case class CustomBuckets(les: Array[Double]) extends HistogramBuckets { | |
|
||
override def hashCode: Int = les.hashCode | ||
} | ||
|
||
final case class DDSBuckets(gamma: Double, | ||
numBuckets: Int) extends HistogramBuckets { | ||
final def bucketTop(no: Int): Double = 1 * Math.pow(gamma, no) | ||
final def getGamma(): Double = gamma | ||
|
||
final def serialize(buf: MutableDirectBuffer, pos: Int): Int = ??? | ||
|
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add ScalaDoc here with description.... what are the units or bounds of relativeAccuracy? what are typical values?