[SPARK-32091][CORE] Ignore timeout error when remove blocks on the lost executor

### What changes were proposed in this pull request?

This PR adds the check to see whether the executor is lost (by asking the `CoarseGrainedSchedulerBackend`) after timeout error raised in `BlockManagerMasterEndponit` due to removing blocks(e.g. RDD, broadcast, shuffle). If the executor is lost, we will ignore the error. Otherwise, throw the error.

### Why are the changes needed?

When removing blocks(e.g. RDD, broadcast, shuffle), `BlockManagerMaserEndpoint` will make RPC calls to each known `BlockManagerSlaveEndpoint` to remove the specific blocks. The PRC call sometimes could end in a timeout when the executor has been lost, but only notified the `BlockManagerMasterEndpoint` after the removing call has already happened. The timeout error could therefore fail the whole job.

In this case, we actually could just ignore the error since those blocks on the lost executor could be considered as removed already.

### Does this PR introduce _any_ user-facing change?

Yes. In case of users hits this issue, they will have the job executed successfully instead of throwing the exception.

### How was this patch tested?

Added unit tests.

Closes #28924 from Ngone51/ignore-timeout-error-for-inactive-executor.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
yi.wu 2020-07-10 13:36:29 +00:00 committed by Wenchen Fan
parent c8779d9dfc
commit 578b90cdec
6 changed files with 191 additions and 19 deletions

View file

@ -132,4 +132,6 @@ private[spark] object CoarseGrainedClusterMessages {
// Used internally by executors to shut themselves down.
case object Shutdown extends CoarseGrainedClusterMessage
// The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
}

View file

@ -285,6 +285,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
Option(delegationTokens.get()),
rp)
context.reply(reply)
case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId))
case e =>
logError(s"Received unexpected ask ${e}")
}

View file

@ -142,7 +142,8 @@ class BlockManagerMaster(
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
// the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
}
@ -153,7 +154,8 @@ class BlockManagerMaster(
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
// the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
}
@ -166,7 +168,8 @@ class BlockManagerMaster(
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
// the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
}

View file

