fix(spark-jobs): add cron scheduler to spark-driver to schedule downsampling spark jobs #1173

Draft · wants to merge 2 commits into base: develop
13 changes: 12 additions & 1 deletion core/src/main/resources/filodb-defaults.conf
@@ -350,6 +350,12 @@ filodb {
# tagB3 = value3
#}
]

# cron scheduler
cron-scheduler {
enabled = false
# schedule = 0 0 */6 ? * * * # every 6 hours
}
}

ds-index-job {
@@ -372,7 +378,12 @@ filodb {

# Amount of time to wait for a Cassandra write to finish before proceeding to next batch of partitions
cassandra-write-timeout = 1.minutes
}

# cron scheduler
cron-scheduler {
enabled = false
# schedule = 0 0 */6 ? * * * # every 6 hours
}

spark {
# The amount of time to wait for dataset creation, truncation, schema changes, etc.
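Reviewer note on the commented-out `schedule` value: Quartz cron expressions take up to seven fields (seconds, minutes, hours, day-of-month, month, day-of-week, optional year), and one of day-of-month or day-of-week must be `?`, so `0 0 */6 ? * * *` does fire every 6 hours. A minimal standalone sketch (not part of this PR; names are illustrative) that validates a candidate schedule and prints its next few fire times with `org.quartz.CronExpression`:

```scala
import java.util.Date
import org.quartz.CronExpression

object CronCheck extends App {
  // seconds minutes hours day-of-month month day-of-week year
  val expr = "0 0 */6 ? * * *"
  require(CronExpression.isValidExpression(expr), s"invalid Quartz cron: $expr")

  val cron = new CronExpression(expr)
  // print the next three fire times so the schedule can be eyeballed before enabling it
  Iterator.iterate(new Date())(d => cron.getNextValidTimeAfter(d))
    .drop(1)
    .take(3)
    .foreach(println)
}
```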
4 changes: 3 additions & 1 deletion project/Dependencies.scala
@@ -24,6 +24,7 @@ object Dependencies {
val monixKafkaVersion = "0.15"
val sparkVersion = "2.4.4"
val sttpVersion = "1.3.3"
val quartzVersion = "2.3.1"

/* Dependencies shared */
val logbackDep = "ch.qos.logback" % "logback-classic" % "1.2.3"
@@ -81,7 +82,8 @@ object Dependencies {
"org.apache.spark" %% "spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.spark" %% "spark-core" % sparkVersion % Test excludeAll(excludeNetty),
"org.apache.spark" %% "spark-sql" % sparkVersion % Test excludeAll(excludeNetty)
"org.apache.spark" %% "spark-sql" % sparkVersion % Test excludeAll(excludeNetty),
"org.quartz-scheduler" % "quartz" % quartzVersion
)

lazy val cassDeps = commonDeps ++ Seq(
204 changes: 137 additions & 67 deletions spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala
@@ -6,6 +6,8 @@ import java.time.format.DateTimeFormatter
import kamon.Kamon
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.quartz._
import org.quartz.impl.StdSchedulerFactory

import filodb.coordinator.KamonShutdownHook
import filodb.downsampler.DownsamplerContext
@@ -48,7 +50,86 @@ object DownsamplerMain extends App {

val d = new Downsampler(settings, batchDownsampler)
val sparkConf = new SparkConf(loadDefaults = true)
d.run(sparkConf)
val sparkSession = SparkSession.builder()
.appName("FiloDBDownsampler")
.config(sparkConf)
.getOrCreate()

private[this] def scheduleJob(scheduler: Scheduler,
sparkSession: SparkSession,
cronExpression: String): Unit = {
val job = JobBuilder.newJob(classOf[SparkDownsamplerJob])
.withIdentity("FiloDBDownsampler", "Group")
.build
val trigger = TriggerBuilder
.newTrigger()
.withIdentity(s"cron trigger $cronExpression", "triggerGroup")
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).build()
DownsamplerContext.dsLogger.info(s"triggering job with schedule $cronExpression")
scheduler.scheduleJob(job, trigger)
}

private[this] class SparkDownsamplerJob extends Job {
override def execute(context: JobExecutionContext): Unit = {
// get the timeInPeriod from JobContext (rerun failed job) or derive from configuration
val timeInPeriod = context.getScheduler.getContext.getOrDefault("timeInPeriod", -1L)
.asInstanceOf[Long] match {
case -1L => userTimesInPeriod(sparkSession).head // first run
case time: Long => time // retry failed run
}
if (context.getRefireCount > 5) { // number of times this triggered run has already failed
val jee = new JobExecutionException("Retries exceeded the threshold count of 5")
DownsamplerContext.dsLogger
.error(s"Job failed 5 times for this period. To rerun this job add the following spark config: " +
s""""spark.filodb.downsampler.userTimeOverride": "${java.time.Instant.ofEpochMilli(timeInPeriod)}"""")
//make sure this trigger doesn't run again
jee.setUnscheduleFiringTrigger(true)
throw jee
}
try {
// set the timeInPeriod in context, will be used for refire (reruns)
context.getScheduler.getContext.put("timeInPeriod", timeInPeriod)
d.run(sparkSession, Seq(timeInPeriod))
} catch {
case e1: Exception =>
DownsamplerContext.dsLogger.error("exception during scheduled job run", e1)
val jee = new JobExecutionException(e1)
Thread.sleep(300000) // back off for 5 minutes
// fire the job again
jee.setRefireImmediately(true)
throw jee
}
}
}

// Use the comma-separated spark property spark.filodb.downsampler.userTimeOverride to override the
// userTime periods for which downsampling should occur.
// Generally unset; defaults to the period that just ended prior to now.
// Specified during reruns to downsample old data.
def userTimesInPeriod(spark: SparkSession): Seq[Long] = {
spark.sparkContext.getConf
.getOption("spark.filodb.downsampler.userTimeOverride") match {
// by default assume a time in the previous downsample period
case None => Seq(System.currentTimeMillis() - settings.downsampleChunkDuration)
// examples: 2019-10-20T12:34:56Z or 2019-10-20T12:34:56-08:00
case Some(str) =>
str.split(",")
.map(timeStr => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(timeStr)).toEpochMilli())
}
}

def startJob: Unit = {
if (settings.cronEnabled && sparkSession.sparkContext.getConf
.getOption("spark.filodb.downsampler.userTimeOverride").isEmpty) {
val sf = new StdSchedulerFactory
val sched = sf.getScheduler()
sched.start() // the scheduler stays in standby until started; without this the cron trigger never fires
scheduleJob(sched, sparkSession, settings.cronExpression.get)
} else
d.run(sparkSession, userTimesInPeriod(sparkSession))
}

// entry-point for the execution
startJob
}

class Downsampler(settings: DownsamplerSettings, batchDownsampler: BatchDownsampler) extends Serializable {
@@ -60,72 +141,61 @@ class Downsampler(settings: DownsamplerSettings, batchDownsampler: BatchDownsamp
// Otherwise, config values below were not being sent over.
// See https://medium.com/onzo-tech/serialization-challenges-with-spark-and-scala-a2287cd51c54
// scalastyle:off method.length
def run(sparkConf: SparkConf): SparkSession = {

val spark = SparkSession.builder()
.appName("FiloDBDownsampler")
.config(sparkConf)
.getOrCreate()

DownsamplerContext.dsLogger.info(s"Spark Job Properties: ${spark.sparkContext.getConf.toDebugString}")

// Use the spark property spark.filodb.downsampler.user-time-override to override the
// userTime period for which downsampling should occur.
// Generally disabled, defaults the period that just ended prior to now.
// Specified during reruns for downsampling old data
val userTimeInPeriod: Long = spark.sparkContext.getConf
.getOption("spark.filodb.downsampler.userTimeOverride") match {
// by default assume a time in the previous downsample period
case None => System.currentTimeMillis() - settings.downsampleChunkDuration
// examples: 2019-10-20T12:34:56Z or 2019-10-20T12:34:56-08:00
case Some(str) => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(str)).toEpochMilli()
}

val userTimeStart: Long = (userTimeInPeriod / settings.downsampleChunkDuration) * settings.downsampleChunkDuration
val userTimeEndExclusive: Long = userTimeStart + settings.downsampleChunkDuration
val ingestionTimeStart: Long = userTimeStart - settings.widenIngestionTimeRangeBy.toMillis
val ingestionTimeEnd: Long = userTimeEndExclusive + settings.widenIngestionTimeRangeBy.toMillis

DownsamplerContext.dsLogger.info(s"This is the Downsampling driver. Starting downsampling job " +
s"rawDataset=${settings.rawDatasetName} for " +
s"userTimeInPeriod=${java.time.Instant.ofEpochMilli(userTimeInPeriod)} " +
s"ingestionTimeStart=${java.time.Instant.ofEpochMilli(ingestionTimeStart)} " +
s"ingestionTimeEnd=${java.time.Instant.ofEpochMilli(ingestionTimeEnd)} " +
s"userTimeStart=${java.time.Instant.ofEpochMilli(userTimeStart)} " +
s"userTimeEndExclusive=${java.time.Instant.ofEpochMilli(userTimeEndExclusive)}")
DownsamplerContext.dsLogger.info(s"To rerun this job add the following spark config: " +
s""""spark.filodb.downsampler.userTimeOverride": "${java.time.Instant.ofEpochMilli(userTimeInPeriod)}"""")

val splits = batchDownsampler.rawCassandraColStore.getScanSplits(batchDownsampler.rawDatasetRef)
DownsamplerContext.dsLogger.info(s"Cassandra split size: ${splits.size}. We will have this many spark " +
s"partitions. Tune num-token-range-splits-for-scans if parallelism is low or latency is high")

KamonShutdownHook.registerShutdownHook()

spark.sparkContext
.makeRDD(splits)
.mapPartitions { splitIter =>
Kamon.init()
KamonShutdownHook.registerShutdownHook()
val rawDataSource = batchDownsampler.rawCassandraColStore
val batchIter = rawDataSource.getChunksByIngestionTimeRangeNoAsync(
datasetRef = batchDownsampler.rawDatasetRef,
splits = splitIter, ingestionTimeStart = ingestionTimeStart,
ingestionTimeEnd = ingestionTimeEnd,
userTimeStart = userTimeStart, endTimeExclusive = userTimeEndExclusive,
maxChunkTime = settings.rawDatasetIngestionConfig.storeConfig.maxChunkTime.toMillis,
batchSize = settings.batchSize,
cassFetchSize = settings.cassFetchSize)
batchIter
}
.foreach { rawPartsBatch =>
Kamon.init()
KamonShutdownHook.registerShutdownHook()
batchDownsampler.downsampleBatch(rawPartsBatch, userTimeStart, userTimeEndExclusive)
}

DownsamplerContext.dsLogger.info(s"Downsampling Driver completed successfully")
jobCompleted.increment()
def run(spark: SparkSession, userTimeInPeriods: Seq[Long]): SparkSession = {
val userTimes = userTimeInPeriods.map(java.time.Instant.ofEpochMilli).mkString(",")
DownsamplerContext.dsLogger.info(s"Downsampling job for userTimeInPeriod(s)=$userTimes")
userTimeInPeriods.foreach(userTimeInPeriod => {
DownsamplerContext.dsLogger.info(s"Spark Job Properties: ${spark.sparkContext.getConf.toDebugString}")
val userTimeStart: Long = (userTimeInPeriod / settings.downsampleChunkDuration) * settings.downsampleChunkDuration
val userTimeEndExclusive: Long = userTimeStart + settings.downsampleChunkDuration
val ingestionTimeStart: Long = userTimeStart - settings.widenIngestionTimeRangeBy.toMillis
val ingestionTimeEnd: Long = userTimeEndExclusive + settings.widenIngestionTimeRangeBy.toMillis

DownsamplerContext.dsLogger.info(s"This is the Downsampling driver. Starting downsampling job " +
s"rawDataset=${settings.rawDatasetName} for " +
s"userTimeInPeriod=${java.time.Instant.ofEpochMilli(userTimeInPeriod)} " +
s"ingestionTimeStart=${java.time.Instant.ofEpochMilli(ingestionTimeStart)} " +
s"ingestionTimeEnd=${java.time.Instant.ofEpochMilli(ingestionTimeEnd)} " +
s"userTimeStart=${java.time.Instant.ofEpochMilli(userTimeStart)} " +
s"userTimeEndExclusive=${java.time.Instant.ofEpochMilli(userTimeEndExclusive)}")
DownsamplerContext.dsLogger.info(s"To rerun this job add the following spark config: " +
s""""spark.filodb.downsampler.userTimeOverride": "${java.time.Instant.ofEpochMilli(userTimeInPeriod)}"""")

val splits = batchDownsampler.rawCassandraColStore.getScanSplits(batchDownsampler.rawDatasetRef)
DownsamplerContext.dsLogger.info(s"Cassandra split size: ${splits.size}. We will have this many spark " +
s"partitions. Tune num-token-range-splits-for-scans if parallelism is low or latency is high")

KamonShutdownHook.registerShutdownHook()

spark.sparkContext
.makeRDD(splits)
.mapPartitions { splitIter =>
Kamon.init()
KamonShutdownHook.registerShutdownHook()
val rawDataSource = batchDownsampler.rawCassandraColStore
val batchIter = rawDataSource.getChunksByIngestionTimeRangeNoAsync(
datasetRef = batchDownsampler.rawDatasetRef,
splits = splitIter, ingestionTimeStart = ingestionTimeStart,
ingestionTimeEnd = ingestionTimeEnd,
userTimeStart = userTimeStart, endTimeExclusive = userTimeEndExclusive,
maxChunkTime = settings.rawDatasetIngestionConfig.storeConfig.maxChunkTime.toMillis,
batchSize = settings.batchSize,
cassFetchSize = settings.cassFetchSize)
batchIter
}
.foreach { rawPartsBatch =>
Kamon.init()
KamonShutdownHook.registerShutdownHook()
batchDownsampler.downsampleBatch(rawPartsBatch, userTimeStart, userTimeEndExclusive)
}

DownsamplerContext.dsLogger.info(s"Downsampling Driver completed successfully")
jobCompleted.increment()
})
spark
}

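For reviewers less familiar with the Quartz retry semantics relied on above (`getRefireCount`, `setRefireImmediately`, `setUnscheduleFiringTrigger`): the sketch below is a self-contained illustration, not FiloDB code, of a job that keeps being refired on failure until a refire-count threshold is reached. Class and identity names are made up; it also shows why the scheduler must be started explicitly, which is what the `sched.start()` call in `startJob` is for.

```scala
import org.quartz._
import org.quartz.impl.StdSchedulerFactory

// A job that always fails and relies on Quartz refire semantics to retry itself.
class FlakyJob extends Job {
  override def execute(ctx: JobExecutionContext): Unit = {
    if (ctx.getRefireCount > 2) {            // give up after a few attempts
      val jee = new JobExecutionException("retries exhausted")
      jee.setUnscheduleFiringTrigger(true)   // do not fire this trigger again
      throw jee
    }
    try {
      println(s"attempt ${ctx.getRefireCount}: doing work")
      throw new RuntimeException("simulated failure")
    } catch {
      case e: Exception =>
        val jee = new JobExecutionException(e)
        jee.setRefireImmediately(true)       // ask Quartz to re-run right away
        throw jee
    }
  }
}

object RefireDemo extends App {
  val scheduler = StdSchedulerFactory.getDefaultScheduler
  val job = JobBuilder.newJob(classOf[FlakyJob]).withIdentity("flaky", "demo").build()
  val trigger = TriggerBuilder.newTrigger().withIdentity("once", "demo").startNow().build()
  scheduler.start()                          // nothing fires while the scheduler is in standby
  scheduler.scheduleJob(job, trigger)
  Thread.sleep(2000)                         // let the retries play out
  scheduler.shutdown(true)
}
```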
@@ -28,6 +28,10 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ

@transient lazy val cassandraConfig = filodbConfig.getConfig("cassandra")

@transient lazy val cronEnabled = downsamplerConfig.getBoolean("cron-scheduler.enabled")

@transient lazy val cronExpression = downsamplerConfig.getAs[String]("cron-scheduler.schedule")

@transient lazy val rawDatasetName = downsamplerConfig.getString("raw-dataset-name")

@transient lazy val rawDatasetIngestionConfig = {
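The two new settings are read lazily from the downsampler config, and `schedule` is left commented out in the defaults, so it surfaces as an `Option`. A short sketch with plain Typesafe Config showing how an override might look and how the optional key behaves; the `downsampler` path prefix here is an assumption for illustration, not something this diff pins down.

```scala
import com.typesafe.config.ConfigFactory

object CronSettingsCheck extends App {
  // hypothetical override mirroring the new keys added to filodb-defaults.conf
  val conf = ConfigFactory.parseString(
    """
      |downsampler.cron-scheduler {
      |  enabled = true
      |  schedule = "0 0 */6 ? * * *"
      |}
      |""".stripMargin)

  val dsConf = conf.getConfig("downsampler")
  val cronEnabled = dsConf.getBoolean("cron-scheduler.enabled")
  // schedule is optional, so guard with hasPath before reading it
  val cronExpression =
    if (dsConf.hasPath("cron-scheduler.schedule")) Some(dsConf.getString("cron-scheduler.schedule"))
    else None

  println(s"cron enabled=$cronEnabled schedule=$cronExpression")
}
```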
@@ -4,8 +4,10 @@ import java.time.Instant
import java.time.format.DateTimeFormatter

import kamon.Kamon
import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.quartz._
import org.quartz.impl.StdSchedulerFactory

import filodb.coordinator.KamonShutdownHook
import filodb.downsampler.DownsamplerContext
@@ -38,20 +40,88 @@ object DSIndexJobMain extends App {
//migrate partkeys between these hours
val iu = new IndexJobDriver(dsSettings, dsIndexJobSettings)
val sparkConf = new SparkConf(loadDefaults = true)
iu.run(sparkConf)
val sparkSession = SparkSession.builder()
.appName("FiloDB_Index_Downsampler")
.config(sparkConf)
.getOrCreate()

private[this] def scheduleJob(scheduler: Scheduler,
sparkSession: SparkSession,
cronExpression: String): Unit = {
val job = JobBuilder.newJob(classOf[SparkDSIndexJob])
.withIdentity("DSIndexJob", "Group")
.build
val trigger = TriggerBuilder
.newTrigger()
.withIdentity(s"cron trigger $cronExpression", "triggerGroup")
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).build()
DownsamplerContext.dsLogger.info(s"triggering job with schedule $cronExpression")
scheduler.scheduleJob(job, trigger)
}

private[this] class SparkDSIndexJob extends Job {
override def execute(context: JobExecutionContext): Unit = {
// get the timeInPeriod from JobContext (rerun failed job) or derive from configuration
val timeInPeriod = context.getScheduler.getContext.getOrDefault("timeInPeriod", -1L)
.asInstanceOf[Long] match {
case -1L => timeInMigrationPeriod(sparkSession)
case time: Long => time
}
if (context.getRefireCount > 5) { // number of times this triggered run has already failed
val jee = new JobExecutionException("Retries exceeded the threshold count of 5")
DownsamplerContext.dsLogger
.error(s"Job failed to run for period: ${java.time.Instant.ofEpochMilli(timeInPeriod)}")
// make sure the job doesn't run again; index jobs need to run in sequence, so exit the job
jee.setUnscheduleAllTriggers(true)
throw jee
}
try {
// set the timeInPeriod in context, will be used for reruns
context.getScheduler.getContext.put("timeInPeriod", timeInPeriod)
iu.run(sparkSession, timeInPeriod)
} catch {
case e1: Exception =>
DownsamplerContext.dsLogger.error("exception during scheduled job run", e1)
val jee = new JobExecutionException(e1)
Thread.sleep(300000) // back off for 5 minutes
// fire the job again
jee.setRefireImmediately(true)
throw jee
}
}
}

def timeInMigrationPeriod(spark: SparkSession): Long = {
spark.sparkContext.getConf
.getOption("spark.filodb.downsampler.index.timeInPeriodOverride") match {
// by default assume a time in the previous downsample period
case None => System.currentTimeMillis() - dsSettings.downsampleChunkDuration
// examples: 2019-10-20T12:34:56Z or 2019-10-20T12:34:56-08:00
case Some(str) => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(str)).toEpochMilli
}
}
def startJob: Unit = {
if (dsIndexJobSettings.cronEnabled && sparkSession.sparkContext.getConf
.getOption("spark.filodb.downsampler.index.timeInPeriodOverride").isEmpty) {
val sf = new StdSchedulerFactory
val sched = sf.getScheduler()
sched.start() // the scheduler stays in standby until started; without this the cron trigger never fires
scheduleJob(sched, sparkSession, dsIndexJobSettings.cronExpression.get)
} else
iu.run(sparkSession, timeInPeriod = timeInMigrationPeriod(sparkSession))
}

// entry-point for the execution
startJob
}



class IndexJobDriver(dsSettings: DownsamplerSettings, dsIndexJobSettings: DSIndexJobSettings) extends Serializable {

@transient lazy private val jobCompleted = Kamon.counter("index-migration-completed").withoutTags()

// scalastyle:off method.length
def run(conf: SparkConf): SparkSession = {
val spark = SparkSession.builder()
.appName("FiloDB_Index_Downsampler")
.config(conf)
.getOrCreate()
def run(spark: SparkSession, timeInPeriod: Long): SparkSession = {

def hour(millis: Long) = millis / 1000 / 60 / 60

@@ -60,7 +130,7 @@ class IndexJobDriver(dsSettings: DownsamplerSettings, dsIndexJobSettings: DSInde
// by default assume a time in the previous downsample period
case None => System.currentTimeMillis() - dsSettings.downsampleChunkDuration
// examples: 2019-10-20T12:34:56Z or 2019-10-20T12:34:56-08:00
case Some(str) => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(str)).toEpochMilli()
case Some(str) => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(str)).toEpochMilli
}

val hourInMigrationPeriod = hour(timeInMigrationPeriod)
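Both drivers fall back to "now minus one downsample period" when no override property is set, and otherwise parse ISO-8601 offset timestamps; the chunk downsampler additionally accepts a comma-separated list. A quick standalone sketch of that parsing path (the timestamps come from the code comments; object and variable names are illustrative):

```scala
import java.time.Instant
import java.time.format.DateTimeFormatter

object UserTimeOverrideCheck extends App {
  // comma-separated values, as accepted by spark.filodb.downsampler.userTimeOverride
  val overrideValue = "2019-10-20T12:34:56Z,2019-10-20T12:34:56-08:00"

  val epochMillis: Seq[Long] = overrideValue.split(",").toSeq
    .map(_.trim)
    .map(s => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(s)).toEpochMilli)

  // each value selects one downsample period: periodStart = (millis / chunkDuration) * chunkDuration
  epochMillis.foreach(ms => println(s"$ms -> ${Instant.ofEpochMilli(ms)}"))
}
```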