Skip to content

Commit

Permalink
quantile query added
Browse files Browse the repository at this point in the history
  • Loading branch information
HimaVarsha94 committed Jul 14, 2020
1 parent 3c04bcf commit 03dd6f0
Show file tree
Hide file tree
Showing 7 changed files with 479 additions and 47 deletions.
103 changes: 102 additions & 1 deletion memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package filodb.memory.format.vectors

import java.nio.ByteOrder.LITTLE_ENDIAN
import java.util
import java.util.NoSuchElementException

import org.agrona.{DirectBuffer, MutableDirectBuffer}
import com.datadoghq.sketch.ddsketch.mapping.CubicallyInterpolatedMapping
import com.datadoghq.sketch.ddsketch.store.Bin
import scalaxy.loops._

import filodb.memory.format._

/**
Expand Down Expand Up @@ -324,6 +327,91 @@ final case class MaxHistogram(innerHist: MutableHistogram, max: Double) extends
}
}

/**
 * A histogram backed by a DDSketch-style index mapping and bin store.
 *
 * Values below `minIndexedValue` are tallied in `zeroCount`; all other accepted values are
 * mapped to an integer index by `indexMapping` and counted in `store`.
 *
 * NOTE: this is a case class with mutable state, so the generated `equals`/`hashCode` only
 * consider `relativeAccuracy`, not the recorded values.
 *
 * @param relativeAccuracy relative accuracy passed to the index mapping and used to derive `logGamma`
 */
final case class DDSHistogram(relativeAccuracy: Double) extends HistogramWithBuckets {
  /** Not implemented yet. */
  def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = ???

  // NOTE: logGamma must be initialised before the two bounds below, which are derived from it.
  val logGamma: Double = Math.log((1 + relativeAccuracy) / (1 - relativeAccuracy))
  val maxIndexedValue = maxIndexableValue
  val minIndexedValue = minIndexableValue
  // Number of accepted values that were below minIndexedValue (treated as zero).
  var zeroCount = 0
  var indexMapping = new CubicallyInterpolatedMapping(relativeAccuracy)
  var store = new UnboundedSizeDenseStore

  /** Placeholder: always returns 0. */
  final def bucketValue(no: Int): Double = 0

  /** Not implemented yet. */
  def buckets: HistogramBuckets = ???

  /** Total number of recorded values: the zero tally plus everything in the store. */
  def getCount: Long = zeroCount + store.getTotalCount

  /** Value at quantile `q`, evaluated over every value recorded so far. */
  override def quantile(q: Double): Double = getValueAtQuantile(q, getCount)

  /**
   * Returns the value of the bin that contains the requested rank.
   *
   * @param quantile quantile in [0, 1]
   * @param count    total number of recorded values to rank against
   * @throws IllegalArgumentException if `quantile` is outside [0, 1]
   * @throws NoSuchElementException   if `count` is 0
   */
  def getValueAtQuantile(quantile: Double, count: Long): Double = {
    if (quantile < 0 || quantile > 1) {
      throw new IllegalArgumentException("The quantile must be between 0 and 1.")
    }
    if (count == 0) throw new NoSuchElementException
    val rank: Long = (quantile * (count - 1)).toLong
    if (rank < zeroCount) {
      // The requested rank falls among the values tallied as zero.
      0.0
    } else {
      var bin: Bin = null
      if (quantile <= 0.5) {
        // Lower half: walk upwards, accumulating counts until the rank is passed.
        val binIterator: util.Iterator[Bin] = store.getAscendingIterator
        var n: Long = zeroCount
        do {
          bin = binIterator.next
          n += bin.getCount
        } while (n <= rank && binIterator.hasNext)
      } else {
        // Upper half: walk downwards, subtracting counts until at most `rank` values remain below.
        val binIterator: util.Iterator[Bin] = store.getDescendingIterator
        var n: Long = count
        do {
          bin = binIterator.next
          n -= bin.getCount
        } while (n > rank && binIterator.hasNext)
      }
      indexMapping.value(bin.getIndex)
    }
  }

  /**
   * Returns 1 when both histograms have the same `logGamma`, 0 otherwise.
   * NOTE(review): this is not the usual compare convention (0 meaning equal); behaviour is kept
   * as-is because callers may rely on it.
   */
  def compare(other: DDSHistogram): Int =
    if (logGamma == other.logGamma) 1 else 0

  /**
   * Records one value.
   *
   * @throws IllegalArgumentException if `value` is negative or greater than `maxIndexedValue`
   */
  def accept(value: Double): Unit = {
    checkValueTrackable(value)
    if (value < minIndexedValue) zeroCount += 1
    else store.add(indexMapping.index(value))
  }

  /**
   * Merges the contents of `other` into this histogram.
   * No compatibility check is performed here; presumably both histograms should share the same
   * relative accuracy for the merged bins to be meaningful.
   */
  def mergeWith(other: DDSHistogram): Unit = {
    this.store.mergeWith(other.store)
    // The zero tally lives outside the store and has to be merged as well,
    // otherwise getCount and quantiles ignore the other histogram's zero values.
    this.zeroCount += other.zeroCount
  }

  private def checkValueTrackable(value: Double): Unit = {
    if (value < 0 || value > maxIndexedValue) {
      throw new IllegalArgumentException("The input value is outside the range that is tracked by the sketch.")
    }
  }

  /**
   * Smallest value that is indexed; anything below it is counted in `zeroCount`.
   * Uses java.lang.Double.MIN_NORMAL (smallest positive normal double). Scala's Double.MinValue
   * is the most negative double and would make this bound collapse to 0.
   */
  def minIndexableValue: Double =
    Math.max(
      Math.exp((Integer.MIN_VALUE + 1) * logGamma), // so that index >= Integer.MIN_VALUE
      java.lang.Double.MIN_NORMAL * Math.exp(logGamma) // so that Math.exp(index * logGamma) >= Double.MIN_NORMAL
    )

  /** Largest value that `accept` allows. */
  def maxIndexableValue: Double =
    Math.min(
      Math.exp(Integer.MAX_VALUE * logGamma), // so that index <= Integer.MAX_VALUE
      Double.MaxValue / (1 + relativeAccuracy) // so that value * (1 + relativeAccuracy) <= Double.MaxValue
    )

  /** 0 if any value was tallied as zero, otherwise the value mapped from the store's minimum index. */
  def getMinValue: Double =
    if (zeroCount > 0) 0
    else indexMapping.value(store.getMinIndex)

  /** 0 if only zero-tallied values were recorded, otherwise the value mapped from the store's maximum index. */
  def getMaxValue: Double =
    if (zeroCount > 0 && store.isEmpty) 0
    else indexMapping.value(store.getMaxIndex)
}

