Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor运行一段时间以后Streaming程序失败 #17

Open
tsface opened this issue Apr 10, 2016 · 17 comments
Open

Executor运行一段时间以后Streaming程序失败 #17

tsface opened this issue Apr 10, 2016 · 17 comments

Comments

@tsface
Copy link

tsface commented Apr 10, 2016

你好 @lw-lin
我们在使用Streaming的时候,发现Executor运行一段时间(1小时左右)后,整个程序就会失败,查看CPU,内存,网络,GC情况,都处于安全状态。

error:
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found

最开始的Storage策略配置的是Memory_ONLY,当数据量激增的时候,会报这个错误,所以调整Storage的策略是Memory_And_DIsk,但是程序运行一段时间还是会报这个错误。同时,会抛出Executor和ReceiverTracker的通信超时(120s)。

请问这个有什么好的排查方法吗,谢谢。

ps:部署模式yarn-cluster

@lw-lin
Copy link
Owner

lw-lin commented Apr 10, 2016

@tsface

block input-x-xxxxx not found 这个错误,就是数据已经找不到了:

  • MEMORY_ONLY 时,如果数据过多,内存中就会发生数据替换,被替换出的数据直接被丢掉,所以在后面计算时需要这部分数据的时候,就找不到了;
  • MEMORY_AND_DISK 时,数据过多,替换出的数据会 flush 到硬盘上,所以比 MEMORY_ONLY 时报错几率小很多;但 Executor 失效时(比如其它原因内存溢出后被 Yarn kill 掉、或网络太忙导致心跳发不出去被 driver 认为丢失了等),也会导致在 Memory 或在 Disk 上的数据没有了,虽然几率已经小很多了,但也还是会报这个错误的。

通常解决方法是几种:

  • 如果确实配置的资源不够数据量的需求,那么酌情加大配置的内存或者 Executor 个数;
  • 资源 OK 的前提下:
    • 配置为 RAM_AND_DISK_SER_2,这样会将数据保存到两个 Executor 上,就算一个 Executor 失效,还会在另一个 Executor 上有数据;但两个 Executor 同时失效的话,还是会报之前的错;
    • 启用 WAL;这样数据会同时保存到 Executor 和 HDFS 上,即使 Executor 失效,还是可以从 HDFS 上读出数据来;启用方法详见 这里的 "Configuring write ahead logs" 一节;
    • 容忍数据部分丢失,那么在每个 batch 里都 try-batch 一下,这样即使一个 batch 的 job 失效,那么整个 Streaming 程序还会继续运行。

不过还是建议先看下 Executor 打出来的具体日志,看看需要加资源还是说能够容忍部分数据损失,再酌情选择解决方法。可在本帖后随时反馈;希望有帮助!

@lw-lin
Copy link
Owner

lw-lin commented Apr 11, 2016

@tsface

上面提到的 try-catch 代码:

val inputDStream = ssc.fileStream("")
inputDStream.foreachRDD(rdd => {
  try {
    // do something
  } catch {
    case e => e.printStackTrace()
  }
})

@tsface
Copy link
Author

tsface commented Apr 11, 2016

@lw-lin
谢谢你的解答

try-catch的代码还没有试过,测试了下MEMORY_AND_DISK_2,性能比MEMORY_AND_DISK差很多,目前测试业务下数据处理性能差不多是这样的关系 :4 * MEMORY_AND_DISK_2 = MEMORY_AND_DISK 。

Executor被kill的原因,是Active Job队列里面任务开始积压,处理时延增加。Job的提交周期是1秒,由于CPU平均使用率到95%左右,Receiver接收速率不变,每个Job处理时延增加到了5到10s,目前Job的提交Interval能动态指定吗?

@lw-lin
Copy link
Owner

lw-lin commented Apr 11, 2016

@tsface

现在有几个 receiver?几个 Executor、每个 Executor 几个 core?

block interval 是多大?batch interval(即 batch duration) 呢?每个 batch 处理多少 records?

@tsface
Copy link
Author

tsface commented Apr 11, 2016

@lw-lin

  • receiver : 2
  • Executor : 2
  • Vcore: 每个机器64个VCore,全分配给Executor

