Skip to content

Commit

Permalink
Fixing issues
Browse files Browse the repository at this point in the history
  • Loading branch information
ezvz committed Dec 5, 2023
1 parent a62c1d4 commit 63ae081
Show file tree
Hide file tree
Showing 72 changed files with 707 additions and 510 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package ai.chronon.aggregator.base

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.base.TimeTuple.typ
import ai.chronon.api._

import java.util

object TimeTuple extends Ordering[util.ArrayList[Any]] {
private val logger = LoggerFactory.getLogger(getClass)
type typ = util.ArrayList[Any]

def `type`(inputType: DataType): DataType =
Expand Down Expand Up @@ -53,6 +55,7 @@ object TimeTuple extends Ordering[util.ArrayList[Any]] {
}

abstract class TimeOrdered(inputType: DataType) extends TimedAggregator[Any, TimeTuple.typ, Any] {
private val logger = LoggerFactory.getLogger(getClass)
// The finalized value has the same type as the input column this aggregator was constructed with.
override def outputType: DataType = inputType

// The intermediate representation's type is whatever `TimeTuple.type` derives from the input type.
override def irType: DataType = TimeTuple.`type`(inputType)
Expand All @@ -72,6 +75,7 @@ abstract class TimeOrdered(inputType: DataType) extends TimedAggregator[Any, Tim
}

class First(inputType: DataType) extends TimeOrdered(inputType) {
private val logger = LoggerFactory.getLogger(getClass)
//mutating
override def update(
ir: util.ArrayList[Any],
Expand All @@ -92,6 +96,7 @@ class First(inputType: DataType) extends TimeOrdered(inputType) {
}

class Last(inputType: DataType) extends TimeOrdered(inputType) {
private val logger = LoggerFactory.getLogger(getClass)
//mutating
override def update(
ir: util.ArrayList[Any],
Expand Down Expand Up @@ -119,6 +124,7 @@ class OrderByLimitTimed(
limit: Int,
ordering: Ordering[TimeTuple.typ]
) extends TimedAggregator[Any, util.ArrayList[TimeTuple.typ], util.ArrayList[Any]] {
private val logger = LoggerFactory.getLogger(getClass)
type Container = util.ArrayList[TimeTuple.typ]
private val minHeap = new MinHeap[TimeTuple.typ](limit, ordering)

Expand All @@ -129,7 +135,7 @@ class OrderByLimitTimed(
override final def prepare(input: Any, ts: Long): Container = {
// val gson = new Gson()
val tuple = TimeTuple.make(ts, input)
// println(s"init: ${gson.toJson(tuple)}")
// logger.info(s"init: ${gson.toJson(tuple)}")
val arr = new Container()
arr.add(tuple)
arr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package ai.chronon.aggregator.windowing

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.aggregator.windowing.HopsAggregator._
import ai.chronon.api.Extensions.{AggregationOps, AggregationsOps, WindowOps, WindowUtils}
Expand All @@ -31,6 +32,7 @@ import java.util
// t
class HopsAggregatorBase(aggregations: Seq[Aggregation], inputSchema: Seq[(String, DataType)], resolution: Resolution)
extends Serializable {
private val logger = LoggerFactory.getLogger(getClass)

@transient lazy val rowAggregator =
new RowAggregator(inputSchema, aggregations.flatMap(_.unWindowed))
Expand Down Expand Up @@ -93,6 +95,7 @@ class HopsAggregator(minQueryTs: Long,
inputSchema: Seq[(String, DataType)],
resolution: Resolution)
extends HopsAggregatorBase(aggregations, inputSchema, resolution) {
private val logger = LoggerFactory.getLogger(getClass)

val leftBoundaries: Array[Option[Long]] = {
// Nikhil is pretty confident we won't call this when aggregations is empty
Expand Down Expand Up @@ -135,7 +138,7 @@ class HopsAggregator(minQueryTs: Long,
.zip(readableLeftBounds)
.map { case (hop, left) => s"$hop->$left" }
.mkString(", ")
println(s"""Left bounds: $readableHopsToBoundsMap
logger.info(s"""Left bounds: $readableHopsToBoundsMap
|minQueryTs = ${TsUtils.toStr(minQueryTs)}""".stripMargin)
result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package ai.chronon.aggregator.windowing

import org.slf4j.LoggerFactory
import scala.collection.Seq
import ai.chronon.api.Extensions.{AggregationPartOps, WindowOps}
import ai.chronon.api._
Expand All @@ -31,6 +32,7 @@ class SawtoothOnlineAggregator(val batchEndTs: Long,
inputSchema: Seq[(String, DataType)],
resolution: Resolution,
tailBufferMillis: Long) {
private val logger = LoggerFactory.getLogger(getClass)

// logically, batch response is arranged like so
// sum-90d => sum_ir_88d, [(sum_ir_1d, ts)] -> 1d is the hopSize for 90d
Expand All @@ -46,10 +48,10 @@ class SawtoothOnlineAggregator(val batchEndTs: Long,

val batchTailTs: Array[Option[Long]] = tailTs(batchEndTs)

// Construction-time diagnostics: report the batch end and, for each window mapping, its output column name
// together with the matching entry of `batchTailTs`.
// Each message goes through the class logger only; the stdout `println` twins of these lines are dropped so
// that nothing is reported twice.
logger.info(s"Batch End: ${TsUtils.toStr(batchEndTs)}")
logger.info("Window Tails: ")
for (i <- windowMappings.indices) {
  logger.info(s" ${windowMappings(i).aggregationPart.outputColumnName} -> ${batchTailTs(i).map(TsUtils.toStr)}")
}

// Convenience overload: forwards to the four-argument `update`, filling in this instance's
// `batchEndTs` and `batchTailTs`.
def update(batchIr: BatchIr, row: Row): BatchIr = update(batchEndTs, batchIr, row, batchTailTs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@

package ai.chronon.aggregator.test

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.base.ApproxDistinctCount
import junit.framework.TestCase
import org.junit.Assert._

class ApproxDistinctTest extends TestCase {
private val logger = LoggerFactory.getLogger(getClass)
/** Feeds every value of `1 to uniques` three times into a fresh `ApproxDistinctCount` built with `lgK`,
  * then asserts that the finalized estimate is strictly within `errorBound` of `uniques`.
  */
def testErrorBound(uniques: Int, errorBound: Int, lgK: Int): Unit = {
  val distinctValues = 1 to uniques
  // Every value appears three times, so only the distinct count should survive.
  val repeated = distinctValues ++ distinctValues ++ distinctValues
  val sketchAggregator = new ApproxDistinctCount[Long](lgK)
  // The first element seeds the intermediate result; the rest are folded in by mutation.
  val sketch = sketchAggregator.prepare(repeated.head)
  for (value <- repeated.tail) {
    sketchAggregator.update(sketch, value)
  }
  val estimate = sketchAggregator.finalize(sketch)
  // logger.info(s"estimated - $estimate, actual - $uniques, bound - $errorBound")
  val absoluteError = Math.abs(estimate - uniques)
  assertTrue(absoluteError < errorBound)
}

Expand All @@ -46,7 +48,7 @@ class ApproxDistinctTest extends TestCase {
}
val ir = irList.reduceLeft(counter.merge)
val estimated = counter.finalize(ir)
// println(s"estimated - $estimated, actual - $uniques, bound - $errorBound")
// logger.info(s"estimated - $estimated, actual - $uniques, bound - $errorBound")
assertTrue(Math.abs(estimated - uniques) < errorBound)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package ai.chronon.aggregator.test

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.base.ApproxPercentiles
import ai.chronon.aggregator.row.StatsGenerator
import com.yahoo.sketches.kll.KllFloatsSketch
Expand All @@ -25,6 +26,7 @@ import org.junit.Assert._
import scala.util.Random

class ApproxPercentilesTest extends TestCase {
private val logger = LoggerFactory.getLogger(getClass)
def testBasicImpl(nums: Int, slide: Int, k: Int, percentiles: Array[Double], errorPercent: Float): Unit = {
val sorted = (0 to nums).map(_.toFloat)
val elems = Random.shuffle(sorted.toList).toArray
Expand All @@ -42,7 +44,7 @@ class ApproxPercentilesTest extends TestCase {
val expected = result.indices.map(_ * step).map(_.toFloat).toArray
val diffs = result.indices.map(i => Math.abs(result(i) - expected(i)))
val errorMargin = (nums.toFloat * errorPercent) / 100.0
println(s"""
logger.info(s"""
|sketch size: ${merged.getSerializedSizeBytes}
|result: ${result.toVector}
|result size: ${result.size}
Expand All @@ -66,7 +68,7 @@ class ApproxPercentilesTest extends TestCase {
sample1.map(sketch1.update)
sample2.map(sketch2.update)
val drift = StatsGenerator.PSIKllSketch(sketch1.toByteArray, sketch2.toByteArray).asInstanceOf[Double]
println(s"PSI drift: $drift")
logger.info(s"PSI drift: $drift")
drift
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package ai.chronon.aggregator.test

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.api._
import junit.framework.TestCase
Expand All @@ -25,6 +26,7 @@ import java.util
import scala.collection.JavaConverters._

class TestRow(val fieldsSeq: Any*)(tsIndex: Int = 0) extends Row {
private val logger = LoggerFactory.getLogger(getClass)
val fields: util.List[Any] = new java.util.ArrayList[Any](fieldsSeq.asJava)
override val length: Int = fields.size()

Expand All @@ -39,7 +41,7 @@ class TestRow(val fieldsSeq: Any*)(tsIndex: Int = 0) extends Row {

// The mutation time of a test row is simply its `timeStamp`.
override def mutationTs: Long = timeStamp

/** Logs the values this row was constructed with (`fieldsSeq`), rendered via `toString`.
  *
  * Later `set` calls modify `fields`, not `fieldsSeq`, so they are not reflected here.
  */
// slf4j's `info` has no overload taking a lone non-String argument, so the sequence is rendered explicitly.
def print(): Unit = logger.info(fieldsSeq.toString)

/** Replaces the value stored at position `index` of the backing `fields` list with `any`. */
def set(index: Int, any: Any): Unit = {
  // `java.util.List#set` hands back the element it displaced; that value is intentionally dropped.
  fields.set(index, any)
  ()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package ai.chronon.aggregator.test

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.aggregator.test.SawtoothAggregatorTest.sawtoothAggregate
import ai.chronon.aggregator.windowing._
Expand All @@ -30,6 +31,7 @@ import scala.collection.mutable
import scala.collection.Seq

class Timer {
private val logger = LoggerFactory.getLogger(getClass)

var ts: Long = System.currentTimeMillis()

Expand All @@ -38,7 +40,7 @@ class Timer {
// TODO: Write this out into a file checked into git
// or incorporate proper benchmarks
/** Logs the milliseconds elapsed since `ts`, labelled with `name`, and optionally restarts the timer.
  *
  * @param name  label for the measurement; right-padded with spaces to 25 characters in the log line
  * @param reset when true (the default), `ts` is set to the current time after logging
  */
def publish(name: String, reset: Boolean = true): Unit = {
  // Reported through the class logger only; the stdout `println` twin of this line is dropped so the
  // measurement is not emitted twice.
  logger.info(s"${name.padTo(25, ' ')} ${System.currentTimeMillis() - ts} ms")
  if (reset) ts = System.currentTimeMillis()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package ai.chronon.aggregator.test

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.base.Variance
import junit.framework.TestCase
import org.junit.Assert._

class VarianceTest extends TestCase {
private val logger = LoggerFactory.getLogger(getClass)

/** Arithmetic mean of `elems`: the sum divided by the element count (NaN for an empty sequence). */
def mean(elems: Seq[Double]): Double = {
  val total = elems.sum
  val count = elems.length
  total / count
}
def naive(elems: Seq[Double]): Double = {
Expand Down Expand Up @@ -52,8 +54,8 @@ class VarianceTest extends TestCase {
val nums = (0 until cardinality).map { _ => min + math.random * (max - min) }
val naiveResult = naive(nums)
val welfordResult = welford(nums)
println(s"naive $naiveResult - welford $welfordResult - sum of squares ${sumOfSquares(nums)}")
println((naiveResult - welfordResult) / naiveResult)
logger.info(s"naive $naiveResult - welford $welfordResult - sum of squares ${sumOfSquares(nums)}")
logger.info((naiveResult - welfordResult) / naiveResult)
assertTrue((naiveResult - welfordResult) / naiveResult < 0.0000001)
}

Expand Down

0 comments on commit 63ae081

Please sign in to comment.