Bugfixes/improvements to scheduler
Move the PR#517 of apache-incubator-spark to the apache-spark Author: Mridul Muralidharan <mridul@gmail.com> Closes #159 from mridulm/master and squashes the following commits: 5ff59c2 [Mridul Muralidharan] Change property in suite also 167fad8 [Mridul Muralidharan] Address review comments 9bda70e [Mridul Muralidharan] Address review comments, akwats add to failedExecutors 270d841 [Mridul Muralidharan] Address review comments fa5d9f1 [Mridul Muralidharan] Bugfixes/improvements to scheduler : PR #517
This commit is contained in:
parent
6112270c94
commit
ab747d39dd
|
@ -235,7 +235,8 @@ private[spark] class TaskSchedulerImpl(
|
|||
taskIdToExecutorId(tid) = execId
|
||||
activeExecutorIds += execId
|
||||
executorsByHost(host) += execId
|
||||
availableCpus(i) -= 1
|
||||
availableCpus(i) -= taskSet.CPUS_PER_TASK
|
||||
assert (availableCpus(i) >= 0)
|
||||
launchedTask = true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,6 +59,15 @@ private[spark] class TaskSetManager(
|
|||
// CPUs to request per task
|
||||
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
|
||||
|
||||
/*
|
||||
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
|
||||
* does not realize right away leading to repeated task failures. If enabled,
|
||||
* this temporarily prevents a task from re-launching on an executor where
|
||||
* it just failed.
|
||||
*/
|
||||
private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
|
||||
conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
|
||||
|
||||
// Quantile of tasks at which to start speculation
|
||||
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
|
||||
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
|
||||
|
@ -71,7 +80,9 @@ private[spark] class TaskSetManager(
|
|||
val numTasks = tasks.length
|
||||
val copiesRunning = new Array[Int](numTasks)
|
||||
val successful = new Array[Boolean](numTasks)
|
||||
val numFailures = new Array[Int](numTasks)
|
||||
private val numFailures = new Array[Int](numTasks)
|
||||
// key is taskId, value is a Map of executor id to when it failed
|
||||
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
|
||||
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
|
||||
var tasksSuccessful = 0
|
||||
|
||||
|
@ -228,12 +239,18 @@ private[spark] class TaskSetManager(
|
|||
* This method also cleans up any tasks in the list that have already
|
||||
* been launched, since we want that to happen lazily.
|
||||
*/
|
||||
private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
|
||||
while (!list.isEmpty) {
|
||||
val index = list.last
|
||||
list.trimEnd(1)
|
||||
if (copiesRunning(index) == 0 && !successful(index)) {
|
||||
return Some(index)
|
||||
private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
|
||||
var indexOffset = list.size
|
||||
|
||||
while (indexOffset > 0) {
|
||||
indexOffset -= 1
|
||||
val index = list(indexOffset)
|
||||
if (!executorIsBlacklisted(execId, index)) {
|
||||
// This should almost always be list.trimEnd(1) to remove tail
|
||||
list.remove(indexOffset)
|
||||
if (copiesRunning(index) == 0 && !successful(index)) {
|
||||
return Some(index)
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
|
@ -244,6 +261,21 @@ private[spark] class TaskSetManager(
|
|||
taskAttempts(taskIndex).exists(_.host == host)
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this re-execution of a failed task on an executor it already failed in before
|
||||
* EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
|
||||
*/
|
||||
private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
|
||||
if (failedExecutors.contains(taskId)) {
|
||||
val failed = failedExecutors.get(taskId).get
|
||||
|
||||
return failed.contains(execId) &&
|
||||
clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a speculative task for a given executor if any are available. The task should not have
|
||||
* an attempt running on this host, in case the host is slow. In addition, the task should meet
|
||||
|
@ -254,10 +286,13 @@ private[spark] class TaskSetManager(
|
|||
{
|
||||
speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
|
||||
|
||||
def canRunOnHost(index: Int): Boolean =
|
||||
!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)
|
||||
|
||||
if (!speculatableTasks.isEmpty) {
|
||||
// Check for process-local or preference-less tasks; note that tasks can be process-local
|
||||
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
|
||||
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
|
||||
for (index <- speculatableTasks if canRunOnHost(index)) {
|
||||
val prefs = tasks(index).preferredLocations
|
||||
val executors = prefs.flatMap(_.executorId)
|
||||
if (prefs.size == 0 || executors.contains(execId)) {
|
||||
|
@ -268,7 +303,7 @@ private[spark] class TaskSetManager(
|
|||
|
||||
// Check for node-local tasks
|
||||
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
|
||||
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
|
||||
for (index <- speculatableTasks if canRunOnHost(index)) {
|
||||
val locations = tasks(index).preferredLocations.map(_.host)
|
||||
if (locations.contains(host)) {
|
||||
speculatableTasks -= index
|
||||
|
@ -280,7 +315,7 @@ private[spark] class TaskSetManager(
|
|||
// Check for rack-local tasks
|
||||
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
|
||||
for (rack <- sched.getRackForHost(host)) {
|
||||
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
|
||||
for (index <- speculatableTasks if canRunOnHost(index)) {
|
||||
val racks = tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost)
|
||||
if (racks.contains(rack)) {
|
||||
speculatableTasks -= index
|
||||
|
@ -292,7 +327,7 @@ private[spark] class TaskSetManager(
|
|||
|
||||
// Check for non-local tasks
|
||||
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
|
||||
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
|
||||
for (index <- speculatableTasks if canRunOnHost(index)) {
|
||||
speculatableTasks -= index
|
||||
return Some((index, TaskLocality.ANY))
|
||||
}
|
||||
|
@ -309,12 +344,12 @@ private[spark] class TaskSetManager(
|
|||
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
|
||||
: Option[(Int, TaskLocality.Value)] =
|
||||
{
|
||||
for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
|
||||
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
|
||||
return Some((index, TaskLocality.PROCESS_LOCAL))
|
||||
}
|
||||
|
||||
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
|
||||
for (index <- findTaskFromList(getPendingTasksForHost(host))) {
|
||||
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
|
||||
return Some((index, TaskLocality.NODE_LOCAL))
|
||||
}
|
||||
}
|
||||
|
@ -322,19 +357,19 @@ private[spark] class TaskSetManager(
|
|||
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
|
||||
for {
|
||||
rack <- sched.getRackForHost(host)
|
||||
index <- findTaskFromList(getPendingTasksForRack(rack))
|
||||
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
|
||||
} {
|
||||
return Some((index, TaskLocality.RACK_LOCAL))
|
||||
}
|
||||
}
|
||||
|
||||
// Look for no-pref tasks after rack-local tasks since they can run anywhere.
|
||||
for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
|
||||
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
|
||||
return Some((index, TaskLocality.PROCESS_LOCAL))
|
||||
}
|
||||
|
||||
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
|
||||
for (index <- findTaskFromList(allPendingTasks)) {
|
||||
for (index <- findTaskFromList(execId, allPendingTasks)) {
|
||||
return Some((index, TaskLocality.ANY))
|
||||
}
|
||||
}
|
||||
|
@ -460,6 +495,7 @@ private[spark] class TaskSetManager(
|
|||
logInfo("Ignorning task-finished event for TID " + tid + " because task " +
|
||||
index + " has already completed successfully")
|
||||
}
|
||||
failedExecutors.remove(index)
|
||||
maybeFinishTaskSet()
|
||||
}
|
||||
|
||||
|
@ -480,7 +516,7 @@ private[spark] class TaskSetManager(
|
|||
logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
|
||||
}
|
||||
var taskMetrics : TaskMetrics = null
|
||||
var failureReason = "unknown"
|
||||
var failureReason: String = null
|
||||
reason match {
|
||||
case fetchFailed: FetchFailed =>
|
||||
logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
|
||||
|
@ -488,9 +524,11 @@ private[spark] class TaskSetManager(
|
|||
successful(index) = true
|
||||
tasksSuccessful += 1
|
||||
}
|
||||
// Not adding to failed executors for FetchFailed.
|
||||
isZombie = true
|
||||
|
||||
case TaskKilled =>
|
||||
// Not adding to failed executors for TaskKilled.
|
||||
logWarning("Task %d was killed.".format(tid))
|
||||
|
||||
case ef: ExceptionFailure =>
|
||||
|
@ -504,7 +542,8 @@ private[spark] class TaskSetManager(
|
|||
return
|
||||
}
|
||||
val key = ef.description
|
||||
failureReason = "Exception failure: %s".format(ef.description)
|
||||
failureReason = "Exception failure in TID %s on host %s: %s".format(
|
||||
tid, info.host, ef.description)
|
||||
val now = clock.getTime()
|
||||
val (printFull, dupCount) = {
|
||||
if (recentExceptions.contains(key)) {
|
||||
|
@ -533,11 +572,16 @@ private[spark] class TaskSetManager(
|
|||
failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
|
||||
logWarning(failureReason)
|
||||
|
||||
case _ => {}
|
||||
case _ =>
|
||||
failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
|
||||
}
|
||||
// always add to failed executors
|
||||
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
|
||||
put(info.executorId, clock.getTime())
|
||||
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
|
||||
addPendingTask(index)
|
||||
if (!isZombie && state != TaskState.KILLED) {
|
||||
assert (null != failureReason)
|
||||
numFailures(index) += 1
|
||||
if (numFailures(index) >= maxTaskFailures) {
|
||||
logError("Task %s:%d failed %d times; aborting job".format(
|
||||
|
|
|
@ -298,6 +298,93 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
|
|||
}
|
||||
}
|
||||
|
||||
test("executors should be blacklisted after task failure, in spite of locality preferences") {
|
||||
val rescheduleDelay = 300L
|
||||
val conf = new SparkConf().
|
||||
set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString).
|
||||
// dont wait to jump locality levels in this test
|
||||
set("spark.locality.wait", "0")
|
||||
|
||||
sc = new SparkContext("local", "test", conf)
|
||||
// two executors on same host, one on different.
|
||||
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
|
||||
("exec1.1", "host1"), ("exec2", "host2"))
|
||||
// affinity to exec1 on host1 - which we will fail.
|
||||
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
|
||||
val clock = new FakeClock
|
||||
val manager = new TaskSetManager(sched, taskSet, 4, clock)
|
||||
|
||||
{
|
||||
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
|
||||
assert(offerResult.isDefined, "Expect resource offer to return a task")
|
||||
|
||||
assert(offerResult.get.index === 0)
|
||||
assert(offerResult.get.executorId === "exec1")
|
||||
|
||||
// Cause exec1 to fail : failure 1
|
||||
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
|
||||
assert(!sched.taskSetsFailed.contains(taskSet.id))
|
||||
|
||||
// Ensure scheduling on exec1 fails after failure 1 due to blacklist
|
||||
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
|
||||
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
|
||||
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty)
|
||||
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty)
|
||||
}
|
||||
|
||||
// Run the task on exec1.1 - should work, and then fail it on exec1.1
|
||||
{
|
||||
val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL)
|
||||
assert(offerResult.isDefined,
|
||||
"Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
|
||||
|
||||
assert(offerResult.get.index === 0)
|
||||
assert(offerResult.get.executorId === "exec1.1")
|
||||
|
||||
// Cause exec1.1 to fail : failure 2
|
||||
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
|
||||
assert(!sched.taskSetsFailed.contains(taskSet.id))
|
||||
|
||||
// Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
|
||||
assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
|
||||
}
|
||||
|
||||
// Run the task on exec2 - should work, and then fail it on exec2
|
||||
{
|
||||
val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY)
|
||||
assert(offerResult.isDefined, "Expect resource offer to return a task")
|
||||
|
||||
assert(offerResult.get.index === 0)
|
||||
assert(offerResult.get.executorId === "exec2")
|
||||
|
||||
// Cause exec2 to fail : failure 3
|
||||
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
|
||||
assert(!sched.taskSetsFailed.contains(taskSet.id))
|
||||
|
||||
// Ensure scheduling on exec2 fails after failure 3 due to blacklist
|
||||
assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty)
|
||||
}
|
||||
|
||||
// After reschedule delay, scheduling on exec1 should be possible.
|
||||
clock.advance(rescheduleDelay)
|
||||
|
||||
{
|
||||
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
|
||||
assert(offerResult.isDefined, "Expect resource offer to return a task")
|
||||
|
||||
assert(offerResult.get.index === 0)
|
||||
assert(offerResult.get.executorId === "exec1")
|
||||
|
||||
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
|
||||
|
||||
// Cause exec1 to fail : failure 4
|
||||
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
|
||||
}
|
||||
|
||||
// we have failed the same task 4 times now : task id should now be in taskSetsFailed
|
||||
assert(sched.taskSetsFailed.contains(taskSet.id))
|
||||
}
|
||||
|
||||
def createTaskResult(id: Int): DirectTaskResult[Int] = {
|
||||
val valueSer = SparkEnv.get.serializer.newInstance()
|
||||
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
|
||||
|
|
Loading…
Reference in a new issue