每个Block差不多6M左右,batch duration: 1s, 每个batch处理的events没有注意,应该是12000多个吧。

测试了下try-catch,可以解决Executor被kill的情况👍

@lw-lin
Copy link
Owner

lw-lin commented Apr 11, 2016

@tsface
好的,try-catch 只是个应急手段,看起来还是建议调整下 block interval 和每个 executor 的 core 数~

@tsface
Copy link
Author

tsface commented Apr 14, 2016

@lw-lin
关于这个问题我跟踪了下Driver和Executor端的debug日志,日志中有些问题暂时不明白,想请教下,下面信息是从Driver端Akka消息的角度整理的,完成的日志太大,不方便post

Driver:node4
Executors:node2,node3

  1. node2的上报数据更新请求:UpdateBlockInfo(input-0-1460509204597),处理成功
  2. node2发送AddBlock请求,处理成功,数据块存储在node2,块大小5.6MB
  3. sparkDriver端产生GetLocations消息,处理成功
  4. sparkDriver端发出RemoveBlock消息,此时node2上的BlockManager执行了remove操作
  5. node2发送UpdateBlockInfo消息,此时块的存储级别变为None
  6. sparkDriver端产生GetLocations,产生java.lang.Exception: Could not compute split, block input-0-1460509204597 not found (4次)(7~11的日志发生了两分钟后)
  7. node3上报更新请求:UpdateBlockInfo,存储块input-0-1460509204597到node3,块大小4.8MB
  8. node3产生AddBlock消息
  9. sparkDriver端产生GetLocations消息,处理成功
  10. sparkDriver端发出RemoveBlock消息,此时node3上的BlockManager执行了remove操作
  11. node3发送UpdateBlockInfo消息,Driver的blockid信息被删除

问题:

  1. 在4中,为什么sparkDriver的Akka端会产生RemoveBlock的消息,这个消息到底是怎么产生的?
  2. 在数据被删除的情况下JobScheduler将Tasks分发给node3,导致node3 getRemote的时候找不到数据,这个在任务调度的时序上是怎么样的一个过程?
  3. 既然数据已经被删除,为什么在node3上这个块数据又出现了,而且块的大小改变了?

谢谢!

@lw-lin
Copy link
Owner

lw-lin commented Apr 14, 2016

@tsface
收到。这个应用是跑在 Spark 1.? 的环境上的?Receiver 的 StorageLevel 是怎么设置的?整个 DAG 拓扑中有 window 操作吗?

@tsface
Copy link
Author

tsface commented Apr 14, 2016

@lw-lin
spark 1.5.2
StorageLevel : MEMORY_AND_DISK
没有window操作

@weibin0516
Copy link

@tsface 有没有调用过 StreamingContext#remember ?

@tsface
Copy link
Author

tsface commented Apr 18, 2016

@keepsimplefocus
没有

@weibin0516
Copy link

weibin0516 commented Apr 20, 2016

@tsface 我怀疑和 spark streaming 的清理机制有关。在 jobSet 完成和 checkpoint 的时候都会触发清理操作,这个时候可能会把需要用到的 blocks 删掉。http://www.jianshu.com/p/5e096df2618d 可能会给你一些启发,希望有帮助~

@airphoto
Copy link

@tsface 请问,你的问题解决了吗?我也遇到了相同的问题
我的运行模式是 standalone client模式,同时在网上请教了一下,说是也有可能driver的内存设置的太小,导致gc时间太长,我的driver内存设置是默认的(应该是1g吧),请问有可能是这个原因吗?

@proflin
Copy link

proflin commented Apr 22, 2016

@tsface
能把完整日志(driver+相关 executors)发给我吗?lwlin7#gmail.com
之前没遇到过这种情况

@tsface
Copy link
Author

tsface commented Apr 22, 2016

@proflin
已发

@tsface
Copy link
Author

tsface commented Apr 22, 2016

@keepsimplefocus
谢谢,源码中加了日志,在跟踪

@abetterme
Copy link

请问这个问题解决了吗,我也遇到了类似的问题

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants