《4.1 Executor 端长时容错详解.md》 Discussion Thread #11

Open
lw-lin opened this issue Dec 16, 2015 · 6 comments

Comments
@lw-lin
Owner

lw-lin commented Dec 16, 2015

This is the discussion thread for 《4.1 Executor 端长时容错详解.md》.

If you need to paste code, please copy the following snippet and modify it:

public static final thisIsJavaCode;
val thisIsScalaCode

Thanks!

@luphappy

Hi, I'd like to ask: if I want to catch a Spark exception, how should I catch it?
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchInterval);

JavaPairInputDStream<String, String> logs = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

dotask();

How should I catch this so that the application does not exit?
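A minimal sketch (in Scala, not from this thread) of one common pattern for this: catch the exception that awaitTermination() rethrows and rebuild the StreamingContext instead of letting main() exit. doTask below is a hypothetical stand-in for the dotask() above, and the application name and batch interval are only assumed for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KeepDriverAliveSketch {

  // Hypothetical stand-in for the poster's dotask(): build the DStreams
  // (e.g. KafkaUtils.createDirectStream) and register output operations here.
  def doTask(ssc: StreamingContext): Unit = { }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("keep-driver-alive-sketch")  // assumed name
    var keepRunning = true
    while (keepRunning) {
      val ssc = new StreamingContext(conf, Seconds(10))                // assumed interval
      doTask(ssc)
      ssc.start()
      try {
        // A batch that fails permanently surfaces its exception here.
        ssc.awaitTermination()
        keepRunning = false    // normal shutdown: leave the loop
      } catch {
        case e: Exception =>
          // Log the failure and tear the failed context down;
          // the loop then builds a fresh StreamingContext.
          e.printStackTrace()
          ssc.stop(stopSparkContext = true, stopGracefully = false)
      }
    }
  }
}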

@bhlx3lyx7

Hi, I'd like to ask: where is the code mentioned in the article that modifies vanilla Spark Streaming to implement the fine-grained ignore (细粒度忽略) behavior?

@Adamyuanyuan

Hi, thank you for writing such an excellent analysis series. I also noticed today that the Structured Streaming material has been updated. The first figure in this section's "WriteAheadLog 框架" part, "Spark Streaming 里的 WAL 框架图", seems to be wrong. Is it a mistake? I found a similar figure online; could it be used instead?
[attached image: WAL framework diagram found online]

@lw-lin
Owner Author

lw-lin commented Jan 3, 2017

@Adamyuanyuan

You're right, the first figure is wrong. The figure you found (from Spark 1.2) is structurally correct, except that some class names have changed since Spark 1.2 and need to be updated. I'll update the figure shortly. Thanks for bringing this up!

@bjkonglu

@lw-lin
You mention fine-grained ignore handling in your article. I'm curious how you implemented the modification and would like to take a look at the changed code. Thanks!

@lw-lin
Owner Author

lw-lin commented Oct 31, 2017

@bjkonglu

The main change is to BlockRDD.scala; the full source of the modified file is as follows:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.storage.{BlockId, BlockManager, StreamBlockId}

private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
  val index = idx
}

private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil) {

  @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
  @volatile private var _isValid = true

  override def getPartitions: Array[Partition] = {
    assertValid()
    (0 until blockIds.length).map { i =>
      new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
    }.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val blockManager = SparkEnv.get.blockManager
    val blockId = split.asInstanceOf[BlockRDDPartition].blockId

    //========================= modification begin ======================
    // Only receiver-generated stream blocks are eligible for the ignore feature.
    val isStreamBlock = blockId.isInstanceOf[StreamBlockId]
    val enableBlockRddMissingIgnoreFeature =
      SparkEnv.get.conf.getBoolean("spark._.enableBlockRddMissingIgnoreFeature", false)
    val defaultTaskRetryNum =
      SparkEnv.get.conf.getInt("spark.task.maxFailures", 4)
    val maxRetryNum =
      SparkEnv.get.conf.getInt("spark._.blockRddMissingMaxRetryNum", 4).min(defaultTaskRetryNum)
    // Ignore a missing block only on the final allowed attempt, so earlier attempts
    // still get a chance to re-fetch or recompute it.
    val attemptTime = context.attemptNumber()
    val shouldIgnore = attemptTime >= maxRetryNum - 1
    //========================= modification end ======================
    blockManager.get(blockId) match {
      case Some(block) => block.data.asInstanceOf[Iterator[T]]
      case None =>
        //========================= modification begin ======================
        if (enableBlockRddMissingIgnoreFeature && isStreamBlock && shouldIgnore) {
          // Fine-grained ignore: log the lost stream block and return an empty
          // iterator so the task (and the batch) can still succeed.
          logError("block missing: " + blockId)
          Seq().iterator
        } else {
          throw new Exception(s"Could not compute split, block $blockId of RDD $id not found")
        }
        //========================= modification end ======================
    }
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    assertValid()
    _locations(split.asInstanceOf[BlockRDDPartition].blockId)
  }

  /**
   * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
   * irreversible operation, as the data in the blocks cannot be recovered back
   * once removed. Use it with caution.
   */
  private[spark] def removeBlocks() {
    blockIds.foreach { blockId =>
      sparkContext.env.blockManager.master.removeBlock(blockId)
    }
    _isValid = false
  }

  /**
   * Whether this BlockRDD is actually usable. This will be false if the data blocks have been
   * removed using `this.removeBlocks`.
   */
  private[spark] def isValid: Boolean = {
    _isValid
  }

  /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
  private[spark] def assertValid() {
    if (!isValid) {
      throw new SparkException(
        "Attempted to use %s after its blocks have been removed!".format(toString))
    }
  }

  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
    _locations
  }
}
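For reference, a minimal usage sketch assuming the patch above is in place: the two spark._.* keys are exactly the ones read in the modified compute(); the application name and batch interval are only illustrative.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlockMissingIgnoreExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("block-missing-ignore-example")   // assumed app name
      // Turn on the fine-grained ignore feature added in compute() above (default: false).
      .set("spark._.enableBlockRddMissingIgnoreFeature", "true")
      // Attempts before a missing StreamBlock is ignored; capped by spark.task.maxFailures.
      .set("spark._.blockRddMissingMaxRetryNum", "2")

    val ssc = new StreamingContext(conf, Seconds(10)) // assumed batch interval
    // Build receiver-based DStreams and output operations as usual. With the patch,
    // once a task's retry count reaches the cap and a receiver-written StreamBlock is
    // still missing, compute() logs an error and returns an empty iterator instead of
    // failing the whole batch.
  }
}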