@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.Random
import scala.util.control.NonFatal
import com.google.common.cache.CacheBuilder
@ -32,8 +33,9 @@ import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
@ -95,6 +97,9 @@ class BlockManagerMasterEndpoint(
private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined
private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)
private lazy val driverEndpoint =
RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, rpcEnv)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
@ -168,6 +173,50 @@ class BlockManagerMasterEndpoint(
stop()
}
/**
* A function that used to handle the failures when removing blocks. In general, the failure
* should be considered as non-fatal since it won't cause any correctness issue. Therefore,
* this function would prefer to log the exception and return the default value. We only throw
* the exception when there's a TimeoutException from an active executor, which implies the
* unhealthy status of the executor while the driver still not be aware of it.
* @param blockType should be one of "RDD", "shuffle", "broadcast", "block", used for log
* @param blockId the string value of a certain block id, used for log
* @param bmId the BlockManagerId of the BlockManager, where we're trying to remove the block
* @param defaultValue the return value of a failure removal. e.g., 0 means no blocks are removed
* @tparam T the generic type for defaultValue, Int or Boolean.
* @return the defaultValue or throw exception if the executor is active but reply late.
*/
private def handleBlockRemovalFailure[T](
blockType: String,
blockId: String,
bmId: BlockManagerId,
defaultValue: T): PartialFunction[Throwable, T] = {
case e: IOException =>
logWarning(s"Error trying to remove $blockType $blockId" +
s" from block manager $bmId", e)
defaultValue
case t: TimeoutException =>
val executorId = bmId.executorId
val isAlive = try {
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(executorId))
} catch {
// ignore the non-fatal error from driverEndpoint since the caller doesn't really
// care about the return result of removing blocks. And so we could avoid breaking
// down the whole application.
case NonFatal(e) =>
logError(s"Fail to know the executor $executorId is alive or not.", e)
false
}
if (!isAlive) {
logWarning(s"Error trying to remove $blockType $blockId. " +
s"The executor $executorId may have been lost.", t)
defaultValue
} else {
throw t
}
}
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
// First remove the metadata for the given RDD, and then asynchronously remove the blocks
// from the slaves.
@ -207,10 +256,8 @@ class BlockManagerMasterEndpoint(
}
val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
bmInfo.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove RDD ${removeMsg.rddId} " +
s"from block manager ${bmInfo.blockManagerId}", e)
0 // zero blocks were removed
// use 0 as default value means no blocks were removed
handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
}
}.toSeq
@ -235,7 +282,10 @@ class BlockManagerMasterEndpoint(
val removeMsg = RemoveShuffle(shuffleId)
Future.sequence(
blockManagerInfo.values.map { bm =>
bm.slaveEndpoint.ask[Boolean](removeMsg)
bm.slaveEndpoint.ask[Boolean](removeMsg).recover {
// use false as default value means no shuffle data were removed
handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
}
}.toSeq
)
}
@ -252,10 +302,8 @@ class BlockManagerMasterEndpoint(
}
val futures = requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove broadcast $broadcastId from block manager " +
s"${bm.blockManagerId}", e)
0 // zero blocks were removed
// use 0 as default value means no blocks were removed
handleBlockRemovalFailure("broadcast", broadcastId.toString, bm.blockManagerId, 0)
}
}.toSeq
@ -350,11 +398,14 @@ class BlockManagerMasterEndpoint(
if (locations != null) {
locations.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
if (blockManager.isDefined) {
blockManager.foreach { bm =>
// Remove the block from the slave's BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
bm.slaveEndpoint.ask[Boolean](RemoveBlock(blockId)).recover {
// use false as default value means no blocks were removed
handleBlockRemovalFailure("block", blockId.toString, bm.blockManagerId, false)
}
}
}
}

View file

@ -17,6 +17,8 @@
package org.apache.spark.util
import scala.concurrent.duration._
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Network._
@ -54,6 +56,14 @@ private[spark] object RpcUtils {
RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
}
/**
* Infinite timeout is used internally, so there's no timeout configuration property that
* controls it. Therefore, we use "infinite" without any specific reason as its timeout
* configuration property. And its timeout property should never be accessed since infinite
* means we never timeout.
*/
val INFINITE_TIMEOUT = new RpcTimeout(Long.MaxValue.nanos, "infinite")
private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
/** Returns the configured max message size for messages in bytes. */

View file

@ -23,7 +23,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.{Future, TimeoutException}
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.reflect.ClassTag
@ -49,8 +49,9 @@ import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransport
import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalBlockStoreClient}
import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerBlockUpdated}
import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
@ -93,6 +94,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
.set(MEMORY_STORAGE_FRACTION, 0.999)
.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
.set(Network.RPC_ASK_TIMEOUT, "5s")
}
private def makeBlockManager(
@ -137,8 +139,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf = new SparkConf(false)
init(conf)
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
rpcEnv = RpcEnv.create("test", conf.get(config.DRIVER_HOST_ADDRESS),
conf.get(config.DRIVER_PORT), conf, securityMgr)
conf.set(DRIVER_PORT, rpcEnv.address.port)
conf.set(DRIVER_HOST_ADDRESS, rpcEnv.address.host)
// Mock SparkContext to reduce the memory usage of tests. It's fine since the only reason we
// need to create a SparkContext is to initialize LiveListenerBus.
@ -177,6 +181,105 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
blockManager.stop()
}
/**
* Setup driverEndpoint, executor-1(BlockManager), executor-2(BlockManager) to simulate
* the real cluster before the tests. Any requests from driver to executor-1 will be responded
* in time. However, any requests from driver to executor-2 will be timeouted, in order to test
* the specific handling of `TimeoutException`, which is raised at driver side.
*
* And, when `withLost` is true, we will not register the executor-2 to the driver. Therefore,
* it behaves like a lost executor in terms of driver's view. When `withLost` is false, we'll
* register the executor-2 normally.
*/
private def setupBlockManagerMasterWithBlocks(withLost: Boolean): Unit = {
// set up a simple DriverEndpoint which simply adds executorIds and
// checks whether a certain executorId has been added before.
val driverEndpoint = rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
new RpcEndpoint {
private val executorSet = mutable.HashSet[String]()
override val rpcEnv: RpcEnv = this.rpcEnv
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case CoarseGrainedClusterMessages.RegisterExecutor(executorId, _, _, _, _, _, _, _) =>
executorSet += executorId
context.reply(true)
case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
context.reply(executorSet.contains(executorId))
}
}
)
def createAndRegisterBlockManager(timeout: Boolean): BlockManagerId = {
val id = if (timeout) "timeout" else "normal"
val bmRef = rpcEnv.setupEndpoint(s"bm-$id", new RpcEndpoint {
override val rpcEnv: RpcEnv = this.rpcEnv
private def reply[T](context: RpcCallContext, response: T): Unit = {
if (timeout) {
Thread.sleep(conf.getTimeAsMs(Network.RPC_ASK_TIMEOUT.key) + 1000)
}
context.reply(response)
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RemoveRdd(_) => reply(context, 1)
case RemoveBroadcast(_, _) => reply(context, 1)
case RemoveShuffle(_) => reply(context, true)
}
})
val bmId = BlockManagerId(s"exec-$id", "localhost", 1234, None)
master.registerBlockManager(bmId, Array.empty, 2000, 0, bmRef)
}
// set up normal bm1
val bm1Id = createAndRegisterBlockManager(false)
// set up bm2, which intentionally takes more time than RPC_ASK_TIMEOUT to
// remove rdd/broadcast/shuffle in order to raise timeout error
val bm2Id = createAndRegisterBlockManager(true)
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.RegisterExecutor(
bm1Id.executorId, null, bm1Id.host, 1, Map.empty, Map.empty,
Map.empty, 0))
if (!withLost) {
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.RegisterExecutor(
bm2Id.executorId, null, bm1Id.host, 1, Map.empty, Map.empty, Map.empty, 0))
}
eventually(timeout(5.seconds)) {
// make sure both bm1 and bm2 are registered at driver side BlockManagerMaster
verify(master, times(2))
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
assert(driverEndpoint.askSync[Boolean](
CoarseGrainedClusterMessages.IsExecutorAlive(bm1Id.executorId)))
assert(driverEndpoint.askSync[Boolean](
CoarseGrainedClusterMessages.IsExecutorAlive(bm2Id.executorId)) === !withLost)
}
// update RDD block info for bm1 and bm2 (Broadcast and shuffle don't report block
// locations to BlockManagerMaster)
master.updateBlockInfo(bm1Id, RDDBlockId(0, 0), StorageLevel.MEMORY_ONLY, 100, 0)
master.updateBlockInfo(bm2Id, RDDBlockId(0, 1), StorageLevel.MEMORY_ONLY, 100, 0)
}
test("SPARK-32091: count failures from active executors when remove rdd/broadcast/shuffle") {
setupBlockManagerMasterWithBlocks(false)
// fail because bm2 will timeout and it's not lost anymore
assert(intercept[Exception](master.removeRdd(0, true))
.getCause.isInstanceOf[TimeoutException])
assert(intercept[Exception](master.removeBroadcast(0, true, true))
.getCause.isInstanceOf[TimeoutException])
assert(intercept[Exception](master.removeShuffle(0, true))
.getCause.isInstanceOf[TimeoutException])
}
test("SPARK-32091: ignore failures from lost executors when remove rdd/broadcast/shuffle") {
setupBlockManagerMasterWithBlocks(true)
// succeed because bm1 will remove rdd/broadcast successfully and bm2 will
// timeout but ignored as it's lost
master.removeRdd(0, true)
master.removeBroadcast(0, true, true)
master.removeShuffle(0, true)
}
test("StorageLevel object caching") {
val level1 = StorageLevel(false, false, false, 3)
// this should return the same object as level1