Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

关于CogroupRDD的一点疑问以及依赖的一点问题 #46

Open
pzz2011 opened this issue Mar 28, 2016 · 2 comments
Open

关于CogroupRDD的一点疑问以及依赖的一点问题 #46

pzz2011 opened this issue Mar 28, 2016 · 2 comments

Comments

@pzz2011
Copy link

pzz2011 commented Mar 28, 2016

我看CogroupRDD的实现,没看懂narrowdependency或shuffledependency对cogrouprdd中partition的影响... 不知道如果a.cogroup(b) , a分别是rangepartitioner和hashpartitioner的话,中间生成的cogrouprdd的分区数莫非和rdd a的一样多?因为cogroup这个算子不能指定numPartitons呀
我看您在JobLogicalPlan章节中对dependency分了4类(或者说两打类), 而且看cogroupRDD的对于依赖的处理,似乎并没有这么复杂,完全无视了所谓的N:1 NarrowDependency。

override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = dependencies.length

// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
  case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
    val dependencyPartition = split.narrowDeps(depNum).get.split
    // Read them from the parent
    val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
    rddIterators += ((it, depNum))

  case shuffleDependency: ShuffleDependency[_, _, _] =>
    // Read map outputs of shuffle
    val it = SparkEnv.get.shuffleManager
      .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
      .read()
    rddIterators += ((it, depNum))
}
@leo-987
Copy link

leo-987 commented Jun 23, 2016

cogroup有一个可选参数指定task数,这个task数是不是就是partition数呢?

@Resemble
Copy link

我看CogroupRDD的实现,没看懂narrowdependency或shuffledependency对cogrouprdd中partition的影响... 不知道如果a.cogroup(b) , a分别是rangepartitioner和hashpartitioner的话,中间生成的cogrouprdd的分区数莫非和rdd a的一样多?因为cogroup这个算子不能指定numPartitons呀
我看您在JobLogicalPlan章节中对dependency分了4类(或者说两打类), 而且看cogroupRDD的对于依赖的处理,似乎并没有这么复杂,完全无视了所谓的N:1 NarrowDependency。

override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = dependencies.length

// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
  case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
    val dependencyPartition = split.narrowDeps(depNum).get.split
    // Read them from the parent
    val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
    rddIterators += ((it, depNum))

  case shuffleDependency: ShuffleDependency[_, _, _] =>
    // Read map outputs of shuffle
    val it = SparkEnv.get.shuffleManager
      .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
      .read()
    rddIterators += ((it, depNum))
}

hi,你看懂了吗,我也不懂这个,为什么分区器相同就是一对一依赖,并且还可能产生 shuffle(因为划分了 stage)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants