
Driver-side failure recovery: how is exactly-once semantics guaranteed? #50

Open
jingwangfei opened this issue Dec 27, 2018 · 1 comment

Comments


jingwangfei commented Dec 27, 2018

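Hi, I have a question for the experts here.
Suppose a jobSet has some jobs that have already succeeded when the driver exits abnormally.
After recovery, will the jobs in that jobSet that already succeeded be executed again?
If not, where is this handled in the source code?
If so, that clearly breaks exactly-once semantics, so how should we understand the exactly-once semantics that streaming is usually said to provide?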
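A job running twice does not by itself mean its output is applied twice. The following is a minimal plain-Scala sketch, assuming a hypothetical sink that commits a batch's data together with a (batch time, partition) identifier in one atomic step and skips any identifier it has already committed, roughly in the spirit of the "transactional updates" approach described in the Spark Streaming programming guide. `TransactionalSink` and `commit` are illustrative names, not Spark or database APIs, and `synchronized` only stands in for a real transaction.

```scala
import scala.collection.mutable

// Toy "transactional" sink (hypothetical, not a Spark or database API).
// A batch's data and its (batchTime, partition) identifier are committed together,
// and an identifier that is already committed is skipped, so replays are no-ops.
object TransactionalSinkSketch {
  final class TransactionalSink {
    private val committedIds = mutable.Set.empty[(Long, Int)]
    val data = mutable.ArrayBuffer.empty[String]

    // Returns true if the data was applied, false if this (batchTime, partition) was a replay.
    // `synchronized` stands in for the single atomic transaction a real store would use.
    def commit(batchTime: Long, partition: Int, values: Seq[String]): Boolean = synchronized {
      if (committedIds.contains((batchTime, partition))) false
      else {
        data ++= values
        committedIds += ((batchTime, partition))
        true
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new TransactionalSink
    println(sink.commit(2000L, 0, Seq("a", "b"))) // true: first run of batch 2000
    println(sink.commit(2000L, 0, Seq("a", "b"))) // false: re-executed after driver recovery
    println(sink.data.mkString(", "))             // a, b
  }
}
```

The job itself may run again after recovery; only the commit check decides whether its output lands a second time.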

Author

jingwangfei commented Dec 27, 2018

I just read the source code: in this situation, the jobs in that jobSet that had already succeeded are executed again.

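During recovery, restart() is called; it reads the state saved in the checkpoint (ck) and restarts the jobGenerator.
Every jobSet that had not finished executing before the driver crashed is scheduled again.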

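private def restart() {
  // Each batch time corresponds to exactly one jobSet.

  // Batches missed while the driver was down: those between the checkpoint time and the restart time
  val downTimes = ......

  // Batches that had not finished processing before the failure
  val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)

  // Pending batches + downtime batches, deduplicated and sorted by time (earliest first)
  val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }.distinct.sorted(Time.ordering)

  // Resubmit each batch: generateJobs(time) rebuilds all jobs of that batch,
  // including any that had already succeeded before the crash
  timesToReschedule.foreach { time =>
    jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
  }
}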
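The rescheduling step in the excerpt can be modeled without Spark. This is a minimal sketch, assuming batch times are plain `Long` milliseconds rather than Spark's `Time`; `timesToReschedule` and `generateJobs` here are hypothetical stand-ins, and `downTimes` is passed in directly because the excerpt elides how it is computed. It illustrates the point above: a batch that was only partly done at crash time is resubmitted as a whole, so a job that had already succeeded runs again.

```scala
// Plain-Scala model of the rescheduling step in restart() (hypothetical names, not Spark APIs).
// Assumption: batch times are Long milliseconds instead of Spark's Time.
object RescheduleSketch {
  // Mirrors (pendingTimes ++ downTimes).filter { _ < restartTime }.distinct.sorted(...)
  def timesToReschedule(pendingTimes: Seq[Long], downTimes: Seq[Long], restartTime: Long): Seq[Long] =
    (pendingTimes ++ downTimes).filter(_ < restartTime).distinct.sorted

  // Hypothetical stand-in for graph.generateJobs(time): two output jobs per batch.
  def generateJobs(time: Long): Seq[String] = Seq(s"job-0@$time", s"job-1@$time")

  def main(args: Array[String]): Unit = {
    // Batches 2000 and 3000 had not completed; in batch 2000, job-0 had already succeeded.
    val pendingTimes = Seq(3000L, 2000L)
    val completedBeforeCrash = Set("job-0@2000")
    // Supplied directly; the excerpt elides how downTimes is computed.
    val downTimes = Seq(3000L, 4000L, 5000L)
    val restartTime = 5000L

    val times = timesToReschedule(pendingTimes, downTimes, restartTime)
    println(times.mkString(", ")) // 2000, 3000, 4000

    // Every job of every rescheduled batch is resubmitted, including job-0@2000.
    val resubmitted = times.flatMap(t => generateJobs(t))
    println(resubmitted.filter(completedBeforeCrash).mkString(", ")) // job-0@2000
  }
}
```

The `distinct` matters because a batch can appear both as pending and as a downtime batch (3000 in this example), and sorting keeps batches in their original time order.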

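The above is only my personal view; discussion and corrections are welcome.
One question still remains, though: what do we actually mean when we say that streaming "guarantees exactly-once semantics"?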
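One common reading is that re-execution is allowed as long as the output written to the external system reflects each batch exactly once, which depends on how the output is written. The sketch below contrasts two hypothetical toy sinks (not Spark APIs), assuming the replayed batch recomputes exactly the same values: an append-only sink duplicates rows on replay, while a sink keyed by (batch time, index) simply overwrites them.

```scala
import scala.collection.mutable

// Two toy sinks (hypothetical, not Spark APIs) receiving the output of one batch.
object ReplaySinks {
  // Append-only sink: every write adds rows, so replaying a batch duplicates them.
  final class AppendSink {
    val rows = mutable.ArrayBuffer.empty[(Long, String)]
    def write(batchTime: Long, values: Seq[String]): Unit =
      values.foreach(v => rows += ((batchTime, v)))
  }

  // Idempotent sink: output is keyed by (batchTime, index), so a replay that
  // recomputes the same values overwrites the same keys and the end state is unchanged.
  final class IdempotentSink {
    val rows = mutable.Map.empty[(Long, Int), String]
    def write(batchTime: Long, values: Seq[String]): Unit =
      values.zipWithIndex.foreach { case (v, i) => rows((batchTime, i)) = v }
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq("a", "b")
    val append = new AppendSink
    val idem = new IdempotentSink
    // First attempt succeeds, then the driver crashes and batch 2000 is replayed.
    for (_ <- 1 to 2) { append.write(2000L, batch); idem.write(2000L, batch) }
    println(append.rows.size) // 4
    println(idem.rows.size)   // 2
  }
}
```

Under this reading, the rerun of already-succeeded jobs found in restart() is compatible with exactly-once output only when the sink is idempotent or transactional; with an append-only sink the guarantee degrades to at-least-once.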
