[SPARK-6640][Core] Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver
This PR moves the creation of `HeartbeatReceiver` above the creation of `schedulerBackend` to resolve the race condition: an Executor retrieves `HeartbeatReceiver` in its constructor, so the receiver must be registered before the task scheduler is created.
Author: zsxwing <zsxwing@gmail.com>
Closes #5306 from zsxwing/SPARK-6640 and squashes the following commits:
840399d [zsxwing] Don't send TaskScheduler through Akka
a90616a [zsxwing] Fix docs
dd202c7 [zsxwing] Fix typo
d7c250d [zsxwing] Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver
This commit is contained in:
parent
2c43ea38ee
commit
88504b75ee
|
@ -37,6 +37,12 @@ private[spark] case class Heartbeat(
|
|||
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
|
||||
blockManagerId: BlockManagerId)
|
||||
|
||||
/**
|
||||
* An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
|
||||
* created.
|
||||
*/
|
||||
private[spark] case object TaskSchedulerIsSet
|
||||
|
||||
private[spark] case object ExpireDeadHosts
|
||||
|
||||
private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
|
||||
|
@ -44,9 +50,11 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
|
|||
/**
|
||||
* Lives in the driver to receive heartbeats from executors.
|
||||
*/
|
||||
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
|
||||
private[spark] class HeartbeatReceiver(sc: SparkContext)
|
||||
extends Actor with ActorLogReceive with Logging {
|
||||
|
||||
private var scheduler: TaskScheduler = null
|
||||
|
||||
// executor ID -> timestamp of when the last heartbeat from this executor was received
|
||||
private val executorLastSeen = new mutable.HashMap[String, Long]
|
||||
|
||||
|
@ -71,12 +79,22 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
|
|||
}
|
||||
|
||||
override def receiveWithLogging: PartialFunction[Any, Unit] = {
|
||||
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
|
||||
val unknownExecutor = !scheduler.executorHeartbeatReceived(
|
||||
executorId, taskMetrics, blockManagerId)
|
||||
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
|
||||
executorLastSeen(executorId) = System.currentTimeMillis()
|
||||
sender ! response
|
||||
case TaskSchedulerIsSet =>
|
||||
scheduler = sc.taskScheduler
|
||||
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
|
||||
if (scheduler != null) {
|
||||
val unknownExecutor = !scheduler.executorHeartbeatReceived(
|
||||
executorId, taskMetrics, blockManagerId)
|
||||
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
|
||||
executorLastSeen(executorId) = System.currentTimeMillis()
|
||||
sender ! response
|
||||
} else {
|
||||
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
|
||||
// case rarely happens. However, if it really happens, log it and ask the executor to
|
||||
// re-register its BlockManager.
|
||||
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
|
||||
sender ! HeartbeatResponse(reregisterBlockManager = true)
|
||||
}
|
||||
case ExpireDeadHosts =>
|
||||
expireDeadHosts()
|
||||
}
|
||||
|
|
|
@ -356,11 +356,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
|||
val sparkUser = Utils.getCurrentUserName()
|
||||
executorEnvs("SPARK_USER") = sparkUser
|
||||
|
||||
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
|
||||
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
|
||||
private val heartbeatReceiver = env.actorSystem.actorOf(
|
||||
Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")
|
||||
|
||||
// Create and start the scheduler
|
||||
private[spark] var (schedulerBackend, taskScheduler) =
|
||||
SparkContext.createTaskScheduler(this, master)
|
||||
private val heartbeatReceiver = env.actorSystem.actorOf(
|
||||
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
|
||||
|
||||
heartbeatReceiver ! TaskSchedulerIsSet
|
||||
|
||||
@volatile private[spark] var dagScheduler: DAGScheduler = _
|
||||
try {
|
||||
dagScheduler = new DAGScheduler(this)
|
||||
|
|
Loading…
Reference in a new issue