WIP
ezvz committed Dec 7, 2023
1 parent 965d899 commit f49fa88
Showing 68 changed files with 153 additions and 160 deletions.
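
The change repeated across these files swaps eagerly initialized private val SLF4J loggers for @transient lazy val declarations. The commit message ("WIP") does not state the motivation; the usual reason for this pattern, offered here as an assumption rather than a fact from the diff, is serialization: many of the touched classes (Spark GroupBy and Join jobs, Flink RichFlatMapFunction/RichAsyncFunction operators, KVStore implementations) are serialized and shipped to executors, and an eager, non-transient Logger field can fail with java.io.NotSerializableException when the enclosing instance is serialized. Marking the field @transient keeps it out of serialization, and lazy re-creates it on first use after deserialization on the worker. A minimal sketch of the pattern (ExampleTask and run are illustrative names, not from this repository):

import org.slf4j.{Logger, LoggerFactory}

// Hypothetical Spark-style task whose instances are serialized to executors.
class ExampleTask extends Serializable {
  // Before: an eager field. If the bound Logger implementation is not
  // Serializable, serializing ExampleTask throws NotSerializableException.
  //   private val logger: Logger = LoggerFactory.getLogger(getClass)

  // After: @transient excludes the field from serialization, and lazy val
  // re-initializes it on first use after the instance is deserialized.
  @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)

  def run(x: Int): Int = {
    logger.info(s"processing $x") // works on both driver and executor
    x * 2
  }
}

The same sketch would apply to the Flink operators in this diff, since Flink likewise serializes rich function instances when distributing a job.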
@@ -94,7 +94,7 @@ class HopsAggregator(minQueryTs: Long,
inputSchema: Seq[(String, DataType)],
resolution: Resolution)
extends HopsAggregatorBase(aggregations, inputSchema, resolution) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

val leftBoundaries: Array[Option[Long]] = {
// Nikhil is pretty confident we won't call this when aggregations is empty
@@ -32,7 +32,7 @@ class SawtoothOnlineAggregator(val batchEndTs: Long,
inputSchema: Seq[(String, DataType)],
resolution: Resolution,
tailBufferMillis: Long) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

// logically, batch response is arranged like so
// sum-90d => sum_ir_88d, [(sum_ir_1d, ts)] -> 1d is the hopSize for 90d
@@ -26,7 +26,7 @@ import org.junit.Assert._
import scala.util.Random

class ApproxPercentilesTest extends TestCase {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
def testBasicImpl(nums: Int, slide: Int, k: Int, percentiles: Array[Double], errorPercent: Float): Unit = {
val sorted = (0 to nums).map(_.toFloat)
val elems = Random.shuffle(sorted.toList).toArray
@@ -31,7 +31,7 @@ import scala.collection.mutable
import scala.collection.Seq

class Timer {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

var ts: Long = System.currentTimeMillis()

@@ -22,7 +22,7 @@ import junit.framework.TestCase
import org.junit.Assert._

class VarianceTest extends TestCase {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

def mean(elems: Seq[Double]): Double = elems.sum / elems.length
def naive(elems: Seq[Double]): Double = {
@@ -55,7 +55,7 @@ class VarianceTest extends TestCase {
val naiveResult = naive(nums)
val welfordResult = welford(nums)
logger.info(s"naive $naiveResult - welford $welfordResult - sum of squares ${sumOfSquares(nums)}")
- logger.info((naiveResult - welfordResult) / naiveResult)
+ logger.info(((naiveResult - welfordResult) / naiveResult).toString)
assertTrue((naiveResult - welfordResult) / naiveResult < 0.0000001)
}

2 changes: 1 addition & 1 deletion api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -784,7 +784,7 @@ object Extensions {
}

implicit class JoinOps(val join: Join) extends Serializable {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
// all keys as they should appear in left that are being used on right
def leftKeyCols: Array[String] = {
join.joinParts.toScala
4 changes: 2 additions & 2 deletions api/src/main/scala/ai/chronon/api/ParametricMacro.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable

// takes a map of macro names and functions and applies the functions on macro arguments
case class ParametricMacro(value: String, func: Map[String, String] => String) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
private val pattern = s"""\\{\\{\\s*$value(\\([\\s0-9A-Za-z_.,=]*\\))*\\s*}}""".r

def replace(str: String): String = {
@@ -53,7 +53,7 @@ case class ParametricMacro(value: String, func: Map[String, String] => String) {
}

object ParametricMacro {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]): Unit = {
val mc = ParametricMacro("something", { x => "st:" + x.keys.mkString("/") + "|" + x.values.mkString("/") })
val str = "something nothing-{{ something( a_1=b,, 3.1, c=d) }}-something after-{{ thing:a1=b1 }}{{ something }}"
2 changes: 1 addition & 1 deletion api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
import scala.util.ScalaJavaConversions.ListOps

object ThriftJsonCodec {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

def serializer = new TSerializer(new TSimpleJSONProtocol.Factory())

@@ -24,7 +24,7 @@ import org.junit.Assert._
import org.junit.Test

class DataTypeConversionTest {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
@Test
def testDataTypeToThriftAndBack(): Unit = {
// build some complex type
@@ -65,7 +65,7 @@ object AsyncKVStoreWriter {
*/
class AsyncKVStoreWriter(onlineImpl: Api, featureGroupName: String)
extends RichAsyncFunction[PutRequest, WriteResponse] {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

@transient private var kvStore: KVStore = _

2 changes: 1 addition & 1 deletion flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
@@ -20,7 +20,7 @@ import scala.jdk.CollectionConverters._
*/
case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
extends RichFlatMapFunction[Map[String, Any], PutRequest] {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

@transient protected var avroConversionErrorCounter: Counter = _

@@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters.{asScalaBufferConverter, mapAsScalaMapConv
* @tparam T The type of the input data.
*/
class SparkExpressionEvalFn[T](encoder: Encoder[T], groupBy: GroupBy) extends RichFlatMapFunction[T, Map[String, Any]] {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

private val query: Query = groupBy.streamingSource.get.getEvents.query

2 changes: 1 addition & 1 deletion online/src/main/scala/ai/chronon/online/Api.scala
@@ -41,7 +41,7 @@ object KVStore {
// the main system level api for key value storage
// used for streaming writes, batch bulk uploads & fetching
trait KVStore {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
implicit val executionContext: ExecutionContext = FlexibleExecutionContext.buildExecutionContext

def create(dataset: String): Unit
@@ -49,7 +49,7 @@ object TopicInfo {
}

case class DataStream(df: DataFrame, partitions: Int, topicInfo: TopicInfo) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

// apply a query to a given data stream
def apply(query: api.Query, keys: Seq[String] = null, dataModel: DataModel = DataModel.Events): DataStream = {
2 changes: 1 addition & 1 deletion online/src/main/scala/ai/chronon/online/Fetcher.scala
@@ -81,7 +81,7 @@ class Fetcher(val kvStore: KVStore,
debug: Boolean = false,
val externalSourceRegistry: ExternalSourceRegistry = null)
extends FetcherBase(kvStore, metaDataSet, timeoutMillis, debug) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

def buildJoinCodec(joinConf: api.Join): JoinCodec = {
val keyFields = new mutable.LinkedHashSet[StructField]
4 changes: 2 additions & 2 deletions online/src/main/scala/ai/chronon/online/FetcherBase.scala
@@ -43,7 +43,7 @@ class FetcherBase(kvStore: KVStore,
timeoutMillis: Long = 10000,
debug: Boolean = false)
extends MetadataStore(kvStore, metaDataSet, timeoutMillis) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

private case class GroupByRequestMeta(
groupByServingInfoParsed: GroupByServingInfoParsed,
@@ -63,7 +63,7 @@ class FetcherBase(kvStore: KVStore,
overallLatency: Long,
context: Metrics.Context,
totalResponseValueBytes: Int): Map[String, AnyRef] = {
- val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
val latestBatchValue = batchResponsesTry.map(_.maxBy(_.millis))
val servingInfo =
latestBatchValue.map(timedVal => updateServingInfo(timedVal.millis, oldServingInfo)).getOrElse(oldServingInfo)
@@ -37,7 +37,7 @@ import scala.util.{Failure, Success, Try}
case class DataMetrics(series: Seq[(Long, SortedMap[String, Any])])

class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, timeoutMillis: Long) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
private var partitionSpec = PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis)
private val CONF_BATCH_SIZE = 50

@@ -23,7 +23,7 @@ import org.junit.Test
import scala.collection.JavaConverters._

class TileCodecTest {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
private val histogram = Map[String, Int]("A" -> 3, "B" -> 2).asJava

private val aggregationsAndExpected: Array[(Aggregation, Seq[Any])] = Array(
@@ -27,7 +27,7 @@ import org.junit.Test
import scala.util.ScalaJavaConversions.JListOps

class DataStreamBuilderTest {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
lazy val spark: SparkSession = {
System.setSecurityManager(null)
val spark = SparkSession
2 changes: 1 addition & 1 deletion project/FolderCleaner.scala
@@ -3,7 +3,7 @@ import java.io.File
import scala.reflect.io.Directory

object Folder {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
def clean(files: File*): Unit = {
logger.info(s"Removing folders ${files.map(_.getAbsolutePath)}")
files.foreach { file =>
2 changes: 1 addition & 1 deletion project/ThriftGen.scala
@@ -4,7 +4,7 @@ import sbt._
import sys.process._

object Thrift {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
def gen(inputPath: String, outputPath: String, language: String, cleanupSuffixPath: String = "", extension: String = null): Seq[File] = {
s"""echo "Generating files from thrift file: $outputPath \ninto folder $inputPath" """ !;
s"rm -rf $outputPath/$cleanupSuffixPath" !;
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/Analyzer.scala
@@ -70,7 +70,7 @@ class Analyzer(tableUtils: TableUtils,
sample: Double = 0.1,
enableHitter: Boolean = false,
silenceMode: Boolean = false) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
// include ts into heavy hitter analysis - useful to surface timestamps that have wrong units
// include total approx row count - so it is easy to understand the percentage of skewed data
def heavyHittersWithTsAndCount(df: DataFrame,
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala
@@ -69,7 +69,7 @@ case class BootstrapInfo(
}

object BootstrapInfo {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

// Build metadata for the join that contains schema information for join parts, external parts and bootstrap parts
def from(joinConf: api.Join,
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/Comparison.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types.{DecimalType, DoubleType, FloatType, MapType}
import java.util

object Comparison {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

// used for comparison
def sortedJson(m: Map[String, Any]): String = {
14 changes: 7 additions & 7 deletions spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -56,7 +56,7 @@ class DummyExtensions extends (SparkSessionExtensions => Unit) {

// The mega chronon cli
object Driver {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

def parseConf[T <: TBase[_, _]: Manifest: ClassTag](confPath: String): T =
ThriftJsonCodec.fromJsonFile[T](confPath, check = true)
@@ -214,7 +214,7 @@ object Driver {
}

object JoinBackfill {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
class Args
extends Subcommand("join")
with OfflineSubcommand
@@ -262,7 +262,7 @@ object Driver {
}

object GroupByBackfill {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
class Args
extends Subcommand("group-by-backfill")
with OfflineSubcommand
@@ -526,7 +526,7 @@ object Driver {
}

object FetcherCli {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

class Args extends Subcommand("fetch") with OnlineSubcommand {
val keyJson: ScallopOption[String] = opt[String](required = false, descr = "json of the keys to fetch")
@@ -646,7 +646,7 @@ object Driver {
}

object MetadataUploader {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
class Args extends Subcommand("metadata-upload") with OnlineSubcommand {
val confPath: ScallopOption[String] =
opt[String](required = true, descr = "Path to the Chronon config file or directory")
@@ -691,7 +691,7 @@ object Driver {
}

object GroupByStreaming {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
def dataStream(session: SparkSession, host: String, topic: String): DataFrame = {
TopicChecker.topicShouldExist(topic, host)
session.streams.addListener(new StreamingQueryListener() {
@@ -715,7 +715,7 @@ object Driver {
}

class Args extends Subcommand("group-by-streaming") with OnlineSubcommand {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
val confPath: ScallopOption[String] = opt[String](required = true, descr = "path to groupBy conf")
val DEFAULT_LAG_MILLIS = 2000 // 2seconds
val kafkaBootstrap: ScallopOption[String] =
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/Extensions.scala
@@ -84,7 +84,7 @@ object Extensions {
}

implicit class DataframeOps(df: DataFrame) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
private implicit val tableUtils: TableUtils = TableUtils(df.sparkSession)

// This is safe to call on dataframes that are un-shuffled from their disk sources -
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/FastHashing.scala
@@ -42,7 +42,7 @@ case class KeyWithHash(data: Array[Any], hash: Array[Byte], hashInt: Int) extend
}

object FastHashing {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
// function to generate a fast-ish hasher
// the approach tries to accumulate several tiny closures to compute the final hash
def generateKeyBuilder(keys: Array[String], schema: StructType): Row => KeyWithHash = {
4 changes: 2 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/GroupBy.scala
@@ -44,7 +44,7 @@ class GroupBy(val aggregations: Seq[api.Aggregation],
skewFilter: Option[String] = None,
finalize: Boolean = true)
extends Serializable {
- val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

protected[spark] val tsIndex: Int = inputDf.schema.fieldNames.indexOf(Constants.TimeColumn)
protected val selectedSchema: Array[(String, api.DataType)] = SparkConversions.toChrononSchema(inputDf.schema)
@@ -392,7 +392,7 @@ class GroupBy(val aggregations: Seq[api.Aggregation],

// TODO: truncate queryRange for caching
object GroupBy {
- val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

// Need to use a case class here to allow null matching
case class SourceDataProfile(earliestRequired: String, earliestPresent: String, latestAllowed: String)
4 changes: 2 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
@@ -35,7 +35,7 @@ import scala.util.ScalaJavaConversions.{ListOps, MapOps}
import scala.util.Try

class GroupByUpload(endPartition: String, groupBy: GroupBy) extends Serializable {
- val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
implicit val sparkSession: SparkSession = groupBy.sparkSession
implicit private val tableUtils: TableUtils = TableUtils(sparkSession)
private def fromBase(rdd: RDD[(Array[Any], Array[Any])]): KvRdd = {
@@ -105,7 +105,7 @@ class GroupByUpload(endPartition: String, groupBy: GroupBy) extends Serializable
}

object GroupByUpload {
- val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

// TODO - remove this if spark streaming can't reach hive tables
private def buildServingInfo(groupByConf: api.GroupBy,
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/Join.scala
@@ -66,7 +66,7 @@ class Join(joinConf: api.Join,
mutationScan: Boolean = true,
showDf: Boolean = false)
extends JoinBase(joinConf, endPartition, tableUtils, skipFirstHole, mutationScan, showDf) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

private val bootstrapTable = joinConf.metaData.bootstrapTable

2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/JoinBase.scala
@@ -39,7 +39,7 @@ abstract class JoinBase(joinConf: api.Join,
skipFirstHole: Boolean,
mutationScan: Boolean = true,
showDf: Boolean = false) {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)
assert(Option(joinConf.metaData.outputNamespace).nonEmpty, s"output namespace could not be empty or null")
val metrics: Metrics.Context = Metrics.Context(Metrics.Environment.JoinOffline, joinConf)
private val outputTable = joinConf.metaData.outputTable
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
@@ -31,7 +31,7 @@ import scala.collection.Seq
import scala.util.ScalaJavaConversions.MapOps

object JoinUtils {
- private val logger = LoggerFactory.getLogger(getClass)
+ @transient lazy val logger = LoggerFactory.getLogger(getClass)

/***
* Util methods for join computation