[SPARK-14485][CORE] ignore task finished for executor lost and removed by driver

Currently, when the driver removes an executor because its heartbeats have timed out, it re-queues the tasks running on that executor and sends a kill command to the cluster manager to shut the executor down.
However, a task running on that executor may finish and return its result to the driver before the kill command takes effect. In that case, the driver accepts the task-finished event and discards the speculative and re-queued copies of the task.
But since the executor has already been removed by the driver, the result of this finished task cannot be stored: its BlockManagerId has also been removed from the BlockManagerMaster. The result data of the stage is therefore incomplete, which later causes fetch failures. For more details, see [SPARK-14485](https://issues.apache.org/jira/browse/SPARK-14485).
This PR introduces a mechanism to ignore this kind of task-finished event.
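As a minimal, self-contained illustration of the race and of the guard this PR adds (the names `MiniDriver`, `launchTask`, `removeExecutor`, `statusUpdate`, and `liveExecutorTaskCount` are invented for this sketch and are not the `TaskSchedulerImpl` API; only the "ignore a success update if the executor was already removed" check mirrors the actual change in the diff below):

```scala
import scala.collection.mutable

// Sketch of the driver-side state involved in the race described above.
object MiniDriver {
  // Stand-in for executorIdToTaskCount: executors the driver still considers alive.
  private val liveExecutorTaskCount = mutable.HashMap[String, Int]()

  def launchTask(execId: String): Unit =
    liveExecutorTaskCount(execId) = liveExecutorTaskCount.getOrElse(execId, 0) + 1

  // Heartbeat-timeout path: the executor's entry is dropped and its tasks are
  // re-queued, so any result that still arrives from it must not be accepted.
  def removeExecutor(execId: String): Unit =
    liveExecutorTaskCount -= execId

  def statusUpdate(execId: String, tid: Long, finished: Boolean): Unit = {
    if (finished && !liveExecutorTaskCount.contains(execId)) {
      println(s"Ignoring update for TID $tid: executor $execId already removed by driver")
    } else if (finished) {
      println(s"Accepting result of TID $tid from executor $execId")
    }
  }

  def main(args: Array[String]): Unit = {
    launchTask("exec-1")
    removeExecutor("exec-1")                      // heartbeat timeout fires first
    statusUpdate("exec-1", 42L, finished = true)  // late success is ignored
  }
}
```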

N/A

Author: zhonghaihua <793507405@qq.com>

Closes #12258 from zhonghaihua/ignoreTaskFinishForExecutorLostAndRemovedByDriver.
zhonghaihua 2016-06-07 16:30:58 -07:00 committed by Marcelo Vanzin
parent 6ecedf39b4
commit 695dbc816a

@@ -352,9 +352,11 @@ private[spark] class TaskSchedulerImpl(
         }
         taskIdToTaskSetManager.get(tid) match {
           case Some(taskSet) =>
+            var executorId: String = null
             if (TaskState.isFinished(state)) {
               taskIdToTaskSetManager.remove(tid)
               taskIdToExecutorId.remove(tid).foreach { execId =>
+                executorId = execId
                 if (executorIdToTaskCount.contains(execId)) {
                   executorIdToTaskCount(execId) -= 1
                 }
@@ -362,7 +364,17 @@
             }
             if (state == TaskState.FINISHED) {
               taskSet.removeRunningTask(tid)
-              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              // In some cases the executor has already been removed by the driver because its
+              // heartbeats timed out, but a task running on it finishes and reports success to
+              // the driver before the cluster manager kills the executor. Such updates must be
+              // ignored, because the task has already been re-queued by the driver. For more
+              // details, see SPARK-14485.
+              if (executorId != null && !executorIdToTaskCount.contains(executorId)) {
+                logInfo(s"Ignoring update with state $state for TID $tid because its executor " +
+                  s"has already been removed by driver")
+              } else {
+                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              }
             } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
               taskSet.removeRunningTask(tid)
               taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
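Note that only the TaskState.FINISHED branch needs the new guard. The FAILED/KILLED/LOST branch still calls enqueueFailedTask unconditionally, because a task reported as failed or lost will simply be re-run, whereas a spurious success from a removed executor would let the driver treat the task as done even though its result block was dropped along with the executor's BlockManagerId, which is exactly the fetch-failure scenario described above. The check relies on executorIdToTaskCount containing only executors the driver still considers alive, so once the heartbeat-timeout path removes the executor, a late success update from it fails the contains check and is ignored.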