a) Add support for hyper local scheduling - specific to a host + port - before trying host local scheduling.
b) Add some fixes to test code to ensure it passes (and fixes some other issues). c) Fix bug in task scheduling which incorrectly used availableCores instead of all cores on the node.
This commit is contained in:
parent
60cabb35cb
commit
d960e7e0f8
|
@ -29,7 +29,11 @@ class SparkEnv (
|
|||
val blockManager: BlockManager,
|
||||
val connectionManager: ConnectionManager,
|
||||
val httpFileServer: HttpFileServer,
|
||||
val sparkFilesDir: String
|
||||
val sparkFilesDir: String,
|
||||
// To be set only as part of initialization of SparkContext.
|
||||
// (executorId, defaultHostPort) => executorHostPort
|
||||
// If executorId is NOT found, return defaultHostPort
|
||||
var executorIdToHostPort: (String, String) => String
|
||||
) {
|
||||
|
||||
def stop() {
|
||||
|
@ -44,6 +48,17 @@ class SparkEnv (
|
|||
// down, but let's call it anyway in case it gets fixed in a later release
|
||||
actorSystem.awaitTermination()
|
||||
}
|
||||
|
||||
|
||||
def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
|
||||
val env = SparkEnv.get
|
||||
if (env.executorIdToHostPort == null) {
|
||||
// default to using host, not host port. Relevant to non cluster modes.
|
||||
return defaultHostPort
|
||||
}
|
||||
|
||||
env.executorIdToHostPort(executorId, defaultHostPort)
|
||||
}
|
||||
}
|
||||
|
||||
object SparkEnv extends Logging {
|
||||
|
@ -162,7 +177,7 @@ object SparkEnv extends Logging {
|
|||
blockManager,
|
||||
connectionManager,
|
||||
httpFileServer,
|
||||
sparkFilesDir)
|
||||
sparkFilesDir,
|
||||
null)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -357,15 +357,6 @@ private object Utils extends Logging {
|
|||
Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// Once testing is complete in various modes, replace with this ?
|
||||
def checkHost(host: String, message: String = "") {}
|
||||
def checkHostPort(hostPort: String, message: String = "") {}
|
||||
|
||||
def getUserNameFromEnvironment(): String = {
|
||||
SparkHadoopUtil.getUserNameFromEnvironment
|
||||
}
|
||||
|
||||
// Used by DEBUG code : remove when all testing done
|
||||
def logErrorWithStack(msg: String) {
|
||||
|
@ -373,6 +364,20 @@ private object Utils extends Logging {
|
|||
// temp code for debug
|
||||
System.exit(-1)
|
||||
}
|
||||
*/
|
||||
|
||||
// Once testing is complete in various modes, replace with this ?
|
||||
def checkHost(host: String, message: String = "") {}
|
||||
def checkHostPort(hostPort: String, message: String = "") {}
|
||||
|
||||
// Used by DEBUG code : remove when all testing done
|
||||
def logErrorWithStack(msg: String) {
|
||||
try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
|
||||
}
|
||||
|
||||
def getUserNameFromEnvironment(): String = {
|
||||
SparkHadoopUtil.getUserNameFromEnvironment
|
||||
}
|
||||
|
||||
// Typically, this will be of order of number of nodes in cluster
|
||||
// If not, we should change it to LRUCache or something.
|
||||
|
|
|
@ -54,7 +54,10 @@ private[spark] class Worker(
|
|||
def createWorkDir() {
|
||||
workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
|
||||
try {
|
||||
if ( (workDir.exists() && !workDir.isDirectory) || (!workDir.exists() && !workDir.mkdirs()) ) {
|
||||
// This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
|
||||
// So attempting to create and then check if directory was created or not.
|
||||
workDir.mkdirs()
|
||||
if ( !workDir.exists() || !workDir.isDirectory) {
|
||||
logError("Failed to create work directory " + workDir)
|
||||
System.exit(1)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package spark.rdd
|
||||
|
||||
import scala.collection.mutable.HashMap
|
||||
import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
|
||||
import spark.storage.BlockManager
|
||||
|
||||
private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
|
||||
val index = idx
|
||||
|
@ -11,12 +11,7 @@ private[spark]
|
|||
class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
|
||||
extends RDD[T](sc, Nil) {
|
||||
|
||||
@transient lazy val locations_ = {
|
||||
val blockManager = SparkEnv.get.blockManager
|
||||
/*val locations = blockIds.map(id => blockManager.getLocations(id))*/
|
||||
val locations = blockManager.getLocations(blockIds)
|
||||
HashMap(blockIds.zip(locations):_*)
|
||||
}
|
||||
@transient lazy val locations_ = BlockManager.blockIdsToExecutorLocations(blockIds, SparkEnv.get)
|
||||
|
||||
override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
|
||||
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
|
||||
|
|
|
@ -49,6 +49,8 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
|
|||
|
||||
override def getPreferredLocations(s: Partition): Seq[String] = {
|
||||
val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
|
||||
// TODO: becomes complicated - intersect on hostPort if available, else fallback to host (removing intersected hostPort's).
|
||||
// Since I am not very sure about this RDD, leaving it to others to comment better !
|
||||
rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ import spark.executor.TaskMetrics
|
|||
import spark.partial.ApproximateActionListener
|
||||
import spark.partial.ApproximateEvaluator
|
||||
import spark.partial.PartialResult
|
||||
import spark.storage.BlockManagerMaster
|
||||
import spark.storage.{BlockManager, BlockManagerMaster}
|
||||
import spark.util.{MetadataCleaner, TimeStampedHashMap}
|
||||
|
||||
/**
|
||||
|
@ -117,9 +117,8 @@ class DAGScheduler(
|
|||
private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
|
||||
if (!cacheLocs.contains(rdd.id)) {
|
||||
val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray
|
||||
cacheLocs(rdd.id) = blockManagerMaster.getLocations(blockIds).map {
|
||||
locations => locations.map(_.hostPort).toList
|
||||
}.toArray
|
||||
val locs = BlockManager.blockIdsToExecutorLocations(blockIds, env)
|
||||
cacheLocs(rdd.id) = blockIds.map(locs.getOrElse(_, Nil))
|
||||
}
|
||||
cacheLocs(rdd.id)
|
||||
}
|
||||
|
|
|
@ -71,11 +71,11 @@ private[spark] class ResultTask[T, U](
|
|||
}
|
||||
|
||||
// data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts.
|
||||
val preferredLocs: Seq[String] = if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq
|
||||
private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
|
||||
|
||||
{
|
||||
// DEBUG code
|
||||
preferredLocs.foreach (host => Utils.checkHost(host, "preferredLocs : " + preferredLocs))
|
||||
preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
|
||||
}
|
||||
|
||||
override def run(attemptId: Long): U = {
|
||||
|
|
|
@ -85,11 +85,11 @@ private[spark] class ShuffleMapTask(
|
|||
protected def this() = this(0, null, null, 0, null)
|
||||
|
||||
// data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts.
|
||||
private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq
|
||||
private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
|
||||
|
||||
{
|
||||
// DEBUG code
|
||||
preferredLocs.foreach (host => Utils.checkHost(host, "preferredLocs : " + preferredLocs))
|
||||
preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
|
||||
}
|
||||
|
||||
var split = if (rdd == null) {
|
||||
|
|
|
@ -79,9 +79,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
|
|||
|
||||
// The set of executors we have on each host; this is used to compute hostsAlive, which
|
||||
// in turn is used to decide when we can attain data locality on a given host
|
||||
val executorsByHostPort = new HashMap[String, HashSet[String]]
|
||||
private val executorsByHostPort = new HashMap[String, HashSet[String]]
|
||||
|
||||
val executorIdToHostPort = new HashMap[String, String]
|
||||
private val executorIdToHostPort = new HashMap[String, String]
|
||||
|
||||
// JAR server, if any JARs were added by the user to the SparkContext
|
||||
var jarServer: HttpServer = null
|
||||
|
@ -102,6 +102,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
|
|||
|
||||
def initialize(context: SchedulerBackend) {
|
||||
backend = context
|
||||
// resolve executorId to hostPort mapping.
|
||||
def executorToHostPort(executorId: String, defaultHostPort: String): String = {
|
||||
executorIdToHostPort.getOrElse(executorId, defaultHostPort)
|
||||
}
|
||||
|
||||
// Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler
|
||||
// Will that be a design violation ?
|
||||
SparkEnv.get.executorIdToHostPort = executorToHostPort
|
||||
}
|
||||
|
||||
def newTaskId(): Long = nextTaskId.getAndIncrement()
|
||||
|
@ -209,13 +217,30 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
|
|||
}
|
||||
// Build a list of tasks to assign to each slave
|
||||
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
|
||||
// merge availableCpus into hostToAvailableCpus block ?
|
||||
val availableCpus = offers.map(o => o.cores).toArray
|
||||
val hostToAvailableCpus = {
|
||||
val map = new HashMap[String, Int]()
|
||||
for (offer <- offers) {
|
||||
val hostPort = offer.hostPort
|
||||
val cores = offer.cores
|
||||
// DEBUG code
|
||||
Utils.checkHostPort(hostPort)
|
||||
|
||||
val host = Utils.parseHostPort(hostPort)._1
|
||||
|
||||
map.put(host, map.getOrElse(host, 0) + cores)
|
||||
}
|
||||
|
||||
map
|
||||
}
|
||||
var launchedTask = false
|
||||
|
||||
|
||||
for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
|
||||
|
||||
// Split offers based on host local, rack local and off-rack tasks.
|
||||
val hyperLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
|
||||
val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
|
||||
val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
|
||||
val otherOffers = new HashMap[String, ArrayBuffer[Int]]()
|
||||
|
@ -224,8 +249,17 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
|
|||
val hostPort = offers(i).hostPort
|
||||
// DEBUG code
|
||||
Utils.checkHostPort(hostPort)
|
||||
|
||||
val numHyperLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
|
||||
if (numHyperLocalTasks > 0){
|
||||
val list = hyperLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
|
||||
for (j <- 0 until numHyperLocalTasks) list += i
|
||||
}
|
||||
|
||||
val host = Utils.parseHostPort(hostPort)._1
|
||||
val numHostLocalTasks = math.max(0, math.min(manager.numPendingTasksForHost(hostPort), availableCpus(i)))
|
||||
val numHostLocalTasks = math.max(0,
|
||||
// Remove hyper local tasks (which are also host local btw !) from this
|
||||
math.min(manager.numPendingTasksForHost(hostPort) - numHyperLocalTasks, hostToAvailableCpus(host)))
|
||||
if (numHostLocalTasks > 0){
|
||||
val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
|
||||
for (j <- 0 until numHostLocalTasks) list += i
|
||||
|
@ -233,7 +267,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
|
|||
|
||||
val numRackLocalTasks = math.max(0,
|
||||
// Remove host local tasks (which are also rack local btw !) from this
|
||||
math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHostLocalTasks, availableCpus(i)))
|
||||
math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHyperLocalTasks - numHostLocalTasks, hostToAvailableCpus(host)))
|
||||
if (numRackLocalTasks > 0){
|
||||
val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
|
||||
for (j <- 0 until numRackLocalTasks) list += i
|
||||
|
@ -246,12 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
|
|||
}
|
||||
|
||||
val offersPriorityList = new ArrayBuffer[Int](
|
||||
hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
|
||||
// First host local, then rack, then others
|
||||
hyperLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
|
||||
|
||||
// First hyper local, then host local, then rack, then others
|
||||
|
||||
// numHostLocalOffers contains count of both hyper local and host offers.
|
||||
val numHostLocalOffers = {
|
||||
val hyperLocalPriorityList = ClusterScheduler.prioritizeContainers(hyperLocalOffers)
|
||||
offersPriorityList ++= hyperLocalPriorityList
|
||||
|
||||
val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers)
|
||||
offersPriorityList ++= hostLocalPriorityList
|
||||
hostLocalPriorityList.size
|
||||
|
||||
hyperLocalPriorityList.size + hostLocalPriorityList.size
|
||||
}
|
||||
val numRackLocalOffers = {
|
||||
val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers)
|
||||
|
@ -477,6 +518,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
|
|||
}
|
||||
|
||||
def getExecutorsAliveOnHost(host: String): Option[Set[String]] = {
|
||||
Utils.checkHost(host)
|
||||
|
||||
val retval = hostToAliveHostPorts.get(host)
|
||||
if (retval.isDefined) {
|
||||
return Some(retval.get.toSet)
|
||||
|
@ -485,6 +528,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
|
|||
None
|
||||
}
|
||||
|
||||
def isExecutorAliveOnHostPort(hostPort: String): Boolean = {
|
||||
// Even if hostPort is a host, it does not matter - it is just a specific check.
|
||||
// But we do have to ensure that only hostPort get into hostPortsAlive !
|
||||
// So no check against Utils.checkHostPort
|
||||
hostPortsAlive.contains(hostPort)
|
||||
}
|
||||
|
||||
// By default, rack is unknown
|
||||
def getRackForHost(value: String): Option[String] = None
|
||||
|
||||
|
|
|
@ -13,14 +13,18 @@ import spark.scheduler._
|
|||
import spark.TaskState.TaskState
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
|
||||
private[spark] object TaskLocality extends Enumeration("HYPER_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
|
||||
|
||||
val HOST_LOCAL, RACK_LOCAL, ANY = Value
|
||||
// hyper local is expected to be used ONLY within tasksetmanager for now.
|
||||
val HYPER_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value
|
||||
|
||||
type TaskLocality = Value
|
||||
|
||||
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
|
||||
|
||||
// Must not be the constraint.
|
||||
assert (constraint != TaskLocality.HYPER_LOCAL)
|
||||
|
||||
constraint match {
|
||||
case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL
|
||||
case TaskLocality.RACK_LOCAL => condition == TaskLocality.HOST_LOCAL || condition == TaskLocality.RACK_LOCAL
|
||||
|
@ -32,7 +36,11 @@ private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL
|
|||
def parse(str: String): TaskLocality = {
|
||||
// better way to do this ?
|
||||
try {
|
||||
TaskLocality.withName(str)
|
||||
val retval = TaskLocality.withName(str)
|
||||
// Must not specify HYPER_LOCAL !
|
||||
assert (retval != TaskLocality.HYPER_LOCAL)
|
||||
|
||||
retval
|
||||
} catch {
|
||||
case nEx: NoSuchElementException => {
|
||||
logWarning("Invalid task locality specified '" + str + "', defaulting to HOST_LOCAL");
|
||||
|
@ -133,35 +141,55 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
|||
addPendingTask(i)
|
||||
}
|
||||
|
||||
private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler, rackLocal: Boolean = false): ArrayBuffer[String] = {
|
||||
// DEBUG code
|
||||
_taskPreferredLocations.foreach(h => Utils.checkHost(h, "taskPreferredLocation " + _taskPreferredLocations))
|
||||
// Note that it follows the hierarchy.
|
||||
// if we search for HOST_LOCAL, the output will include HYPER_LOCAL and
|
||||
// if we search for RACK_LOCAL, it will include HYPER_LOCAL & HOST_LOCAL
|
||||
private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
|
||||
taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
|
||||
|
||||
val taskPreferredLocations = if (! rackLocal) _taskPreferredLocations else {
|
||||
// Expand set to include all 'seen' rack local hosts.
|
||||
// This works since container allocation/management happens within master - so any rack locality information is updated in msater.
|
||||
// Best case effort, and maybe sort of kludge for now ... rework it later ?
|
||||
val hosts = new HashSet[String]
|
||||
_taskPreferredLocations.foreach(h => {
|
||||
val rackOpt = scheduler.getRackForHost(h)
|
||||
if (rackOpt.isDefined) {
|
||||
val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
|
||||
if (hostsOpt.isDefined) {
|
||||
hosts ++= hostsOpt.get
|
||||
if (TaskLocality.HYPER_LOCAL == taskLocality) {
|
||||
// straight forward comparison ! Special case it.
|
||||
val retval = new HashSet[String]()
|
||||
scheduler.synchronized {
|
||||
for (location <- _taskPreferredLocations) {
|
||||
if (scheduler.isExecutorAliveOnHostPort(location)) {
|
||||
retval += location
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that irrespective of what scheduler says, host is always added !
|
||||
hosts += h
|
||||
})
|
||||
|
||||
hosts
|
||||
return retval
|
||||
}
|
||||
|
||||
val retval = new ArrayBuffer[String]
|
||||
val taskPreferredLocations =
|
||||
if (TaskLocality.HOST_LOCAL == taskLocality) {
|
||||
_taskPreferredLocations
|
||||
} else {
|
||||
assert (TaskLocality.RACK_LOCAL == taskLocality)
|
||||
// Expand set to include all 'seen' rack local hosts.
|
||||
// This works since container allocation/management happens within master - so any rack locality information is updated in msater.
|
||||
// Best case effort, and maybe sort of kludge for now ... rework it later ?
|
||||
val hosts = new HashSet[String]
|
||||
_taskPreferredLocations.foreach(h => {
|
||||
val rackOpt = scheduler.getRackForHost(h)
|
||||
if (rackOpt.isDefined) {
|
||||
val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
|
||||
if (hostsOpt.isDefined) {
|
||||
hosts ++= hostsOpt.get
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that irrespective of what scheduler says, host is always added !
|
||||
hosts += h
|
||||
})
|
||||
|
||||
hosts
|
||||
}
|
||||
|
||||
val retval = new HashSet[String]
|
||||
scheduler.synchronized {
|
||||
for (prefLocation <- taskPreferredLocations) {
|
||||
val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(prefLocation)
|
||||
val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(Utils.parseHostPort(prefLocation)._1)
|
||||
if (aliveLocationsOpt.isDefined) {
|
||||
retval ++= aliveLocationsOpt.get
|
||||
}
|
||||
|
@ -175,29 +203,37 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
|||
private def addPendingTask(index: Int) {
|
||||
// We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
|
||||
// hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it.
|
||||
val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched)
|
||||
val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, true)
|
||||
val hyperLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HYPER_LOCAL)
|
||||
val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
|
||||
val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
|
||||
|
||||
if (rackLocalLocations.size == 0) {
|
||||
// Current impl ensures this.
|
||||
assert (hyperLocalLocations.size == 0)
|
||||
assert (hostLocalLocations.size == 0)
|
||||
pendingTasksWithNoPrefs += index
|
||||
} else {
|
||||
|
||||
// host locality
|
||||
for (hostPort <- hostLocalLocations) {
|
||||
// hyper local locality
|
||||
for (hostPort <- hyperLocalLocations) {
|
||||
// DEBUG Code
|
||||
Utils.checkHostPort(hostPort)
|
||||
|
||||
val hostPortList = pendingTasksForHostPort.getOrElseUpdate(hostPort, ArrayBuffer())
|
||||
hostPortList += index
|
||||
}
|
||||
|
||||
// host locality (includes hyper local)
|
||||
for (hostPort <- hostLocalLocations) {
|
||||
// DEBUG Code
|
||||
Utils.checkHostPort(hostPort)
|
||||
|
||||
val host = Utils.parseHostPort(hostPort)._1
|
||||
val hostList = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
|
||||
hostList += index
|
||||
}
|
||||
|
||||
// rack locality
|
||||
// rack locality (includes hyper local and host local)
|
||||
for (rackLocalHostPort <- rackLocalLocations) {
|
||||
// DEBUG Code
|
||||
Utils.checkHostPort(rackLocalHostPort)
|
||||
|
@ -233,6 +269,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
|||
pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
|
||||
}
|
||||
|
||||
// Number of pending tasks for a given host Port (which would be hyper local)
|
||||
def numPendingTasksForHostPort(hostPort: String): Int = {
|
||||
getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
|
||||
}
|
||||
|
||||
// Number of pending tasks for a given host (which would be data local)
|
||||
def numPendingTasksForHost(hostPort: String): Int = {
|
||||
getPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
|
||||
|
@ -270,7 +311,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
|||
if (speculatableTasks.size > 0) {
|
||||
val localTask = speculatableTasks.find {
|
||||
index =>
|
||||
val locations = findPreferredLocations(tasks(index).preferredLocations, sched)
|
||||
val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
|
||||
val attemptLocs = taskAttempts(index).map(_.hostPort)
|
||||
(locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
|
||||
}
|
||||
|
@ -284,7 +325,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
|||
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
|
||||
val rackTask = speculatableTasks.find {
|
||||
index =>
|
||||
val locations = findPreferredLocations(tasks(index).preferredLocations, sched, true)
|
||||
val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
|
||||
val attemptLocs = taskAttempts(index).map(_.hostPort)
|
||||
locations.contains(hostPort) && !attemptLocs.contains(hostPort)
|
||||
}
|
||||
|
@ -311,6 +352,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
|||
// Dequeue a pending task for a given node and return its index.
|
||||
// If localOnly is set to false, allow non-local tasks as well.
|
||||
private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
|
||||
val hyperLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
|
||||
if (hyperLocalTask != None) {
|
||||
return hyperLocalTask
|
||||
}
|
||||
|
||||
val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
|
||||
if (localTask != None) {
|
||||
return localTask
|
||||
|
@ -341,30 +387,31 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
|||
return findSpeculativeTask(hostPort, locality)
|
||||
}
|
||||
|
||||
// Does a host count as a preferred location for a task? This is true if
|
||||
// either the task has preferred locations and this host is one, or it has
|
||||
// no preferred locations (in which we still count the launch as preferred).
|
||||
private def isPreferredLocation(task: Task[_], hostPort: String): Boolean = {
|
||||
val locs = task.preferredLocations
|
||||
// DEBUG code
|
||||
locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs))
|
||||
private def isHyperLocalLocation(task: Task[_], hostPort: String): Boolean = {
|
||||
Utils.checkHostPort(hostPort)
|
||||
|
||||
if (locs.contains(hostPort) || locs.isEmpty) return true
|
||||
val locs = task.preferredLocations
|
||||
|
||||
locs.contains(hostPort)
|
||||
}
|
||||
|
||||
private def isHostLocalLocation(task: Task[_], hostPort: String): Boolean = {
|
||||
val locs = task.preferredLocations
|
||||
|
||||
// If no preference, consider it as host local
|
||||
if (locs.isEmpty) return true
|
||||
|
||||
val host = Utils.parseHostPort(hostPort)._1
|
||||
locs.contains(host)
|
||||
locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
|
||||
}
|
||||
|
||||
// Does a host count as a rack local preferred location for a task? (assumes host is NOT preferred location).
|
||||
// This is true if either the task has preferred locations and this host is one, or it has
|
||||
// no preferred locations (in which we still count the launch as preferred).
|
||||
def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
|
||||
private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
|
||||
|
||||
val locs = task.preferredLocations
|
||||
|
||||
// DEBUG code
|
||||
locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs))
|
||||
|
||||
val preferredRacks = new HashSet[String]()
|
||||
for (preferredHost <- locs) {
|
||||
val rack = sched.getRackForHost(preferredHost)
|
||||
|
@ -395,8 +442,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
|||
val task = tasks(index)
|
||||
val taskId = sched.newTaskId()
|
||||
// Figure out whether this should count as a preferred launch
|
||||
val taskLocality = if (isPreferredLocation(task, hostPort)) TaskLocality.HOST_LOCAL else
|
||||
if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else TaskLocality.ANY
|
||||
val taskLocality =
|
||||
if (isHyperLocalLocation(task, hostPort)) TaskLocality.HYPER_LOCAL else
|
||||
if (isHostLocalLocation(task, hostPort)) TaskLocality.HOST_LOCAL else
|
||||
if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
|
||||
TaskLocality.ANY
|
||||
val prefStr = taskLocality.toString
|
||||
logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
|
||||
taskSet.id, index, taskId, execId, hostPort, prefStr))
|
||||
|
@ -552,15 +602,22 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
|||
|
||||
def executorLost(execId: String, hostPort: String) {
|
||||
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
|
||||
|
||||
// If some task has preferred locations only on hostname, and there are no more executors there,
|
||||
// put it in the no-prefs list to avoid the wait from delay scheduling
|
||||
for (index <- getPendingTasksForHostPort(hostPort)) {
|
||||
val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, true)
|
||||
|
||||
// host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
|
||||
// no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc.
|
||||
// Note: NOT checking hyper local list - since host local list is super set of that. We need to ad to no prefs only if
|
||||
// there is no host local node for the task (not if there is no hyper local node for the task)
|
||||
for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
|
||||
// val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
|
||||
val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL)
|
||||
if (newLocs.isEmpty) {
|
||||
assert (findPreferredLocations(tasks(index).preferredLocations, sched).isEmpty)
|
||||
pendingTasksWithNoPrefs += index
|
||||
}
|
||||
}
|
||||
|
||||
// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
|
||||
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
|
||||
for ((tid, info) <- taskInfos if info.executorId == execId) {
|
||||
|
|
|
@ -4,7 +4,7 @@ import java.io.{InputStream, OutputStream}
|
|||
import java.nio.{ByteBuffer, MappedByteBuffer}
|
||||
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
|
||||
|
||||
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
|
||||
import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet, Queue}
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
import akka.actor.{ActorSystem, Cancellable, Props}
|
||||
|
@ -271,23 +271,12 @@ class BlockManager(
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get locations of the block.
|
||||
*/
|
||||
def getLocations(blockId: String): Seq[String] = {
|
||||
val startTimeMs = System.currentTimeMillis
|
||||
var managers = master.getLocations(blockId)
|
||||
val locations = managers.map(_.hostPort)
|
||||
logDebug("Got block locations in " + Utils.getUsedTimeMs(startTimeMs))
|
||||
return locations
|
||||
}
|
||||
|
||||
/**
|
||||
* Get locations of an array of blocks.
|
||||
*/
|
||||
def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
|
||||
def getLocationBlockIds(blockIds: Array[String]): Array[Seq[BlockManagerId]] = {
|
||||
val startTimeMs = System.currentTimeMillis
|
||||
val locations = master.getLocations(blockIds).map(_.map(_.hostPort).toSeq).toArray
|
||||
val locations = master.getLocations(blockIds).toArray
|
||||
logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
|
||||
return locations
|
||||
}
|
||||
|
@ -947,6 +936,32 @@ object BlockManager extends Logging {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
def blockIdsToExecutorLocations(blockIds: Array[String], env: SparkEnv): HashMap[String, List[String]] = {
|
||||
val blockManager = env.blockManager
|
||||
/*val locations = blockIds.map(id => blockManager.getLocations(id))*/
|
||||
val locationBlockIds = blockManager.getLocationBlockIds(blockIds)
|
||||
|
||||
// Convert from block master locations to executor locations (we need that for task scheduling)
|
||||
val executorLocations = new HashMap[String, List[String]]()
|
||||
for (i <- 0 until blockIds.length) {
|
||||
val blockId = blockIds(i)
|
||||
val blockLocations = locationBlockIds(i)
|
||||
|
||||
val executors = new HashSet[String]()
|
||||
|
||||
for (bkLocation <- blockLocations) {
|
||||
val executorHostPort = env.resolveExecutorIdToHostPort(bkLocation.executorId, bkLocation.host)
|
||||
executors += executorHostPort
|
||||
// logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort)
|
||||
}
|
||||
|
||||
executorLocations.put(blockId, executors.toSeq.toList)
|
||||
}
|
||||
|
||||
executorLocations
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class BlockFetcherIterator(
|
||||
|
|
|
@ -80,12 +80,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
|
|||
}
|
||||
|
||||
test("remote fetch") {
|
||||
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0)
|
||||
val hostname = "localhost"
|
||||
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0)
|
||||
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
|
||||
val masterTracker = new MapOutputTracker()
|
||||
masterTracker.trackerActor = actorSystem.actorOf(
|
||||
Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker")
|
||||
|
||||
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", "localhost", 0)
|
||||
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
|
||||
val slaveTracker = new MapOutputTracker()
|
||||
slaveTracker.trackerActor = slaveSystem.actorFor(
|
||||
"akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
|
||||
|
|
|
@ -385,12 +385,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
|
|||
assert(results === Map(0 -> 42))
|
||||
}
|
||||
|
||||
/** Assert that the supplied TaskSet has exactly the given preferredLocations. */
|
||||
/** Assert that the supplied TaskSet has exactly the given preferredLocations. Note, converts taskSet's locations to host only. */
|
||||
private def assertLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) {
|
||||
assert(locations.size === taskSet.tasks.size)
|
||||
for ((expectLocs, taskLocs) <-
|
||||
taskSet.tasks.map(_.preferredLocations).zip(locations)) {
|
||||
assert(expectLocs === taskLocs)
|
||||
assert(expectLocs.map(loc => spark.Utils.parseHostPort(loc)._1) === taskLocs)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -41,6 +41,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
|||
oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
|
||||
val initialize = PrivateMethod[Unit]('initialize)
|
||||
SizeEstimator invokePrivate initialize()
|
||||
// Set some value ...
|
||||
System.setProperty("spark.hostPort", spark.Utils.localHostName() + ":" + 1111)
|
||||
}
|
||||
|
||||
after {
|
||||
|
|
Loading…
Reference in a new issue