SPARK-1937: fix issue with task locality

Don't check executor/host availability when creating a TaskSetManager: the executors may not have registered yet when the TaskSetManager is created, in which case every task would be treated as having no preferred locations, losing data locality in later scheduling.
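For illustration only, here is a minimal, self-contained Scala sketch of the race this commit addresses (the names LocalityRaceSketch, ToyTask and ToyTaskSetManager are invented for the example and are not Spark APIs): a manager that snapshots executor availability when it is constructed treats every task as having no preferred location if executors register afterwards, unless the snapshot is refreshed when an executor is added, which is what the patch does via TaskSetManager.executorAdded().

// Toy model only -- ToyTask and ToyTaskSetManager are illustrative, not Spark classes.
object LocalityRaceSketch {
  final case class ToyTask(preferredHosts: Seq[String])

  class ToyTaskSetManager(tasks: Seq[ToyTask], aliveHosts: () => Set[String]) {
    // Tasks whose preferred hosts are not currently alive are treated as "no preference".
    var tasksWithNoPrefs: Seq[Int] = recompute()

    private def recompute(): Seq[Int] = {
      val alive = aliveHosts()
      tasks.indices.filter(i => !tasks(i).preferredHosts.exists(alive.contains))
    }

    // Rough analogue of TaskSetManager.executorAdded(): refresh the snapshot.
    def executorAdded(): Unit = { tasksWithNoPrefs = recompute() }
  }

  def main(args: Array[String]): Unit = {
    var alive = Set.empty[String]                 // no executors registered yet
    val tsm = new ToyTaskSetManager(
      Seq(ToyTask(Seq("host1")), ToyTask(Seq("host2")), ToyTask(Nil)),
      () => alive)
    println(tsm.tasksWithNoPrefs)                 // indices 0, 1, 2 -- locality lost
    alive += "host1"                              // an executor on host1 registers later
    tsm.executorAdded()
    println(tsm.tasksWithNoPrefs)                 // indices 1, 2 -- task 0 regains locality
  }
}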

Author: Rui Li <rui.li@intel.com>
Author: lirui-intel <rui.li@intel.com>

Closes #892 from lirui-intel/delaySchedule and squashes the following commits:

8444d7c [Rui Li] fix code style
fafd57f [Rui Li] keep locality constraints within the valid levels
18f9e05 [Rui Li] restrict allowed locality
5b3fb2f [Rui Li] refine UT
99f843e [Rui Li] add unit test and fix bug
fff4123 [Rui Li] fix computing valid locality levels
685ed3d [Rui Li] remove delay shedule for pendingTasksWithNoPrefs
7b0177a [Rui Li] remove redundant code
c7b93b5 [Rui Li] revise patch
3d7da02 [lirui-intel] Update TaskSchedulerImpl.scala
cab4c71 [Rui Li] revised patch
539a578 [Rui Li] fix code style
cf0d6ac [Rui Li] fix code style
3dfae86 [Rui Li] re-compute pending tasks when new host is added
a225ac2 [Rui Li] SPARK-1937: fix issue with task locality
Authored by Rui Li on 2014-06-24 11:40:37 -07:00; committed by Matei Zaharia
commit 924b7082b1
parent 420c1c3e1b
3 changed files with 71 additions and 14 deletions

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -210,11 +210,14 @@ private[spark] class TaskSchedulerImpl(
     SparkEnv.set(sc.env)
 
     // Mark each slave as alive and remember its hostname
+    // Also track if new executor is added
+    var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
+        newExecAvail = true
       }
     }
@@ -227,12 +230,15 @@ private[spark] class TaskSchedulerImpl(
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))
+      if (newExecAvail) {
+        taskSet.executorAdded()
+      }
     }
 
     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
-    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
       do {
         launchedTask = false
         for (i <- 0 until shuffledOffers.size) {

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

@@ -118,7 +118,7 @@ private[spark] class TaskSetManager(
   private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
 
   // Set containing pending tasks with no locality preferences.
-  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
+  var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
 
   // Set containing all pending tasks (also used as a stack, as above).
   val allPendingTasks = new ArrayBuffer[Int]
@@ -153,8 +153,8 @@ private[spark] class TaskSetManager(
   }
 
   // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
-  val myLocalityLevels = computeValidLocalityLevels()
-  val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
+  var myLocalityLevels = computeValidLocalityLevels()
+  var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
 
   // Delay scheduling variables: we keep track of our current locality level and the time we
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
@@ -181,16 +181,14 @@ private[spark] class TaskSetManager(
     var hadAliveLocations = false
     for (loc <- tasks(index).preferredLocations) {
       for (execId <- loc.executorId) {
-        if (sched.isExecutorAlive(execId)) {
-          addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
-          hadAliveLocations = true
-        }
+        addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
       }
       if (sched.hasExecutorsAliveOnHost(loc.host)) {
-        addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
-        for (rack <- sched.getRackForHost(loc.host)) {
-          addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
-        }
         hadAliveLocations = true
       }
+      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+      for (rack <- sched.getRackForHost(loc.host)) {
+        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+        hadAliveLocations = true
+      }
     }
@@ -725,10 +723,12 @@ private[spark] class TaskSetManager(
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
     val levels = new ArrayBuffer[TaskLocality.TaskLocality]
-    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
+    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
+        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
       levels += PROCESS_LOCAL
     }
-    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
+    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
+        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
       levels += NODE_LOCAL
     }
     if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
@@ -738,4 +738,21 @@ private[spark] class TaskSetManager(
     logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
     levels.toArray
   }
+
+  // Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
+  def executorAdded() {
+    def newLocAvail(index: Int): Boolean = {
+      for (loc <- tasks(index).preferredLocations) {
+        if (sched.hasExecutorsAliveOnHost(loc.host) ||
+            sched.getRackForHost(loc.host).isDefined) {
+          return true
+        }
+      }
+      false
+    }
+    logInfo("Re-computing pending task lists.")
+    pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
+    myLocalityLevels = computeValidLocalityLevels()
+    localityWaits = myLocalityLevels.map(getLocalityWait)
+  }
 }

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

@@ -77,6 +77,10 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
   override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
 
   override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
+
+  def addExecutor(execId: String, host: String) {
+    executors.put(execId, host)
+  }
 }
 
 class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
@@ -400,6 +404,36 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(sched.taskSetsFailed.contains(taskSet.id))
   }
 
+  test("new executors get added") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc)
+    val taskSet = FakeTask.createTaskSet(4,
+      Seq(TaskLocation("host1", "execA")),
+      Seq(TaskLocation("host1", "execB")),
+      Seq(TaskLocation("host2", "execC")),
+      Seq())
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    // All tasks added to no-pref list since no preferred location is available
+    assert(manager.pendingTasksWithNoPrefs.size === 4)
+    // Only ANY is valid
+    assert(manager.myLocalityLevels.sameElements(Array(ANY)))
+    // Add a new executor
+    sched.addExecutor("execD", "host1")
+    manager.executorAdded()
+    // Task 0 and 1 should be removed from no-pref list
+    assert(manager.pendingTasksWithNoPrefs.size === 2)
+    // Valid locality should contain NODE_LOCAL and ANY
+    assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY)))
+    // Add another executor
+    sched.addExecutor("execC", "host2")
+    manager.executorAdded()
+    // No-pref list now only contains task 3
+    assert(manager.pendingTasksWithNoPrefs.size === 1)
+    // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
+    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+  }
+
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)