
Commit

WIP
ezvz committed Dec 6, 2023
1 parent b322934 commit 3695355
Showing 9 changed files with 24 additions and 17 deletions.
2 changes: 1 addition & 1 deletion api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -784,7 +784,7 @@ object Extensions {
}

implicit class JoinOps(val join: Join) extends Serializable {
-    private val logger = LoggerFactory.getLogger(getClass)
+    private val logger = LoggerFactory.getLogger(getClass)
// all keys as they should appear in left that are being used on right
def leftKeyCols: Array[String] = {
join.joinParts.toScala
3 changes: 2 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/Analyzer.scala
@@ -336,7 +336,8 @@ class Analyzer(tableUtils: TableUtils,
logger.info(s"----- Schema validation completed. Found ${keysWithError.size} errors")
val keyErrorSet: Set[(String, String)] = keysWithError.toSet
logger.info(keyErrorSet.map { case (key, errorMsg) => s"$key => $errorMsg" }.mkString("\n"))
-    logger.info(s"---- Table permission check completed. Found permission errors in ${noAccessTables.size} tables ----")
+    logger.info(
+      s"---- Table permission check completed. Found permission errors in ${noAccessTables.size} tables ----")
logger.info(noAccessTables.mkString("\n"))
logger.info(s"---- Data availability check completed. Found issue in ${dataAvailabilityErrors.size} tables ----")
dataAvailabilityErrors.foreach(error =>
18 changes: 10 additions & 8 deletions spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -214,7 +214,7 @@ object Driver {
}

object JoinBackfill {
-    private val logger = LoggerFactory.getLogger(getClass)
+    private val logger = LoggerFactory.getLogger(getClass)
class Args
extends Subcommand("join")
with OfflineSubcommand
@@ -256,12 +256,13 @@
}

df.show(numRows = 3, truncate = 0, vertical = true)
-      logger.info(s"\nShowing three rows of output above.\nQuery table `${args.joinConf.metaData.outputTable}` for more.\n")
+      logger.info(
+        s"\nShowing three rows of output above.\nQuery table `${args.joinConf.metaData.outputTable}` for more.\n")
}
}

object GroupByBackfill {
-    private val logger = LoggerFactory.getLogger(getClass)
+    private val logger = LoggerFactory.getLogger(getClass)
class Args
extends Subcommand("group-by-backfill")
with OfflineSubcommand
@@ -525,7 +526,7 @@
}

object FetcherCli {
-    private val logger = LoggerFactory.getLogger(getClass)
+    private val logger = LoggerFactory.getLogger(getClass)

class Args extends Subcommand("fetch") with OnlineSubcommand {
val keyJson: ScallopOption[String] = opt[String](required = false, descr = "json of the keys to fetch")
@@ -565,7 +566,8 @@
)
series.get(keyMap("statsKey").asInstanceOf[String])
else series
-      logger.info(s"--- [FETCHED RESULT] ---\n${objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(toPrint)}")
+      logger.info(
+        s"--- [FETCHED RESULT] ---\n${objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(toPrint)}")
}

def run(args: Args): Unit = {
@@ -644,7 +646,7 @@
}

object MetadataUploader {
-    private val logger = LoggerFactory.getLogger(getClass)
+    private val logger = LoggerFactory.getLogger(getClass)
class Args extends Subcommand("metadata-upload") with OnlineSubcommand {
val confPath: ScallopOption[String] =
opt[String](required = true, descr = "Path to the Chronon config file or directory")
@@ -689,7 +691,7 @@
}

object GroupByStreaming {
-    private val logger = LoggerFactory.getLogger(getClass)
+    private val logger = LoggerFactory.getLogger(getClass)
def dataStream(session: SparkSession, host: String, topic: String): DataFrame = {
TopicChecker.topicShouldExist(topic, host)
session.streams.addListener(new StreamingQueryListener() {
@@ -713,7 +715,7 @@
}

class Args extends Subcommand("group-by-streaming") with OnlineSubcommand {
-      private val logger = LoggerFactory.getLogger(getClass)
+      private val logger = LoggerFactory.getLogger(getClass)
val confPath: ScallopOption[String] = opt[String](required = true, descr = "path to groupBy conf")
val DEFAULT_LAG_MILLIS = 2000 // 2seconds
val kafkaBootstrap: ScallopOption[String] =
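
For context on the Args classes in the Driver.scala hunks above: each subcommand is a scallop Subcommand whose options are declared with opt[...]. A minimal, self-contained sketch of that pattern follows; DemoArgs, the fetch subcommand, and keyJson are illustrative stand-ins, not code from this commit.

    import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand}

    // Hypothetical CLI mirroring the Subcommand/opt pattern used by Driver's Args classes.
    class DemoArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
      object fetch extends Subcommand("fetch") {
        val keyJson: ScallopOption[String] = opt[String](required = false, descr = "json of the keys to fetch")
      }
      addSubcommand(fetch)
      verify()
    }

    object Demo {
      def main(args: Array[String]): Unit = {
        val parsed = new DemoArgs(args.toIndexedSeq)
        parsed.subcommand match {
          case Some(parsed.fetch) => println(parsed.fetch.keyJson.toOption) // Some(json) when --key-json is passed
          case _                  => parsed.printHelp()
        }
      }
    }
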
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/Extensions.scala
@@ -84,7 +84,7 @@ object Extensions {
}

implicit class DataframeOps(df: DataFrame) {
-    private val logger = LoggerFactory.getLogger(getClass)
+    private val logger = LoggerFactory.getLogger(getClass)
private implicit val tableUtils: TableUtils = TableUtils(df.sparkSession)

// This is safe to call on dataframes that are un-shuffled from their disk sources -
6 changes: 4 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/JoinBase.scala
@@ -167,7 +167,8 @@ abstract class JoinBase(joinConf: api.Join,
}
} catch {
case e: Exception =>
-          logger.info(s"Error while processing groupBy: ${joinConf.metaData.name}/${joinPart.groupBy.getMetaData.getName}")
+          logger.info(
+            s"Error while processing groupBy: ${joinConf.metaData.name}/${joinPart.groupBy.getMetaData.getName}")
throw e
}
if (tableUtils.tableExists(partTable)) {
@@ -193,7 +194,8 @@
val rowCount = leftDfWithStats.get.count
val unfilledRange = leftDfWithStats.get.partitionRange

-    logger.info(s"\nBackfill is required for ${joinPart.groupBy.metaData.name} for $rowCount rows on range $unfilledRange")
+    logger.info(
+      s"\nBackfill is required for ${joinPart.groupBy.metaData.name} for $rowCount rows on range $unfilledRange")
val rightBloomMap =
JoinUtils.genBloomFilterIfNeeded(leftDf,
joinPart,
3 changes: 2 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/LabelJoin.scala
@@ -94,7 +94,8 @@ class LabelJoin(joinConf: api.Join, tableUtils: TableUtils, labelDS: String) {
labelTable
} else {
// creating final join view with feature join output table
-      logger.info(s"Joining label table : ${outputLabelTable} with joined output table : ${joinConf.metaData.outputTable}")
+      logger.info(
+        s"Joining label table : ${outputLabelTable} with joined output table : ${joinConf.metaData.outputTable}")
JoinUtils.createOrReplaceView(
joinConf.metaData.outputFinalView,
leftTable = joinConf.metaData.outputTable,
3 changes: 2 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
@@ -71,7 +71,8 @@ class CompareJob(

// Save the comparison table
logger.info("Saving comparison output..")
-    logger.info(s"Comparison schema ${compareDf.schema.fields.map(sb => (sb.name, sb.dataType)).toMap.mkString("\n - ")}")
+    logger.info(
+      s"Comparison schema ${compareDf.schema.fields.map(sb => (sb.name, sb.dataType)).toMap.mkString("\n - ")}")
tableUtils.insertUnPartitioned(compareDf, comparisonTableName, tableProps, saveMode = SaveMode.Overwrite)

// Save the metrics table
@@ -107,7 +107,7 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
private val microBatchIntervalMillis: Int = getProp("batch_interval_millis", "1000").toInt

private case class PutRequestHelper(inputSchema: StructType) extends Serializable {
-    private val logger = LoggerFactory.getLogger(getClass)
+    private val logger = LoggerFactory.getLogger(getClass)
private val keyIndices: Array[Int] = keyColumns.map(inputSchema.fieldIndex)
private val valueIndices: Array[Int] = valueColumns.map(inputSchema.fieldIndex)
private val tsIndex: Int = inputSchema.fieldIndex(eventTimeColumn)
@@ -86,7 +86,7 @@ object TopicChecker {
}

class Args(arguments: Seq[String]) extends ScallopConf(arguments) {
-    private val logger = LoggerFactory.getLogger(getClass)
+    private val logger = LoggerFactory.getLogger(getClass)
val conf: ScallopOption[String] = opt[String](descr = "Conf to pull topic and bootstrap server information")
val bootstrap: ScallopOption[String] = opt[String](descr = "Kafka bootstrap server in host:port format")
val topic: ScallopOption[String] = opt[String](descr = "kafka topic to check metadata for")
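
The common thread across these nine files is the logging idiom itself: a private, class-level SLF4J logger obtained from LoggerFactory.getLogger(getClass), with long interpolated messages wrapped onto a continuation line. A minimal sketch of that idiom, using a hypothetical ReportExample object rather than any class from this commit:

    import org.slf4j.LoggerFactory

    object ReportExample {
      // Per-class logger, as declared in each file touched by this commit.
      private val logger = LoggerFactory.getLogger(getClass)

      def report(outputTable: String): Unit =
        // Long interpolated messages are split after `logger.info(` so the line fits the formatter's width.
        logger.info(
          s"\nShowing three rows of output above.\nQuery table `$outputTable` for more.\n")
    }
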
