Commit 5bff60d: WIP
ezvz committed Dec 7, 2023
1 parent d745b86 commit 5bff60d
Showing 61 changed files with 75 additions and 75 deletions.
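
Every hunk below makes the same one-line change: the logger field loses its `private` modifier, leaving `@transient lazy val logger = LoggerFactory.getLogger(getClass)`. The likely reason (my reading; the commit message says only "WIP"): in Scala 2, annotations must come before any modifiers, so `private @transient lazy val ...` is a parse error. Dropping `private`, as this commit does, or reordering to `@transient private lazy val ...`, both compile. A minimal sketch of the three orderings, in a hypothetical class:

```scala
import org.slf4j.{Logger, LoggerFactory}

class OrderingSketch {
  // Does not parse in Scala 2: an annotation cannot follow a modifier.
  // private @transient lazy val logger = LoggerFactory.getLogger(getClass)

  // Valid: annotation first, then modifiers.
  @transient private lazy val scopedLogger: Logger = LoggerFactory.getLogger(getClass)

  // Also valid, and what this commit settles on: drop the modifier entirely.
  @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
}
```
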
@@ -94,7 +94,7 @@ class HopsAggregator(minQueryTs: Long,
                      inputSchema: Seq[(String, DataType)],
                      resolution: Resolution)
     extends HopsAggregatorBase(aggregations, inputSchema, resolution) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   val leftBoundaries: Array[Option[Long]] = {
     // Nikhil is pretty confident we won't call this when aggregations is empty
@@ -32,7 +32,7 @@ class SawtoothOnlineAggregator(val batchEndTs: Long,
                                inputSchema: Seq[(String, DataType)],
                                resolution: Resolution,
                                tailBufferMillis: Long) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   // logically, batch response is arranged like so
   // sum-90d => sum_ir_88d, [(sum_ir_1d, ts)] -> 1d is the hopSize for 90d
@@ -26,7 +26,7 @@ import org.junit.Assert._
 import scala.util.Random

 class ApproxPercentilesTest extends TestCase {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   def testBasicImpl(nums: Int, slide: Int, k: Int, percentiles: Array[Double], errorPercent: Float): Unit = {
     val sorted = (0 to nums).map(_.toFloat)
     val elems = Random.shuffle(sorted.toList).toArray
@@ -31,7 +31,7 @@ import scala.collection.mutable
 import scala.collection.Seq

 class Timer {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   var ts: Long = System.currentTimeMillis()
@@ -22,7 +22,7 @@ import junit.framework.TestCase
 import org.junit.Assert._

 class VarianceTest extends TestCase {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   def mean(elems: Seq[Double]): Double = elems.sum / elems.length
   def naive(elems: Seq[Double]): Double = {
api/src/main/scala/ai/chronon/api/Extensions.scala (2 changes: 1 addition & 1 deletion)
@@ -784,7 +784,7 @@ object Extensions {
   }

   implicit class JoinOps(val join: Join) extends Serializable {
-    private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+    @transient lazy val logger = LoggerFactory.getLogger(getClass)
     // all keys as they should appear in left that are being used on right
     def leftKeyCols: Array[String] = {
       join.joinParts.toScala
api/src/main/scala/ai/chronon/api/ParametricMacro.scala (4 changes: 2 additions & 2 deletions)
@@ -21,7 +21,7 @@ import scala.collection.mutable

 // takes a map of macro names and functions and applies the functions on macro arguments
 case class ParametricMacro(value: String, func: Map[String, String] => String) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   private val pattern = s"""\\{\\{\\s*$value(\\([\\s0-9A-Za-z_.,=]*\\))*\\s*}}""".r

   def replace(str: String): String = {
@@ -53,7 +53,7 @@ case class ParametricMacro(value: String, func: Map[String, String] => String) {
 }

 object ParametricMacro {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   def main(args: Array[String]): Unit = {
     val mc = ParametricMacro("something", { x => "st:" + x.keys.mkString("/") + "|" + x.values.mkString("/") })
     val str = "something nothing-{{ something( a_1=b,, 3.1, c=d) }}-something after-{{ thing:a1=b1 }}{{ something }}"
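
For orientation, `ParametricMacro.replace` substitutes `{{ name(...) }}` tokens in a string using the supplied function over the parsed arguments, as the `main` above demonstrates. A stripped-down sketch of the same idea (my approximation; the macro name `start_date` and the argument parsing are illustrative, not Chronon's exact implementation):

```scala
import scala.util.matching.Regex

object MacroSketch {
  // Replace "{{ name(k=v, ...) }}" occurrences with func applied to the arg map.
  def substitute(template: String, name: String)(func: Map[String, String] => String): String = {
    val pattern: Regex = raw"""\{\{\s*$name(\(([^)]*)\))?\s*\}\}""".r
    pattern.replaceAllIn(
      template,
      m => {
        // group(2) is the text between the optional parentheses, if present
        val argMap = Option(m.group(2)).getOrElse("")
          .split(",").map(_.trim).filter(_.contains("="))
          .map { kv => val Array(k, v) = kv.split("=", 2); k.trim -> v.trim }
          .toMap
        Regex.quoteReplacement(func(argMap))
      }
    )
  }

  def main(args: Array[String]): Unit = {
    val out = substitute("select * from t where ds = '{{ start_date(offset=1) }}'", "start_date") {
      argMap => s"2023-12-0${argMap.getOrElse("offset", "0")}"
    }
    println(out) // select * from t where ds = '2023-12-01'
  }
}
```
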
api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala (2 changes: 1 addition & 1 deletion)
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
 import scala.util.ScalaJavaConversions.ListOps

 object ThriftJsonCodec {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   def serializer = new TSerializer(new TSimpleJSONProtocol.Factory())
@@ -24,7 +24,7 @@ import org.junit.Assert._
 import org.junit.Test

 class DataTypeConversionTest {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   @Test
   def testDataTypeToThriftAndBack(): Unit = {
     // build some complex type
online/src/main/scala/ai/chronon/online/Api.scala (2 changes: 1 addition & 1 deletion)
@@ -41,7 +41,7 @@ object KVStore {
 // the main system level api for key value storage
 // used for streaming writes, batch bulk uploads & fetching
 trait KVStore {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   implicit val executionContext: ExecutionContext = FlexibleExecutionContext.buildExecutionContext

   def create(dataset: String): Unit
@@ -49,7 +49,7 @@ object TopicInfo {
 }

 case class DataStream(df: DataFrame, partitions: Int, topicInfo: TopicInfo) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   // apply a query to a given data stream
   def apply(query: api.Query, keys: Seq[String] = null, dataModel: DataModel = DataModel.Events): DataStream = {
online/src/main/scala/ai/chronon/online/Fetcher.scala (2 changes: 1 addition & 1 deletion)
@@ -81,7 +81,7 @@ class Fetcher(val kvStore: KVStore,
               debug: Boolean = false,
               val externalSourceRegistry: ExternalSourceRegistry = null)
     extends FetcherBase(kvStore, metaDataSet, timeoutMillis, debug) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   def buildJoinCodec(joinConf: api.Join): JoinCodec = {
     val keyFields = new mutable.LinkedHashSet[StructField]
online/src/main/scala/ai/chronon/online/FetcherBase.scala (2 changes: 1 addition & 1 deletion)
@@ -43,7 +43,7 @@ class FetcherBase(kvStore: KVStore,
                   timeoutMillis: Long = 10000,
                   debug: Boolean = false)
     extends MetadataStore(kvStore, metaDataSet, timeoutMillis) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   private case class GroupByRequestMeta(
       groupByServingInfoParsed: GroupByServingInfoParsed,
@@ -37,7 +37,7 @@ import scala.util.{Failure, Success, Try}
 case class DataMetrics(series: Seq[(Long, SortedMap[String, Any])])

 class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, timeoutMillis: Long) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   private var partitionSpec = PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis)
   private val CONF_BATCH_SIZE = 50
@@ -23,7 +23,7 @@ import org.junit.Test
 import scala.collection.JavaConverters._

 class TileCodecTest {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   private val histogram = Map[String, Int]("A" -> 3, "B" -> 2).asJava

   private val aggregationsAndExpected: Array[(Aggregation, Seq[Any])] = Array(
@@ -27,7 +27,7 @@ import org.junit.Test
 import scala.util.ScalaJavaConversions.JListOps

 class DataStreamBuilderTest {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   lazy val spark: SparkSession = {
     System.setSecurityManager(null)
     val spark = SparkSession
project/FolderCleaner.scala (2 changes: 1 addition & 1 deletion)
@@ -3,7 +3,7 @@ import java.io.File
 import scala.reflect.io.Directory

 object Folder {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   def clean(files: File*): Unit = {
     logger.info(s"Removing folders ${files.map(_.getAbsolutePath)}")
     files.foreach { file =>
project/ThriftGen.scala (2 changes: 1 addition & 1 deletion)
@@ -4,7 +4,7 @@ import sbt._
 import sys.process._

 object Thrift {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   def gen(inputPath: String, outputPath: String, language: String, cleanupSuffixPath: String = "", extension: String = null): Seq[File] = {
     s"""echo "Generating files from thrift file: $outputPath \ninto folder $inputPath" """ !;
     s"rm -rf $outputPath/$cleanupSuffixPath" !;
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (2 changes: 1 addition & 1 deletion)
@@ -70,7 +70,7 @@ class Analyzer(tableUtils: TableUtils,
               sample: Double = 0.1,
               enableHitter: Boolean = false,
               silenceMode: Boolean = false) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   // include ts into heavy hitter analysis - useful to surface timestamps that have wrong units
   // include total approx row count - so it is easy to understand the percentage of skewed data
   def heavyHittersWithTsAndCount(df: DataFrame,
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (2 changes: 1 addition & 1 deletion)
@@ -69,7 +69,7 @@ case class BootstrapInfo(
 }

 object BootstrapInfo {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   // Build metadata for the join that contains schema information for join parts, external parts and bootstrap parts
   def from(joinConf: api.Join,
spark/src/main/scala/ai/chronon/spark/Comparison.scala (2 changes: 1 addition & 1 deletion)
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types.{DecimalType, DoubleType, FloatType, MapType}
 import java.util

 object Comparison {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   // used for comparison
   def sortedJson(m: Map[String, Any]): String = {
spark/src/main/scala/ai/chronon/spark/Driver.scala (14 changes: 7 additions & 7 deletions)
@@ -56,7 +56,7 @@ class DummyExtensions extends (SparkSessionExtensions => Unit) {

 // The mega chronon cli
 object Driver {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   def parseConf[T <: TBase[_, _]: Manifest: ClassTag](confPath: String): T =
     ThriftJsonCodec.fromJsonFile[T](confPath, check = true)
@@ -214,7 +214,7 @@ object Driver {
   }

   object JoinBackfill {
-    private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+    @transient lazy val logger = LoggerFactory.getLogger(getClass)
     class Args
         extends Subcommand("join")
         with OfflineSubcommand
@@ -262,7 +262,7 @@ object Driver {
   }

   object GroupByBackfill {
-    private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+    @transient lazy val logger = LoggerFactory.getLogger(getClass)
     class Args
         extends Subcommand("group-by-backfill")
         with OfflineSubcommand
@@ -526,7 +526,7 @@ object Driver {
   }

   object FetcherCli {
-    private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+    @transient lazy val logger = LoggerFactory.getLogger(getClass)

     class Args extends Subcommand("fetch") with OnlineSubcommand {
       val keyJson: ScallopOption[String] = opt[String](required = false, descr = "json of the keys to fetch")
@@ -646,7 +646,7 @@ object Driver {
   }

   object MetadataUploader {
-    private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+    @transient lazy val logger = LoggerFactory.getLogger(getClass)
     class Args extends Subcommand("metadata-upload") with OnlineSubcommand {
       val confPath: ScallopOption[String] =
         opt[String](required = true, descr = "Path to the Chronon config file or directory")
@@ -691,7 +691,7 @@ object Driver {
   }

   object GroupByStreaming {
-    private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+    @transient lazy val logger = LoggerFactory.getLogger(getClass)
     def dataStream(session: SparkSession, host: String, topic: String): DataFrame = {
       TopicChecker.topicShouldExist(topic, host)
       session.streams.addListener(new StreamingQueryListener() {
@@ -715,7 +715,7 @@ object Driver {
   }

   class Args extends Subcommand("group-by-streaming") with OnlineSubcommand {
-    private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+    @transient lazy val logger = LoggerFactory.getLogger(getClass)
     val confPath: ScallopOption[String] = opt[String](required = true, descr = "path to groupBy conf")
     val DEFAULT_LAG_MILLIS = 2000 // 2seconds
     val kafkaBootstrap: ScallopOption[String] =
spark/src/main/scala/ai/chronon/spark/Extensions.scala (2 changes: 1 addition & 1 deletion)
@@ -84,7 +84,7 @@ object Extensions {
 }

 implicit class DataframeOps(df: DataFrame) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   private implicit val tableUtils: TableUtils = TableUtils(df.sparkSession)

   // This is safe to call on dataframes that are un-shuffled from their disk sources -
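
An aside on why these loggers carry `@transient lazy val` at all (my gloss, not stated in the commit): SLF4J `Logger` instances are not `Serializable`, and Spark serializes any enclosing object captured in an executor-side closure. `@transient` keeps the field out of the serialized graph, and `lazy` re-creates the logger on first use after deserialization on each executor. A self-contained sketch with a hypothetical job class:

```scala
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}

// Hypothetical class, not from this diff: shows the pattern in isolation.
class LengthFilterJob extends Serializable {
  // Without @transient this field would be serialized along with the closure
  // and fail, since SLF4J loggers are not Serializable; lazy defers creation
  // so each executor builds its own logger on first use.
  @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)

  def run(spark: SparkSession): Long =
    spark.sparkContext
      .parallelize(Seq("a", "bb", "ccc"))
      .filter { s =>
        logger.debug(s"checking $s") // the closure captures `this`
        s.length > 1
      }
      .count()
}
```
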
spark/src/main/scala/ai/chronon/spark/FastHashing.scala (2 changes: 1 addition & 1 deletion)
@@ -42,7 +42,7 @@ case class KeyWithHash(data: Array[Any], hash: Array[Byte], hashInt: Int) extend
 }

 object FastHashing {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   // function to generate a fast-ish hasher
   // the approach tries to accumulate several tiny closures to compute the final hash
   def generateKeyBuilder(keys: Array[String], schema: StructType): Row => KeyWithHash = {
spark/src/main/scala/ai/chronon/spark/Join.scala (2 changes: 1 addition & 1 deletion)
@@ -66,7 +66,7 @@ class Join(joinConf: api.Join,
            mutationScan: Boolean = true,
            showDf: Boolean = false)
     extends JoinBase(joinConf, endPartition, tableUtils, skipFirstHole, mutationScan, showDf) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   private val bootstrapTable = joinConf.metaData.bootstrapTable
spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@ abstract class JoinBase(joinConf: api.Join,
                         skipFirstHole: Boolean,
                         mutationScan: Boolean = true,
                         showDf: Boolean = false) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   assert(Option(joinConf.metaData.outputNamespace).nonEmpty, s"output namespace could not be empty or null")
   val metrics: Metrics.Context = Metrics.Context(Metrics.Environment.JoinOffline, joinConf)
   private val outputTable = joinConf.metaData.outputTable
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2 changes: 1 addition & 1 deletion)
@@ -31,7 +31,7 @@ import scala.collection.Seq
 import scala.util.ScalaJavaConversions.MapOps

 object JoinUtils {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   /***
     * Util methods for join computation
spark/src/main/scala/ai/chronon/spark/KvRdd.scala (4 changes: 2 additions & 2 deletions)
@@ -71,7 +71,7 @@ sealed trait BaseKvRdd {
 case class KvRdd(data: RDD[(Array[Any], Array[Any])], keySchema: StructType, valueSchema: StructType)(implicit
     sparkSession: SparkSession)
     extends BaseKvRdd {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   val withTime = false

   def toAvroDf(jsonPercent: Int = 1): DataFrame = {
@@ -113,7 +113,7 @@ case class TimedKvRdd(data: RDD[(Array[Any], Array[Any], Long)],
                       valueSchema: StructType,
                       storeSchemasPrefix: Option[String] = None)(implicit sparkSession: SparkSession)
     extends BaseKvRdd {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   val withTime = true

   // TODO make json percent configurable
spark/src/main/scala/ai/chronon/spark/LabelJoin.scala (2 changes: 1 addition & 1 deletion)
@@ -32,7 +32,7 @@ import scala.collection.Seq
 import scala.util.ScalaJavaConversions.IterableOps

 class LabelJoin(joinConf: api.Join, tableUtils: TableUtils, labelDS: String) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   assert(Option(joinConf.metaData.outputNamespace).nonEmpty, s"output namespace could not be empty or null")
   assert(
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{StringType, TimestampType}
 import java.io.File

 object LocalDataLoader {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   def writeTableFromFile(file: File, tableName: String, session: SparkSession): Unit = {
     logger.info(s"Checking table: ${tableName}")
     if (session.catalog.tableExists(tableName)) return
@@ -51,7 +51,7 @@ class LogFlattenerJob(session: SparkSession,
                       schemaTable: String,
                       stepDays: Option[Int] = None)
     extends Serializable {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   implicit val tableUtils: TableUtils = TableUtils(session)
   val joinTblProps: Map[String, String] = Option(joinConf.metaData.tableProperties)
     .map(_.toScala)
@@ -29,7 +29,7 @@ import java.nio.file.Paths
 import scala.collection.immutable.Map

 object MetadataExporter {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   val GROUPBY_PATH_SUFFIX = "/group_bys"
   val JOIN_PATH_SUFFIX = "/joins"
@@ -26,7 +26,7 @@ import scala.reflect.io.Path
 import scala.util.Properties

 object SparkSessionBuilder {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   val DefaultWarehouseDir = new File("/tmp/chronon/spark-warehouse")
spark/src/main/scala/ai/chronon/spark/StagingQuery.scala (4 changes: 2 additions & 2 deletions)
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.util.ScalaJavaConversions._

 class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tableUtils: TableUtils) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
   assert(Option(stagingQueryConf.metaData.outputNamespace).nonEmpty, s"output namespace could not be empty or null")
   private val outputTable = stagingQueryConf.metaData.outputTable
   private val tableProps = Option(stagingQueryConf.metaData.tableProperties)
@@ -93,7 +93,7 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
 }

 object StagingQuery {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   def substitute(tu: TableUtils, query: String, start: String, end: String, latest: String): String = {
     val macros: Array[ParametricMacro] = Array(
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 changes: 1 addition & 1 deletion)
@@ -37,7 +37,7 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
 import scala.util.{Failure, Success, Try}

 case class TableUtils(sparkSession: SparkSession) {
-  private @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)

   private val ARCHIVE_TIMESTAMP_FORMAT = "yyyyMMddHHmmss"
   private lazy val archiveTimestampFormatter = DateTimeFormatter
