[SPARK-12411][CORE] Decrease executor heartbeat timeout to match heartbeat interval
Previously, the heartbeat RPC used the default network timeout, which is the same value the driver uses to decide that an executor is dead. As a result, a single network hiccup could cause the executor to be declared dead after only one heartbeat attempt. There is a separate config for the heartbeat interval, which is a better per-attempt timeout for the heartbeat RPC. With this change, the executor makes multiple heartbeat attempts even when individual RPCs fail.

Author: Nong Li <nong@databricks.com>

Closes #10365 from nongli/spark-12411.
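The arithmetic behind the fix can be sketched as follows. This is a hedged illustration, not Spark code: the config values mirror Spark's defaults (`spark.network.timeout` = 120s, `spark.executor.heartbeatInterval` = 10s), and `attemptsBefore` is a hypothetical helper showing how many heartbeat attempts fit inside the driver's liveness window for a given per-attempt timeout.

```scala
// Sketch: why a shorter per-attempt RPC timeout permits multiple retries.
// Assumed defaults: spark.network.timeout = 120s (driver's dead-executor
// deadline), spark.executor.heartbeatInterval = 10s (per-attempt timeout
// after this change). `attemptsBefore` is a hypothetical helper.
object HeartbeatTimeoutSketch {
  val networkTimeoutSec = 120
  val heartbeatIntervalSec = 10

  // How many heartbeat attempts fit before the driver's deadline expires,
  // given each attempt may block for up to perAttemptTimeoutSec.
  def attemptsBefore(deadlineSec: Int, perAttemptTimeoutSec: Int): Int =
    deadlineSec / perAttemptTimeoutSec

  def main(args: Array[String]): Unit = {
    // Old behavior: one attempt consumed the whole liveness window.
    println(attemptsBefore(networkTimeoutSec, networkTimeoutSec))    // 1
    // New behavior: a dozen attempts fit in the same window.
    println(attemptsBefore(networkTimeoutSec, heartbeatIntervalSec)) // 12
  }
}
```

With the old behavior, one hung RPC exhausted the entire window (a single attempt); with the per-attempt timeout bound to the heartbeat interval, the executor gets many chances to get a heartbeat through before being declared dead.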
This commit is contained in:
parent 60da0e11f6
commit 0514e8d4b6
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}

@@ -445,7 +446,8 @@ private[spark] class Executor(
       val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
       try {
-        val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message)
+        val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
+          message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
         if (response.reregisterBlockManager) {
           logInfo("Told to re-register on heartbeat")
           env.blockManager.reregister()
|