
Understanding the second figure in Chapter 3 #53

Open

zqhxuyuan opened this issue Jul 4, 2016 · 4 comments

zqhxuyuan commented Jul 4, 2016

My understanding of how tasks are divided here is that we work backwards from the final task, computing whatever it depends on.
Taking the first partition of FlatMappedValuesRDD as an example: tracing back to the top-left corner, the second and third partitions of ShuffleRDD shouldn't need to be computed, should they? So the lines for those two partitions shouldn't be drawn as thick lines, right?

[image: sparktask]


lw-lin commented Oct 20, 2016

@zqhxuyuan Spark dispatches stages one after another in order, so the top-left and bottom-left RDDs are each computed to completion first; only then are the RDDs on the right computed, and within that single stage the later RDDs trigger the computation of the earlier elements through iterators. Hope this helps!
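
To make the iterator-driven part concrete, here is a minimal sketch in plain Scala (not Spark's actual classes; MiniRDD, SourceRDD, and MappedRDD are invented for illustration) of how, inside one stage, consuming the last RDD's iterator pulls computation backwards through its parents:

// A minimal sketch of iterator-based pipelining within one stage:
// calling iterator() on the last RDD recursively pulls from its parent,
// so upstream records are only computed as the downstream end consumes them.
trait MiniRDD[T] {
  def iterator(): Iterator[T]
}

class SourceRDD[T](data: Seq[T]) extends MiniRDD[T] {
  def iterator(): Iterator[T] = data.iterator
}

class MappedRDD[T, U](parent: MiniRDD[T], f: T => U) extends MiniRDD[U] {
  // Nothing is materialized here: each next() pulls one element from the
  // parent, which is the pipelined execution of narrow dependencies.
  def iterator(): Iterator[U] = parent.iterator().map(f)
}

object StagePipelineDemo extends App {
  val source  = new SourceRDD(Seq(1, 2, 3))
  val doubled = new MappedRDD[Int, Int](source, _ * 2)
  // Consuming the last RDD's iterator drives the whole chain backwards.
  println(doubled.iterator().toList) // List(2, 4, 6)
}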


Angryrou commented May 3, 2017

Thanks to both authors for sharing!
@lw-lin Hi! While reading Jerry's article and following along in the Spark Streaming code, I ran into a question I'd like to ask you about.

groupByKey does no combine on the map side. In PairRDDFunctions.scala, when groupByKey calls combineByKeyWithClassTag, it sets the mapSideCombine parameter to false (the default is true):

  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

But in Streaming, I found that this isn't done. In PairDStreamFunctions.scala, groupByKey does not set mapSideCombine when calling combineByKey. Do you think this was simply overlooked, or is there a reason behind it?

  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = ssc.withScope {
    val createCombiner = (v: V) => ArrayBuffer[V](v)
    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Iterable[V])]]
  }
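
The comment in the core source above spells out the rationale: a groupByKey combiner only buffers values, so map-side combining doesn't shrink what crosses the shuffle. A standalone sketch in plain Scala (not Spark code; the names are made up) of that reasoning:

import scala.collection.mutable.ArrayBuffer

// Illustration of core's comment: "combining" for groupByKey just wraps
// every value in per-key buffers, so the shuffled value count is unchanged,
// unlike reduceByKey where each key collapses to a single value.
object MapSideCombineDemo extends App {
  val mapOutput = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 4))

  // "Map-side combine" for groupByKey: group values into per-key buffers.
  val combined: Map[String, ArrayBuffer[Int]] =
    mapOutput.groupBy(_._1).map { case (k, kvs) =>
      k -> ArrayBuffer(kvs.map(_._2): _*)
    }

  // The same 4 values would still be shuffled, now wrapped in extra
  // buffer objects that add pressure on the old generation.
  println(mapOutput.size)                  // 4
  println(combined.values.map(_.size).sum) // 4

  // Contrast: a reduceByKey-style combine really does shrink the shuffle.
  val reduced: Map[String, Int] =
    mapOutput.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
  println(reduced.size)                    // 2 values shuffled instead of 4
}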


lw-lin commented May 3, 2017

@Angryrou good catch.

I dug through git blame: 5 years ago, neither core nor streaming set mapSideCombine explicitly; see here in streaming. Then 4 years ago, core was changed to set mapSideCombine = false, but streaming wasn't updated at the same time. So that's how things ended up.

You could open a PR to fix this on the streaming side :)
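
For reference, a hypothetical sketch of what such a fix might look like (this is not merged Spark code; it assumes the mapSideCombine parameter exposed by PairDStreamFunctions.combineByKey, which defaults to true):

  // Hypothetical PR sketch for PairDStreamFunctions.scala: pass
  // mapSideCombine = false, mirroring core's PairRDDFunctions.groupByKey.
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = ssc.withScope {
    val createCombiner = (v: V) => ArrayBuffer[V](v)
    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner,
      mapSideCombine = false)
      .asInstanceOf[DStream[(K, Iterable[V])]]
  }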


Angryrou commented May 3, 2017

@lw-lin Got it, thanks!
