Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): Quantile query implementation using DDSketch #817

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
103 changes: 102 additions & 1 deletion memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package filodb.memory.format.vectors

import java.nio.ByteOrder.LITTLE_ENDIAN
import java.util
import java.util.NoSuchElementException

import org.agrona.{DirectBuffer, MutableDirectBuffer}
import com.datadoghq.sketch.ddsketch.mapping.CubicallyInterpolatedMapping
import com.datadoghq.sketch.ddsketch.store.Bin
import scalaxy.loops._

import filodb.memory.format._

/**
Expand Down Expand Up @@ -324,6 +327,91 @@ final case class MaxHistogram(innerHist: MutableHistogram, max: Double) extends
}
}

final case class DDSHistogram(relativeAccuracy: Double) extends HistogramWithBuckets {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add ScalaDoc here with description.... what are the units or bounds of relativeAccuracy? what are typical values?

/** Serialization is not implemented yet: calling this throws `NotImplementedError` (`???`). */
def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = ???

// Natural log of gamma, where gamma = (1 + relativeAccuracy) / (1 - relativeAccuracy).
val logGamma: Double = {
  val gamma = (1 + relativeAccuracy) / (1 - relativeAccuracy)
  Math.log(gamma)
}
// Upper bound used by checkValueTrackable; computed once at construction.
val maxIndexedValue: Double = maxIndexableValue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a self-reference. I see later there is a method named the same, but for clarity you should rename the method, or, since this is just initialized once, you could probably move the init code up here.

// Threshold below which accept() counts a value in zeroCount instead of the store.
val minIndexedValue: Double = minIndexableValue
// Number of accepted values that fell below minIndexedValue.
var zeroCount: Int = 0
// Maps values to bin indexes and back.
var indexMapping: CubicallyInterpolatedMapping = new CubicallyInterpolatedMapping(relativeAccuracy)
// Per-index bin counts.
var store: UnboundedSizeDenseStore = new UnboundedSizeDenseStore()

/** Always returns 0 regardless of the bucket number. */
final def bucketValue(no: Int): Double = 0.0

/** Not implemented yet: calling this throws `NotImplementedError` (`???`). */
def buckets: HistogramBuckets = ???

/** Total number of accepted values: the store's total count plus `zeroCount`. */
def getCount: Long = {
  val zeros = zeroCount
  zeros + store.getTotalCount
}

/**
 * Value at quantile `q`, evaluated against the current total count.
 * Delegates to `getValueAtQuantile`, which validates `q` and the count.
 */
override def quantile(q: Double): Double = {
  val total = getCount
  getValueAtQuantile(q, total)
}

/**
 * Returns the value of the bin in which the requested quantile falls.
 *
 * The target rank is `(quantile * (count - 1)).toLong`. A rank below `zeroCount` yields 0.
 * Otherwise the bins of `store` are walked from the lowest index when `quantile <= 0.5`
 * and from the highest index when it is larger, and the value that `indexMapping`
 * assigns to the bin where the walk stops is returned.
 *
 * @param quantile the quantile to look up; must lie in [0, 1]
 * @param count the total number of values to rank against (callers in this class pass `getCount`)
 * @return 0 if the rank falls among the zero-counted values, else the mapped value of the selected bin
 * @throws IllegalArgumentException if `quantile` is outside [0, 1]
 * @throws NoSuchElementException if `count` is 0
 */
def getValueAtQuantile(quantile: Double, count: Long): Double = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose you had to implement this due to some reason you could not use the underlying DDSketch's own code? Please add comment

if (quantile < 0 || quantile > 1) throw new IllegalArgumentException("The quantile must be between 0 and 1.")
if (count == 0) throw new NoSuchElementException
// Zero-based rank of the requested quantile; the fractional part is truncated.
val rank: Long = (quantile * (count - 1)).toLong
// The rank lands among the values counted in zeroCount.
if (rank < zeroCount) return 0
var bin: Bin = null
if (quantile <= 0.5) {
// Lower half: walk up from the smallest index, adding bin counts on top of zeroCount
// until the running total exceeds rank or the bins run out.
val binIterator: util.Iterator[Bin] = store.getAscendingIterator
var n: Long = zeroCount
do {
bin = binIterator.next
n += bin.getCount
} while ( {
n <= rank && binIterator.hasNext
})
}
else {
// Upper half: walk down from the largest index, subtracting bin counts from count
// until the remainder is no longer above rank or the bins run out.
val binIterator: util.Iterator[Bin] = store.getDescendingIterator
var n: Long = count
do {
bin = binIterator.next
n -= bin.getCount
} while ( {
n > rank && binIterator.hasNext
})
}
// Translate the selected bin's index back into a value.
indexMapping.value(bin.getIndex)
}

/**
 * Reports whether this histogram and `other` have the same `logGamma`.
 *
 * NOTE(review): despite the name, this is not an ordering comparison. It returns 1 when the
 * two `logGamma` values are equal and 0 otherwise, and it does not look at the stored bins
 * or at `zeroCount`.
 *
 * @param other the histogram to compare against
 * @return 1 if `logGamma == other.logGamma`, otherwise 0
 */
def compare(other: DDSHistogram): Int = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to only compare logGamma and not the buckets? Not sure of the purpose, and why we return an Int (as opposed to Boolean)

if (logGamma == other.logGamma) return 1
return 0
}

/**
 * Records one observation.
 *
 * The value is range-checked first. Values below `minIndexedValue` are tallied in
 * `zeroCount`; all others are added to `store` under the index given by `indexMapping`.
 *
 * @param value the observation to record
 * @throws IllegalArgumentException if `checkValueTrackable` rejects the value
 */
def accept(value: Double): Unit = {
  checkValueTrackable(value)
  val belowIndexableRange = value < minIndexedValue
  if (belowIndexableRange) {
    zeroCount += 1
  } else {
    val binIndex = indexMapping.index(value)
    store.add(binIndex)
  }
}

/**
 * Merges the contents of `other` into this histogram.
 *
 * Both parts of the sketch state are combined: the per-index bin counts held in `store`
 * and the `zeroCount` tally of values that fell below the indexable range. Merging only
 * the store would drop the other sketch's zero-counted values and skew `getCount` and
 * every quantile afterwards.
 *
 * NOTE(review): no check is made that both histograms use the same `relativeAccuracy`;
 * bin indexes from differently configured sketches are presumably not comparable - confirm
 * whether callers guarantee this.
 *
 * @param other the histogram whose counts are added to this one; it is read but not assigned to here
 */
def mergeWith(other: DDSHistogram) : Unit = {
  this.store.mergeWith(other.store)
  // Carry over the values the other sketch counted below its indexable range.
  this.zeroCount += other.zeroCount
}

/**
 * Rejects values this sketch cannot track: negatives and anything above `maxIndexedValue`.
 *
 * @param value the candidate observation
 * @throws IllegalArgumentException if `value < 0` or `value > maxIndexedValue`
 */
private def checkValueTrackable(value: Double): Unit = {
  val outOfRange = value < 0 || value > maxIndexedValue
  if (outOfRange) {
    throw new IllegalArgumentException(
      "The input value is outside the range that is tracked by the sketch.")
  }
}

/**
 * Smallest value that is mapped to a bin index; `accept` counts anything below it in `zeroCount`.
 *
 * It is the larger of two lower bounds, both derived from `logGamma`.
 *
 * The second bound uses `java.lang.Double.MIN_NORMAL`. Scala's `Double.MinValue` is the most
 * negative finite double, not the smallest positive one, so using it made that bound
 * negative and it could never take effect in the `max`.
 *
 * @return the lower edge of the indexable range
 */
def minIndexableValue: Double = {
  // Keeps the computed index >= Integer.MIN_VALUE.
  val indexLowerBound = Math.exp((Integer.MIN_VALUE + 1) * logGamma)
  // Keeps Math.exp(index * logGamma) >= Double.MIN_NORMAL.
  val normalLowerBound = java.lang.Double.MIN_NORMAL * Math.exp(logGamma)
  Math.max(indexLowerBound, normalLowerBound)
}

/**
 * Largest value the sketch accepts; `checkValueTrackable` rejects anything above it.
 *
 * It is the smaller of two upper bounds: one derived from the largest `Int` index and
 * `logGamma`, the other being `Double.MaxValue` divided by `(1 + relativeAccuracy)`.
 *
 * @return the upper edge of the indexable range
 */
def maxIndexableValue: Double = {
  // Keeps the computed index <= Integer.MAX_VALUE.
  val indexUpperBound = Math.exp(Integer.MAX_VALUE * logGamma)
  // Leaves a factor of (1 + relativeAccuracy) of headroom below Double.MaxValue.
  val magnitudeUpperBound = Double.MaxValue / (1 + relativeAccuracy)
  Math.min(indexUpperBound, magnitudeUpperBound)
}

/**
 * Smallest recorded value: 0 when any value was counted in `zeroCount`, otherwise the
 * value `indexMapping` assigns to the store's minimum index.
 */
def getMinValue: Double = {
  if (zeroCount > 0) {
    0.0
  } else {
    val lowestIndex = store.getMinIndex
    indexMapping.value(lowestIndex)
  }
}

/**
 * Largest recorded value: 0 when only zero-counted values exist (`zeroCount > 0` and the
 * store is empty), otherwise the value `indexMapping` assigns to the store's maximum index.
 */
def getMaxValue: Double = {
  val onlyZeros = zeroCount > 0 && store.isEmpty
  if (onlyZeros) {
    0.0
  } else {
    val highestIndex = store.getMaxIndex
    indexMapping.value(highestIndex)
  }
}
}

object DDSHistogram {
  /**
   * Creates a new histogram with the given relative accuracy.
   *
   * @param relativeAccuracy the accuracy parameter passed to the `DDSHistogram` constructor
   */
  def empty(relativeAccuracy: Double): DDSHistogram =
    new DDSHistogram(relativeAccuracy)
}

/**
* A scheme for buckets in a histogram. Since these are Prometheus-style histograms,
* each bucket definition consists of occurrences of numbers which are less than or equal to the bucketTop
Expand Down Expand Up @@ -424,6 +512,9 @@ object HistogramBuckets {
val binaryBuckets64 = GeometricBuckets(2.0d, 2.0d, 64, minusOne = true)

val emptyBuckets = GeometricBuckets(2.0d, 2.0d, 0)

// DDSBuckets instance with zero buckets (gamma 0.5, numBuckets 0).
val emptyDDSBuckets = DDSBuckets(0.5, 0)

}

/**
Expand Down Expand Up @@ -476,3 +567,13 @@ final case class CustomBuckets(les: Array[Double]) extends HistogramBuckets {

override def hashCode: Int = les.hashCode
}

/**
 * Bucket scheme whose bucket tops are successive powers of `gamma`.
 *
 * @param gamma the base raised to the bucket number to obtain each bucket top
 * @param numBuckets the number of buckets in the scheme
 */
final case class DDSBuckets(gamma: Double, numBuckets: Int) extends HistogramBuckets {

  /** Top of bucket `no`, computed as `gamma` raised to the power `no`. */
  final def bucketTop(no: Int): Double = Math.pow(gamma, no)

  /** Returns the `gamma` this scheme was built with. */
  final def getGamma(): Double = gamma

  /** Not implemented yet: calling this throws `NotImplementedError` (`???`). */
  final def serialize(buf: MutableDirectBuffer, pos: Int): Int = ???

}

Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ object BinaryHistogram extends StrictLogging {
// Pick the wire format code: the null format when there are no buckets, otherwise
// one chosen by the concrete bucket scheme.
val formatCode =
  if (buckets.numBuckets == 0) HistFormat_Null
  else buckets match {
    case _: GeometricBuckets => HistFormat_Geometric_XOR
    case _: CustomBuckets    => HistFormat_Custom_XOR
    // TODO change this
    case _: DDSBuckets       => HistFormat_Custom_Delta
  }

buf.putByte(2, formatCode)
Expand Down