/** Factory helpers for DDSHistogram. */
object DDSHistogram {
  /** Creates a new DDSHistogram for the given relative accuracy. */
  def empty(relativeAccuracy: Double): DDSHistogram =
    new DDSHistogram(relativeAccuracy)
}

/**
* A scheme for buckets in a histogram. Since these are Prometheus-style histograms,
* each bucket definition consists of occurrences of numbers which are less than or equal to the bucketTop
Expand Down Expand Up @@ -424,6 +512,9 @@ object HistogramBuckets {
// Geometric scheme constructed with (2.0, 2.0, 64, minusOne = true).
// NOTE(review): presumably 64 power-of-two buckets; confirm against GeometricBuckets.
val binaryBuckets64 = GeometricBuckets(2.0d, 2.0d, 64, minusOne = true)

// Geometric scheme constructed with a third argument of 0 (presumably zero buckets; confirm).
val emptyBuckets = GeometricBuckets(2.0d, 2.0d, 0)

// DDSBuckets instance with gamma = 0.5 and numBuckets = 0.
val emptyDDSBuckets = DDSBuckets(0.5, 0)

}

/**
Expand Down Expand Up @@ -476,3 +567,13 @@ final case class CustomBuckets(les: Array[Double]) extends HistogramBuckets {

override def hashCode: Int = les.hashCode
}

/**
 * Bucket scheme in which the top of bucket `no` is `gamma` raised to the power `no`.
 *
 * @param gamma      base of the geometric progression of bucket tops
 * @param numBuckets number of buckets in the scheme
 */
final case class DDSBuckets(gamma: Double, numBuckets: Int) extends HistogramBuckets {
  /** Top of bucket `no`: gamma to the power `no`. */
  final def bucketTop(no: Int): Double = math.pow(gamma, no.toDouble)

  /** Returns the `gamma` this scheme was built with. */
  final def getGamma(): Double = this.gamma

  /** Not implemented yet. */
  final def serialize(buf: MutableDirectBuffer, pos: Int): Int = ???

}

Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ object BinaryHistogram extends StrictLogging {
// Pick the format code: HistFormat_Null when there are no buckets,
// otherwise a code chosen by the concrete bucket scheme type.
val formatCode = if (buckets.numBuckets == 0) HistFormat_Null else buckets match {
case g: GeometricBuckets => HistFormat_Geometric_XOR
case c: CustomBuckets => HistFormat_Custom_XOR
// TODO change this: DDSBuckets is currently mapped to HistFormat_Custom_Delta.
case d: DDSBuckets => HistFormat_Custom_Delta
}

buf.putByte(2, formatCode)
Expand Down

0 comments on commit 03dd6f0

Please sign in to comment.