Replacing prints with logs #633

Open · wants to merge 7 commits into base: main
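
Across the files below the change is the same: swap println for an SLF4J logger held in a @transient lazy val. A minimal sketch of the pattern, with a hypothetical class name:

```scala
import org.slf4j.LoggerFactory

class ExampleJobStage extends Serializable {
  // @transient: SLF4J loggers are not Serializable, so the field must be
  // skipped when Spark or Flink serializes this object for executors.
  // lazy: the logger is re-created on first use after deserialization.
  @transient lazy val logger = LoggerFactory.getLogger(getClass)

  def run(): Unit =
    logger.info("replaces println; output now carries level, timestamp and logger name")
}
```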
@@ -16,6 +16,7 @@

package ai.chronon.aggregator.windowing

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.aggregator.windowing.HopsAggregator._
import ai.chronon.api.Extensions.{AggregationOps, AggregationsOps, WindowOps, WindowUtils}
@@ -93,6 +94,7 @@ class HopsAggregator(minQueryTs: Long,
inputSchema: Seq[(String, DataType)],
resolution: Resolution)
extends HopsAggregatorBase(aggregations, inputSchema, resolution) {
@transient lazy val logger = LoggerFactory.getLogger(getClass)

val leftBoundaries: Array[Option[Long]] = {
// Nikhil is pretty confident we won't call this when aggregations is empty
@@ -135,7 +137,7 @@ class HopsAggregator(minQueryTs: Long,
.zip(readableLeftBounds)
.map { case (hop, left) => s"$hop->$left" }
.mkString(", ")
println(s"""Left bounds: $readableHopsToBoundsMap
logger.info(s"""Left bounds: $readableHopsToBoundsMap
|minQueryTs = ${TsUtils.toStr(minQueryTs)}""".stripMargin)
result
}
@@ -16,6 +16,7 @@

package ai.chronon.aggregator.windowing

import org.slf4j.LoggerFactory
import scala.collection.Seq
import ai.chronon.api.Extensions.{AggregationPartOps, WindowOps}
import ai.chronon.api._
@@ -31,6 +32,7 @@ class SawtoothOnlineAggregator(val batchEndTs: Long,
inputSchema: Seq[(String, DataType)],
resolution: Resolution,
tailBufferMillis: Long) {
@transient lazy val logger = LoggerFactory.getLogger(getClass)

// logically, batch response is arranged like so
// sum-90d => sum_ir_88d, [(sum_ir_1d, ts)] -> 1d is the hopSize for 90d
@@ -46,10 +48,10 @@ class SawtoothOnlineAggregator(val batchEndTs: Long,

val batchTailTs: Array[Option[Long]] = tailTs(batchEndTs)

println(s"Batch End: ${TsUtils.toStr(batchEndTs)}")
println("Window Tails: ")
logger.info(s"Batch End: ${TsUtils.toStr(batchEndTs)}")
logger.info("Window Tails: ")
for (i <- windowMappings.indices) {
println(s" ${windowMappings(i).aggregationPart.outputColumnName} -> ${batchTailTs(i).map(TsUtils.toStr)}")
logger.info(s" ${windowMappings(i).aggregationPart.outputColumnName} -> ${batchTailTs(i).map(TsUtils.toStr)}")
}

def update(batchIr: BatchIr, row: Row): BatchIr = update(batchEndTs, batchIr, row, batchTailTs)
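The layout comment in this file (sum-90d => sum_ir_88d, [(sum_ir_1d, ts)]) is easier to picture with concrete shapes. A purely illustrative sketch; these case classes are hypothetical, not Chronon's actual IR types:

```scala
// For a 90-day window with 1-day hops: one collapsed IR covers the stable
// middle of the window (roughly 88 days), and timestamped 1-day tail IRs
// let the online side drop expired hops as the window slides forward.
case class TailHopIr(ir: Array[Any], hopStartTs: Long)
case class WindowedBatchIr(collapsedIr: Array[Any], tailHops: Seq[TailHopIr])
```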
@@ -16,6 +16,7 @@

package ai.chronon.aggregator.test

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.base.ApproxPercentiles
import ai.chronon.aggregator.row.StatsGenerator
import com.yahoo.sketches.kll.KllFloatsSketch
@@ -25,6 +26,7 @@ import org.junit.Assert._
import scala.util.Random

class ApproxPercentilesTest extends TestCase {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
def testBasicImpl(nums: Int, slide: Int, k: Int, percentiles: Array[Double], errorPercent: Float): Unit = {
val sorted = (0 to nums).map(_.toFloat)
val elems = Random.shuffle(sorted.toList).toArray
@@ -42,7 +44,7 @@ class ApproxPercentilesTest extends TestCase {
val expected = result.indices.map(_ * step).map(_.toFloat).toArray
val diffs = result.indices.map(i => Math.abs(result(i) - expected(i)))
val errorMargin = (nums.toFloat * errorPercent) / 100.0
println(s"""
logger.info(s"""
|sketch size: ${merged.getSerializedSizeBytes}
|result: ${result.toVector}
|result size: ${result.size}
@@ -66,7 +68,7 @@ class ApproxPercentilesTest extends TestCase {
sample1.map(sketch1.update)
sample2.map(sketch2.update)
val drift = StatsGenerator.PSIKllSketch(sketch1.toByteArray, sketch2.toByteArray).asInstanceOf[Double]
println(s"PSI drift: $drift")
logger.info(s"PSI drift: $drift")
drift
}

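For readers unfamiliar with the sketch library used in this test, the KLL quantile API looks roughly like this. A hedged sketch assuming the com.yahoo.sketches artifact, with made-up data:

```scala
import com.yahoo.sketches.kll.KllFloatsSketch

val sketch = new KllFloatsSketch(200) // k trades accuracy against sketch size
(0 until 10000).foreach(i => sketch.update(i.toFloat))
// approximate p25 / p50 / p95 of everything updated so far
val quantiles = sketch.getQuantiles(Array(0.25, 0.5, 0.95))
```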
@@ -16,6 +16,7 @@

package ai.chronon.aggregator.test

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.aggregator.test.SawtoothAggregatorTest.sawtoothAggregate
import ai.chronon.aggregator.windowing._
@@ -30,6 +31,7 @@ import scala.collection.mutable
import scala.collection.Seq

class Timer {
@transient lazy val logger = LoggerFactory.getLogger(getClass)

var ts: Long = System.currentTimeMillis()

@@ -38,7 +40,7 @@ class Timer {
// TODO: Write this out into a file checked into git
// or incorporate proper benchmarks
def publish(name: String, reset: Boolean = true): Unit = {
println(s"${name.padTo(25, ' ')} ${System.currentTimeMillis() - ts} ms")
logger.info(s"${name.padTo(25, ' ')} ${System.currentTimeMillis() - ts} ms")
if (reset) ts = System.currentTimeMillis()
}
}
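Usage of the Timer above is straightforward; a small hypothetical example:

```scala
val timer = new Timer // ts is initialized at construction
// ... run the aggregation being measured ...
timer.publish("sawtooth aggregation") // logs "<padded name> <elapsed> ms", then resets ts
timer.publish("total", reset = false) // logs without resetting the clock
```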
@@ -16,11 +16,13 @@

package ai.chronon.aggregator.test

import org.slf4j.LoggerFactory
import ai.chronon.aggregator.base.Variance
import junit.framework.TestCase
import org.junit.Assert._

class VarianceTest extends TestCase {
@transient lazy val logger = LoggerFactory.getLogger(getClass)

def mean(elems: Seq[Double]): Double = elems.sum / elems.length
def naive(elems: Seq[Double]): Double = {
@@ -52,8 +54,8 @@ class VarianceTest extends TestCase {
val nums = (0 until cardinality).map { _ => min + math.random * (max - min) }
val naiveResult = naive(nums)
val welfordResult = welford(nums)
println(s"naive $naiveResult - welford $welfordResult - sum of squares ${sumOfSquares(nums)}")
println((naiveResult - welfordResult) / naiveResult)
logger.info(s"naive $naiveResult - welford $welfordResult - sum of squares ${sumOfSquares(nums)}")
logger.info(((naiveResult - welfordResult) / naiveResult).toString)
assertTrue((naiveResult - welfordResult) / naiveResult < 0.0000001)
}

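The welford helper exercised here is a single-pass variance algorithm. A minimal standalone sketch of Welford's method, computing population variance (the test's exact normalization may differ):

```scala
def welfordVariance(elems: Seq[Double]): Double = {
  var count = 0
  var mean = 0.0
  var m2 = 0.0 // running sum of squared deviations from the current mean
  elems.foreach { x =>
    count += 1
    val delta = x - mean
    mean += delta / count
    m2 += delta * (x - mean) // multiplies pre- and post-update deviations
  }
  m2 / count
}
```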
6 changes: 4 additions & 2 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -16,6 +16,7 @@

package ai.chronon.api

import org.slf4j.LoggerFactory
import ai.chronon.api.DataModel._
import ai.chronon.api.Operation._
import com.fasterxml.jackson.core.`type`.TypeReference
@@ -783,6 +784,7 @@ object Extensions {
}

implicit class JoinOps(val join: Join) extends Serializable {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
// all keys as they should appear in left that are being used on right
def leftKeyCols: Array[String] = {
join.joinParts.toScala
@@ -923,7 +925,7 @@
}
.filter(_.nonEmpty)
.mkString(joiner)
println(s"Generated join left side skew filter:\n $result")
logger.info(s"Generated join left side skew filter:\n $result")
result
}
}
Expand All @@ -943,7 +945,7 @@ object Extensions {
.mkString(joiner)

if (result.nonEmpty) {
println(s"Generated join part skew filter for ${joinPart.groupBy.metaData.name}:\n $result")
logger.info(s"Generated join part skew filter for ${joinPart.groupBy.metaData.name}:\n $result")
Some(result)
} else None
}
7 changes: 5 additions & 2 deletions api/src/main/scala/ai/chronon/api/ParametricMacro.scala
@@ -16,10 +16,12 @@

package ai.chronon.api

import org.slf4j.LoggerFactory
import scala.collection.mutable

// takes a map of macro names and functions and applies the functions on macro arguments
case class ParametricMacro(value: String, func: Map[String, String] => String) {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
private val pattern = s"""\\{\\{\\s*$value(\\([\\s0-9A-Za-z_.,=]*\\))*\\s*}}""".r

def replace(str: String): String = {
@@ -38,7 +40,7 @@ case class ParametricMacro(value: String, func: Map[String, String] => String) {
argSeq.tail :+ (argSeq.head + "," + token)
}
}
println(parsed)
logger.info(parsed.mkString(","))
parsed.map(_.split("=").map(_.trim)).map(x => x(0) -> x(1)).toMap
}
val result = func(argMap.getOrElse(Map.empty[String, String]))
@@ -51,10 +53,11 @@
}

object ParametricMacro {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]): Unit = {
val mc = ParametricMacro("something", { x => "st:" + x.keys.mkString("/") + "|" + x.values.mkString("/") })
val str = "something nothing-{{ something( a_1=b,, 3.1, c=d) }}-something after-{{ thing:a1=b1 }}{{ something }}"
val replaced = mc.replace(str)
println(replaced)
logger.info(replaced)
}
}
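A hypothetical usage of the macro class, mirroring what the main method above exercises:

```scala
// Replace {{ ds(offset=1) }}-style tokens in a query template.
val dsMacro = ParametricMacro("ds", { args => args.getOrElse("offset", "0") })
val rendered = dsMacro.replace("SELECT * FROM t WHERE ds = '{{ ds(offset=1) }}'")
// rendered == "SELECT * FROM t WHERE ds = '1'", assuming replace substitutes
// the function's result for the whole matched token as the code above suggests
```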
4 changes: 3 additions & 1 deletion api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala
@@ -16,6 +16,7 @@

package ai.chronon.api

import org.slf4j.LoggerFactory
import ai.chronon.api.Extensions.StringsOps
import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import org.apache.thrift.protocol.{TCompactProtocol, TSimpleJSONProtocol}
@@ -28,6 +29,7 @@ import scala.reflect.ClassTag
import scala.util.ScalaJavaConversions.ListOps

object ThriftJsonCodec {
@transient lazy val logger = LoggerFactory.getLogger(getClass)

def serializer = new TSerializer(new TSimpleJSONProtocol.Factory())

@@ -63,7 +65,7 @@
base
} catch {
case e: Exception => {
println("Failed to deserialize using compact protocol, trying Json.")
logger.info("Failed to deserialize using compact protocol, trying Json.")
fromJsonStr(new String(bytes), check = false, base.getClass)
}
}
@@ -16,13 +16,15 @@

package ai.chronon.api.test

import org.slf4j.LoggerFactory
import ai.chronon.api._
import org.apache.thrift.TSerializer
import org.apache.thrift.protocol.TSimpleJSONProtocol
import org.junit.Assert._
import org.junit.Test

class DataTypeConversionTest {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
@Test
def testDataTypeToThriftAndBack(): Unit = {
// build some complex type
@@ -45,7 +47,7 @@ class DataTypeConversionTest {
// serialize with TSimpleJson - this is what python code will do
val jsonSerializer = new TSerializer(new TSimpleJSONProtocol.Factory())
val json = new String(jsonSerializer.serialize(thriftType))
println(json)
logger.info(json)

val reversedTType = ThriftJsonCodec.fromJsonStr[TDataType](json, check = true, classOf[TDataType])
val reversed = DataType.fromTDataType(reversedTType)
@@ -1,5 +1,6 @@
package ai.chronon.flink

import org.slf4j.LoggerFactory
import ai.chronon.online.{Api, KVStore}
import ai.chronon.online.KVStore.PutRequest
import org.apache.flink.configuration.Configuration
@@ -64,6 +65,7 @@ object AsyncKVStoreWriter {
*/
class AsyncKVStoreWriter(onlineImpl: Api, featureGroupName: String)
extends RichAsyncFunction[PutRequest, WriteResponse] {
@transient private lazy val logger = LoggerFactory.getLogger(getClass)

@transient private var kvStore: KVStore = _

@@ -88,7 +90,7 @@ class AsyncKVStoreWriter(onlineImpl: Api, featureGroupName: String)
}

override def timeout(input: PutRequest, resultFuture: ResultFuture[WriteResponse]): Unit = {
println(s"Timed out writing to KV Store for object: $input")
logger.info(s"Timed out writing to KV Store for object: $input")
errorCounter.inc()
resultFuture.complete(util.Arrays.asList[WriteResponse](WriteResponse(input, status = false)))
}
@@ -102,15 +104,15 @@ class AsyncKVStoreWriter(onlineImpl: Api, featureGroupName: String)
successCounter.inc()
} else {
errorCounter.inc()
println(s"Failed to write to KVStore for object: $input")
logger.info(s"Failed to write to KVStore for object: $input")
}
resultFuture.complete(util.Arrays.asList[WriteResponse](WriteResponse(input, status = succeeded)))
case Failure(exception) =>
// this should be rare and indicates we have an uncaught exception
// in the KVStore - we log the exception and skip the object to
// not fail the app
errorCounter.inc()
println(s"Caught exception writing to KVStore for object: $input - $exception")
logger.info(s"Caught exception writing to KVStore for object: $input - $exception")
resultFuture.complete(util.Arrays.asList[WriteResponse](WriteResponse(input, status = false)))
}
}
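For context, a RichAsyncFunction like this writer is typically attached with Flink's AsyncDataStream helpers. A hedged sketch: putRequests and api are assumed to already exist, and this is not necessarily the repo's actual wiring:

```scala
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.datastream.AsyncDataStream

// Assumptions: putRequests is a DataStream[PutRequest] (Java API) and
// api is an Api implementation; the feature group name is made up.
val responses = AsyncDataStream.unorderedWait(
  putRequests,
  new AsyncKVStoreWriter(api, "example_feature_group"),
  1000L, TimeUnit.MILLISECONDS, // per-request timeout
  100                           // max in-flight requests
)
```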
4 changes: 3 additions & 1 deletion flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
@@ -1,5 +1,6 @@
package ai.chronon.flink

import org.slf4j.LoggerFactory
import ai.chronon.api.Extensions.GroupByOps
import ai.chronon.api.{Constants, DataModel, Query, StructType => ChrononStructType}
import ai.chronon.online.{AvroConversions, GroupByServingInfoParsed}
@@ -19,6 +20,7 @@ import scala.jdk.CollectionConverters._
*/
case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
extends RichFlatMapFunction[Map[String, Any], PutRequest] {
@transient private lazy val logger = LoggerFactory.getLogger(getClass)

@transient protected var avroConversionErrorCounter: Counter = _

@@ -86,7 +88,7 @@ case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
case e: Exception =>
// To improve availability, we don't rethrow the exception. We just drop the event
// and track the errors in a metric. If there are too many errors we'll get alerted/paged.
println(s"Error converting to Avro bytes - $e")
logger.info(s"Error converting to Avro bytes - $e")
avroConversionErrorCounter.inc()
}

@@ -1,5 +1,6 @@
package ai.chronon.flink

import org.slf4j.LoggerFactory
import ai.chronon.api.Extensions.{GroupByOps, MetadataOps}
import ai.chronon.api.{Constants, GroupBy, Query, StructType => ChrononStructType}
import ai.chronon.online.{CatalystUtil, SparkConversions}
@@ -27,6 +28,7 @@ import scala.jdk.CollectionConverters.{asScalaBufferConverter, mapAsScalaMapConv
* @tparam T The type of the input data.
*/
class SparkExpressionEvalFn[T](encoder: Encoder[T], groupBy: GroupBy) extends RichFlatMapFunction[T, Map[String, Any]] {
@transient private lazy val logger = LoggerFactory.getLogger(getClass)

private val query: Query = groupBy.streamingSource.get.getEvents.query

@@ -100,7 +102,7 @@ class SparkExpressionEvalFn[T](encoder: Encoder[T], groupBy: GroupBy) extends Ri
case e: Exception =>
// To improve availability, we don't rethrow the exception. We just drop the event
// and track the errors in a metric. If there are too many errors we'll get alerted/paged.
println(s"Error evaluating Spark expression - $e")
logger.info(s"Error evaluating Spark expression - $e")
exprEvalErrorCounter.inc()
}
}
4 changes: 3 additions & 1 deletion online/src/main/scala/ai/chronon/online/Api.scala
@@ -16,6 +16,7 @@

package ai.chronon.online

import org.slf4j.LoggerFactory
import ai.chronon.api.{Constants, StructType}
import ai.chronon.online.KVStore.{GetRequest, GetResponse, PutRequest}
import org.apache.spark.sql.SparkSession
@@ -40,6 +41,7 @@ object KVStore {
// the main system level api for key value storage
// used for streaming writes, batch bulk uploads & fetching
trait KVStore {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
implicit val executionContext: ExecutionContext = FlexibleExecutionContext.buildExecutionContext

def create(dataset: String): Unit
@@ -69,7 +71,7 @@ trait KVStore {
.map(_.head)
.recover {
case e: java.util.NoSuchElementException =>
println(
logger.error(
s"Failed request against ${request.dataset} check the related task to the upload of the dataset (GroupByUpload or MetadataUpload)")
throw e
}
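The recover block above logs enough context to locate the failing dataset and then rethrows, so callers still observe the failure. The same shape in isolation, with hypothetical names:

```scala
import scala.concurrent.{ExecutionContext, Future}

def firstValue(dataset: String, fetch: () => Future[Seq[String]])
              (implicit ec: ExecutionContext): Future[String] =
  fetch().map(_.head).recover {
    case e: java.util.NoSuchElementException =>
      // surface which dataset came back empty, then propagate the failure
      throw new RuntimeException(s"Empty response from dataset $dataset", e)
  }
```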