Filter out external feature columns in consistency job comparison #599

Open · wants to merge 2 commits into main
12 changes: 10 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala
@@ -88,10 +88,18 @@ class ConsistencyJob(session: SparkSession, joinConf: Join, endDate: String) ext
     if (unfilledRanges.isEmpty) return null
     val allMetrics = unfilledRanges.map { unfilled =>
       val comparisonDf = tableUtils.sql(unfilled.genScanQuery(null, joinConf.metaData.comparisonTable))
+      // External parts / contextual features don't get logged in the online data but do appear in the comparison table.
+      // We need to remove them from the comparison df, otherwise the comparison metric computation will fail.
+      // We prepend `ext_contextual_` to each feature name since that's how it appears in the comparison df.
+      val externalFeatureColumns = joinConf.getExternalFeatureCols.map(col => s"ext_contextual_$col")
+      val comparisonDfNoExternalCols = comparisonDf.select(
+        comparisonDf.columns.filterNot(externalFeatureColumns.contains(_)).map(org.apache.spark.sql.functions.col): _*
+      )
       val loggedDf =
         tableUtils.sql(unfilled.genScanQuery(null, joinConf.metaData.loggedTable)).drop(Constants.SchemaHash)
       // There could be external columns logged in the online env, so they cannot be used for computing OOC.
-      val loggedDfNoExternalCols = loggedDf.select(comparisonDf.columns.map(org.apache.spark.sql.functions.col): _*)
+      val loggedDfNoExternalCols =
+        loggedDf.select(comparisonDfNoExternalCols.columns.map(org.apache.spark.sql.functions.col): _*)
       println("Starting compare job for stats")
       val joinKeys = if (joinConf.isSetRowIds) {
         joinConf.rowIds.toScala
@@ -100,7 +108,7 @@ class ConsistencyJob(session: SparkSession, joinConf: Join, endDate: String) ext
       }
       println(s"Using ${joinKeys.mkString("[", ",", "]")} as join keys between log and backfill.")
       val (compareDf, metricsKvRdd, metrics) =
-        CompareBaseJob.compare(comparisonDf,
+        CompareBaseJob.compare(comparisonDfNoExternalCols,
                                loggedDfNoExternalCols,
                                keys = joinKeys,
                                tableUtils,
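
For reference, here is a minimal, self-contained sketch of the filtering pattern this PR applies, outside the ConsistencyJob context. The `ext_contextual_` prefix and the filterNot/select pattern mirror the diff above; the SparkSession setup, sample rows, feature names, and the object name are hypothetical stand-ins.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object FilterExternalColsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("filter-sketch").getOrCreate()
    import spark.implicits._

    // Comparison table: contains a contextual feature that never appears in the online logs.
    val comparisonDf = Seq((1L, 0.5, "US"), (2L, 0.7, "CA"))
      .toDF("row_id", "feature_a", "ext_contextual_country")

    // Hypothetical external feature names, prefixed the way the diff derives them
    // from joinConf.getExternalFeatureCols.
    val externalFeatureColumns = Seq("country").map(c => s"ext_contextual_$c")

    // Drop the external columns from the comparison side...
    val comparisonDfNoExternalCols = comparisonDf.select(
      comparisonDf.columns.filterNot(externalFeatureColumns.contains(_)).map(col): _*
    )

    // ...then project the logged side onto exactly the remaining columns, so both
    // DataFrames share one schema before the compare step.
    val loggedDf = Seq((1L, 0.5), (2L, 0.7)).toDF("row_id", "feature_a")
    val loggedDfNoExternalCols = loggedDf.select(comparisonDfNoExternalCols.columns.map(col): _*)

    comparisonDfNoExternalCols.show() // row_id, feature_a
    loggedDfNoExternalCols.show()     // row_id, feature_a
    spark.stop()
  }
}

Projecting the logged DataFrame onto the filtered comparison schema keeps the two sides column-aligned, which appears to be what the subsequent CompareBaseJob.compare call expects.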