Merge branch 'master' of github.com:mesos/spark

Matei Zaharia 2013-05-13 17:40:21 -07:00
commit b9aef263df
27 changed files with 439 additions and 207 deletions

View file

@@ -148,22 +148,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       .getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
-        var mainArgs: Array[String] = null
-        var startIndex = 0
-        // I am sure there is a better 'scala' way to do this .... but I am just trying to get things to work right now !
-        if (args.userArgs.isEmpty || args.userArgs.get(0) != "yarn-standalone") {
-          // ensure that first param is ALWAYS "yarn-standalone"
-          mainArgs = new Array[String](args.userArgs.size() + 1)
-          mainArgs.update(0, "yarn-standalone")
-          startIndex = 1
-        }
-        else {
-          mainArgs = new Array[String](args.userArgs.size())
-        }
-        args.userArgs.copyToArray(mainArgs, startIndex, args.userArgs.size())
+        // Copy
+        var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+        args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
         mainMethod.invoke(null, mainArgs)
       }
     }

View file

@@ -69,7 +69,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
   " --class CLASS_NAME Name of your application's main class (required)\n" +
   " --args ARGS Arguments to be passed to your application's main class.\n" +
   " Mutliple invocations are possible, each will be passed in order.\n" +
-  " Note that first argument will ALWAYS be yarn-standalone : will be added if missing.\n" +
   " --num-workers NUM Number of workers to start (Default: 2)\n" +
   " --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
   " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")

View file

@@ -92,7 +92,6 @@ class ClientArguments(val args: Array[String]) {
   " --class CLASS_NAME Name of your application's main class (required)\n" +
   " --args ARGS Arguments to be passed to your application's main class.\n" +
   " Mutliple invocations are possible, each will be passed in order.\n" +
-  " Note that first argument will ALWAYS be yarn-standalone : will be added if missing.\n" +
   " --num-workers NUM Number of workers to start (Default: 2)\n" +
   " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
   " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +

View file

@@ -489,7 +489,7 @@ abstract class RDD[T: ClassManifest](
    */
   def foreachPartition(f: Iterator[T] => Unit) {
     val cleanF = sc.clean(f)
-    sc.runJob(this, (iter: Iterator[T]) => f(iter))
+    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
   }
 
   /**

View file

@@ -31,8 +31,11 @@ class SparkEnv (
     val blockManager: BlockManager,
     val connectionManager: ConnectionManager,
     val httpFileServer: HttpFileServer,
-    val sparkFilesDir: String
-  ) {
+    val sparkFilesDir: String,
+    // To be set only as part of initialization of SparkContext.
+    // (executorId, defaultHostPort) => executorHostPort
+    // If executorId is NOT found, return defaultHostPort
+    var executorIdToHostPort: Option[(String, String) => String]) {
 
   def stop() {
     httpFileServer.stop()
@@ -46,6 +49,17 @@ class SparkEnv (
     // down, but let's call it anyway in case it gets fixed in a later release
     actorSystem.awaitTermination()
   }
 
+  def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
+    val env = SparkEnv.get
+    if (env.executorIdToHostPort.isEmpty) {
+      // default to using host, not host port. Relevant to non cluster modes.
+      return defaultHostPort
+    }
+
+    env.executorIdToHostPort.get(executorId, defaultHostPort)
+  }
 }
 
 object SparkEnv extends Logging {
@@ -168,7 +182,7 @@ object SparkEnv extends Logging {
       blockManager,
       connectionManager,
       httpFileServer,
-      sparkFilesDir)
+      sparkFilesDir,
+      None)
   }
 }

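A minimal standalone sketch of how the new executorIdToHostPort hook on SparkEnv is meant to behave: the cluster scheduler installs a resolver function, and lookups fall back to the caller-supplied default when the executor id is unknown. All names below are illustrative, not part of the commit.

object ExecutorHostPortResolverSketch {
  def main(args: Array[String]) {
    // executorId -> hostPort mapping that a scheduler would maintain
    val known = Map("exec-1" -> "node-a:7077")
    val resolver: Option[(String, String) => String] =
      Some((execId, default) => known.getOrElse(execId, default))

    // Mirrors resolveExecutorIdToHostPort: use the default when no resolver is set
    def resolve(execId: String, default: String): String =
      resolver.map(f => f(execId, default)).getOrElse(default)

    println(resolve("exec-1", "node-b:7077"))  // node-a:7077, known executor
    println(resolve("exec-9", "node-b:7077"))  // node-b:7077, falls back to default
  }
}
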
View file

@@ -14,9 +14,17 @@ private[spark] case object Success extends TaskEndReason
 private[spark]
 case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
 
-private[spark]
-case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+private[spark] case class FetchFailed(
+    bmAddress: BlockManagerId,
+    shuffleId: Int,
+    mapId: Int,
+    reduceId: Int)
+  extends TaskEndReason
 
-private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
+private[spark] case class ExceptionFailure(
+    className: String,
+    description: String,
+    stackTrace: Array[StackTraceElement])
+  extends TaskEndReason
 
 private[spark] case class OtherFailure(message: String) extends TaskEndReason

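A short, self-contained sketch (independent of the project's classes) of why ExceptionFailure now carries a class name, description and stack trace instead of the Throwable itself: the original exception may not be serializable, but those three pieces always are, so the failure can be shipped back to the scheduler safely.

case class ExceptionReport(
  className: String,
  description: String,
  stackTrace: Array[StackTraceElement])

object ExceptionReportSketch {
  def main(args: Array[String]) {
    val t: Throwable = new IllegalStateException("boom")
    // Same conversion the executor performs before serializing the task end reason
    val report = ExceptionReport(t.getClass.getName, t.toString, t.getStackTrace)
    println(report.className)    // java.lang.IllegalStateException
    println(report.description)  // java.lang.IllegalStateException: boom
    println(report.stackTrace.nonEmpty)
  }
}
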
View file

@@ -335,7 +335,7 @@ private object Utils extends Logging {
     retval
   }
 
   /*
   // Used by DEBUG code : remove when all testing done
   private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$")
   def checkHost(host: String, message: String = "") {
@@ -357,15 +357,6 @@ private object Utils extends Logging {
       Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message)
     }
   }
-  */
-
-  // Once testing is complete in various modes, replace with this ?
-  def checkHost(host: String, message: String = "") {}
-  def checkHostPort(hostPort: String, message: String = "") {}
-
-  def getUserNameFromEnvironment(): String = {
-    SparkHadoopUtil.getUserNameFromEnvironment
-  }
 
   // Used by DEBUG code : remove when all testing done
   def logErrorWithStack(msg: String) {
@@ -373,6 +364,20 @@ private object Utils extends Logging {
     // temp code for debug
     System.exit(-1)
   }
+  */
+
+  // Once testing is complete in various modes, replace with this ?
+  def checkHost(host: String, message: String = "") {}
+  def checkHostPort(hostPort: String, message: String = "") {}
+
+  // Used by DEBUG code : remove when all testing done
+  def logErrorWithStack(msg: String) {
+    try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
+  }
+
+  def getUserNameFromEnvironment(): String = {
+    SparkHadoopUtil.getUserNameFromEnvironment
+  }
 
   // Typically, this will be of order of number of nodes in cluster
   // If not, we should change it to LRUCache or something.

View file

@@ -54,7 +54,10 @@ private[spark] class Worker(
   def createWorkDir() {
     workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
     try {
-      if ( (workDir.exists() && !workDir.isDirectory) || (!workDir.exists() && !workDir.mkdirs()) ) {
+      // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
+      // So attempting to create and then check if directory was created or not.
+      workDir.mkdirs()
+      if ( !workDir.exists() || !workDir.isDirectory) {
         logError("Failed to create work directory " + workDir)
         System.exit(1)
       }

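A standalone sketch of the create-then-verify pattern the Worker change adopts: call mkdirs() unconditionally and then check that the directory exists, rather than trusting the boolean returned by mkdirs(). The object and path below are hypothetical.

import java.io.File

object WorkDirSketch {
  def ensureDir(path: String): File = {
    val dir = new File(path)
    dir.mkdirs()  // ignore the return value; it is false when the directory already exists
    if (!dir.exists() || !dir.isDirectory) {
      sys.error("Failed to create work directory " + dir)
    }
    dir
  }

  def main(args: Array[String]) {
    println(ensureDir(System.getProperty("java.io.tmpdir") + "/spark-work-sketch"))
  }
}
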
View file

@@ -122,7 +122,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         }
 
         case t: Throwable => {
-          val reason = ExceptionFailure(t)
+          val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
           context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
           // TODO: Should we exit the whole executor here? On the one hand, the failed task may

View file

@@ -1,7 +1,7 @@
 package spark.rdd
 
-import scala.collection.mutable.HashMap
 import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+import spark.storage.BlockManager
 
 private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
   val index = idx
@@ -11,12 +11,7 @@ private[spark]
 class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
   extends RDD[T](sc, Nil) {
 
-  @transient lazy val locations_ = {
-    val blockManager = SparkEnv.get.blockManager
-    /*val locations = blockIds.map(id => blockManager.getLocations(id))*/
-    val locations = blockManager.getLocations(blockIds)
-    HashMap(blockIds.zip(locations):_*)
-  }
+  @transient lazy val locations_ = BlockManager.blockIdsToExecutorLocations(blockIds, SparkEnv.get)
 
   override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
     new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]

View file

@@ -1,6 +1,6 @@
 package spark.rdd
 
-import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
 import java.io.{ObjectOutputStream, IOException}
 
 private[spark] class ZippedPartitionsPartition(
@@ -38,9 +38,31 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
+    // Note that as number of rdd's increase and/or number of slaves in cluster increase, the computed preferredLocations below
+    // become diminishingly small : so we might need to look at alternate strategies to alleviate this.
+    // If there are no (or very small number of preferred locations), we will end up transferred the blocks to 'any' node in the
+    // cluster - paying with n/w and cache cost.
+    // Maybe pick a node which figures max amount of time ?
+    // Choose node which is hosting 'larger' of some subset of blocks ?
+    // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible)
+
     val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions
-    val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2))
-    preferredLocations.reduce((x, y) => x.intersect(y))
+    val rddSplitZip = rdds.zip(splits)
+
+    // exact match.
+    val exactMatchPreferredLocations = rddSplitZip.map(x => x._1.preferredLocations(x._2))
+    val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y))
+
+    // Remove exact match and then do host local match.
+    val otherNodePreferredLocations = rddSplitZip.map(x => {
+      x._1.preferredLocations(x._2).map(hostPort => {
+        val host = Utils.parseHostPort(hostPort)._1
+        if (exactMatchLocations.contains(host)) null else host
+      }).filter(_ != null)
+    })
+    val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y))
+
+    otherNodeLocalLocations ++ exactMatchLocations
   }
 
   override def clearDependencies() {

View file

@@ -1,6 +1,6 @@
 package spark.rdd
 
-import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
 import java.io.{ObjectOutputStream, IOException}
@@ -48,8 +48,27 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
+    // Note that as number of slaves in cluster increase, the computed preferredLocations can become small : so we might need
+    // to look at alternate strategies to alleviate this. (If there are no (or very small number of preferred locations), we
+    // will end up transferred the blocks to 'any' node in the cluster - paying with n/w and cache cost.
+    // Maybe pick one or the other ? (so that atleast one block is local ?).
+    // Choose node which is hosting 'larger' of the blocks ?
+    // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible)
+
     val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
-    rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
+    val pref1 = rdd1.preferredLocations(partition1)
+    val pref2 = rdd2.preferredLocations(partition2)
+
+    // exact match - instance local and host local.
+    val exactMatchLocations = pref1.intersect(pref2)
+
+    // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local.
+    val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+    val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+    val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2)
+
+    // Can have mix of instance local (hostPort) and node local (host) locations as preference !
+    exactMatchLocations ++ otherNodeLocalLocations
   }
 
   override def clearDependencies() {

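A compact illustration, with made-up host names, of the preference merging used in the two Zipped RDD changes above: exact hostPort matches are kept first, then hosts on which both partitions are at least node local are appended.

object ZipLocalitySketch {
  def parseHost(hostPort: String): String = hostPort.split(":")(0)

  def main(args: Array[String]) {
    val pref1 = Seq("node-a:5051", "node-b:5051")
    val pref2 = Seq("node-a:5051", "node-b:6062")

    val exactMatch = pref1.intersect(pref2)                                    // List(node-a:5051)
    val hosts1 = pref1.filter(loc => !exactMatch.contains(loc)).map(parseHost)
    val hosts2 = pref2.filter(loc => !exactMatch.contains(loc)).map(parseHost)
    val nodeLocal = hosts1.intersect(hosts2)                                   // List(node-b)

    println(exactMatch ++ nodeLocal)  // List(node-a:5051, node-b)
  }
}
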
View file

@@ -12,7 +12,7 @@ import spark.executor.TaskMetrics
 import spark.partial.ApproximateActionListener
 import spark.partial.ApproximateEvaluator
 import spark.partial.PartialResult
-import spark.storage.BlockManagerMaster
+import spark.storage.{BlockManager, BlockManagerMaster}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
 
 /**
@@ -117,9 +117,8 @@ class DAGScheduler(
   private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
     if (!cacheLocs.contains(rdd.id)) {
       val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray
-      cacheLocs(rdd.id) = blockManagerMaster.getLocations(blockIds).map {
-        locations => locations.map(_.hostPort).toList
-      }.toArray
+      val locs = BlockManager.blockIdsToExecutorLocations(blockIds, env, blockManagerMaster)
+      cacheLocs(rdd.id) = blockIds.map(locs.getOrElse(_, Nil))
     }
     cacheLocs(rdd.id)
   }

View file

@@ -70,12 +70,11 @@ private[spark] class ResultTask[T, U](
     rdd.partitions(partition)
   }
 
-  // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts.
-  val preferredLocs: Seq[String] = if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq
+  private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
 
   {
     // DEBUG code
-    preferredLocs.foreach (host => Utils.checkHost(host, "preferredLocs : " + preferredLocs))
+    preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
   }
 
   override def run(attemptId: Long): U = {

View file

@@ -85,18 +85,11 @@ private[spark] class ShuffleMapTask(
   protected def this() = this(0, null, null, 0, null)
 
-  // Data locality is on a per host basis, not hyper specific to container (host:port).
-  // Unique on set of hosts.
-  // TODO(rxin): The above statement seems problematic. Even if partitions are on the same host,
-  // the worker would still need to serialize / deserialize those data when they are in
-  // different jvm processes. Often that is very costly ...
-  @transient
-  private val preferredLocs: Seq[String] =
-    if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq
+  @transient private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
 
   {
     // DEBUG code
-    preferredLocs.foreach (host => Utils.checkHost(host, "preferredLocs : " + preferredLocs))
+    preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
   }
 
   var split = if (rdd == null) {

View file

@@ -32,28 +32,28 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   val TASK_REVIVAL_INTERVAL = System.getProperty("spark.tasks.revive.interval", "0").toLong
 
   /*
-   This property controls how aggressive we should be to modulate waiting for host local task scheduling.
-   To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for host locality of tasks before
+   This property controls how aggressive we should be to modulate waiting for node local task scheduling.
+   To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for node locality of tasks before
    scheduling on other nodes. We have modified this in yarn branch such that offers to task set happen in prioritized order :
-   host-local, rack-local and then others
-   But once all available host local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
+   node-local, rack-local and then others
+   But once all available node local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
    scheduling to other nodes (which degrades performance for time sensitive tasks and on larger clusters), we can
    modulate that : to also allow rack local nodes or any node. The default is still set to HOST - so that previous behavior is
    maintained. This is to allow tuning the tension between pulling rdd data off node and scheduling computation asap.
 
   TODO: rename property ? The value is one of
-   - HOST_LOCAL (default, no change w.r.t current behavior),
+   - NODE_LOCAL (default, no change w.r.t current behavior),
    - RACK_LOCAL and
    - ANY
 
   Note that this property makes more sense when used in conjugation with spark.tasks.revive.interval > 0 : else it is not very effective.
 
   Additional Note: For non trivial clusters, there is a 4x - 5x reduction in running time (in some of our experiments) based on whether
-  it is left at default HOST_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
+  it is left at default NODE_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
   If cluster is rack aware, then setting it to RACK_LOCAL gives best tradeoff and a 3x - 4x performance improvement while minimizing IO impact.
   Also, it brings down the variance in running time drastically.
   */
-  val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "HOST_LOCAL"))
+  val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "NODE_LOCAL"))
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -73,15 +73,15 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   val activeExecutorIds = new HashSet[String]
 
   // TODO: We might want to remove this and merge it with execId datastructures - but later.
-  // Which hosts in the cluster are alive (contains hostPort's) - used for hyper local and local task locality.
+  // Which hosts in the cluster are alive (contains hostPort's) - used for process local and node local task locality.
   private val hostPortsAlive = new HashSet[String]
   private val hostToAliveHostPorts = new HashMap[String, HashSet[String]]
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
-  val executorsByHostPort = new HashMap[String, HashSet[String]]
-  val executorIdToHostPort = new HashMap[String, String]
+  private val executorsByHostPort = new HashMap[String, HashSet[String]]
+  private val executorIdToHostPort = new HashMap[String, String]
 
   // JAR server, if any JARs were added by the user to the SparkContext
   var jarServer: HttpServer = null
@@ -102,6 +102,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def initialize(context: SchedulerBackend) {
     backend = context
+    // resolve executorId to hostPort mapping.
+    def executorToHostPort(executorId: String, defaultHostPort: String): String = {
+      executorIdToHostPort.getOrElse(executorId, defaultHostPort)
+    }
+
+    // Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler
+    // Will that be a design violation ?
+    SparkEnv.get.executorIdToHostPort = Some(executorToHostPort)
   }
 
   def newTaskId(): Long = nextTaskId.getAndIncrement()
@@ -209,14 +217,31 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
 
     // Build a list of tasks to assign to each slave
     val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+    // merge availableCpus into nodeToAvailableCpus block ?
     val availableCpus = offers.map(o => o.cores).toArray
+    val nodeToAvailableCpus = {
+      val map = new HashMap[String, Int]()
+      for (offer <- offers) {
+        val hostPort = offer.hostPort
+        val cores = offer.cores
+        // DEBUG code
+        Utils.checkHostPort(hostPort)
+
+        val host = Utils.parseHostPort(hostPort)._1
+        map.put(host, map.getOrElse(host, 0) + cores)
+      }
+
+      map
+    }
     var launchedTask = false
     for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
 
-      // Split offers based on host local, rack local and off-rack tasks.
-      val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+      // Split offers based on node local, rack local and off-rack tasks.
+      val processLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+      val nodeLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
       val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
       val otherOffers = new HashMap[String, ArrayBuffer[Int]]()
@@ -224,21 +249,30 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         val hostPort = offers(i).hostPort
         // DEBUG code
         Utils.checkHostPort(hostPort)
+
+        val numProcessLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
+        if (numProcessLocalTasks > 0){
+          val list = processLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
+          for (j <- 0 until numProcessLocalTasks) list += i
+        }
+
         val host = Utils.parseHostPort(hostPort)._1
-        val numHostLocalTasks = math.max(0, math.min(manager.numPendingTasksForHost(hostPort), availableCpus(i)))
-        if (numHostLocalTasks > 0){
-          val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
-          for (j <- 0 until numHostLocalTasks) list += i
+        val numNodeLocalTasks = math.max(0,
+          // Remove process local tasks (which are also host local btw !) from this
+          math.min(manager.numPendingTasksForHost(hostPort) - numProcessLocalTasks, nodeToAvailableCpus(host)))
+        if (numNodeLocalTasks > 0){
+          val list = nodeLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
+          for (j <- 0 until numNodeLocalTasks) list += i
         }
 
         val numRackLocalTasks = math.max(0,
-          // Remove host local tasks (which are also rack local btw !) from this
-          math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHostLocalTasks, availableCpus(i)))
+          // Remove node local tasks (which are also rack local btw !) from this
+          math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numProcessLocalTasks - numNodeLocalTasks, nodeToAvailableCpus(host)))
         if (numRackLocalTasks > 0){
          val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
          for (j <- 0 until numRackLocalTasks) list += i
        }
-        if (numHostLocalTasks <= 0 && numRackLocalTasks <= 0){
+        if (numNodeLocalTasks <= 0 && numRackLocalTasks <= 0){
           // add to others list - spread even this across cluster.
           val list = otherOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
           list += i
@@ -246,12 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
 
       val offersPriorityList = new ArrayBuffer[Int](
-        hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
-      // First host local, then rack, then others
-      val numHostLocalOffers = {
-        val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers)
-        offersPriorityList ++= hostLocalPriorityList
-        hostLocalPriorityList.size
+        processLocalOffers.size + nodeLocalOffers.size + rackLocalOffers.size + otherOffers.size)
+
+      // First process local, then host local, then rack, then others
+
+      // numNodeLocalOffers contains count of both process local and host offers.
+      val numNodeLocalOffers = {
+        val processLocalPriorityList = ClusterScheduler.prioritizeContainers(processLocalOffers)
+        offersPriorityList ++= processLocalPriorityList
+
+        val nodeLocalPriorityList = ClusterScheduler.prioritizeContainers(nodeLocalOffers)
+        offersPriorityList ++= nodeLocalPriorityList
+
+        processLocalPriorityList.size + nodeLocalPriorityList.size
       }
       val numRackLocalOffers = {
         val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers)
@@ -262,8 +303,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       var lastLoop = false
       val lastLoopIndex = TASK_SCHEDULING_AGGRESSION match {
-        case TaskLocality.HOST_LOCAL => numHostLocalOffers
-        case TaskLocality.RACK_LOCAL => numRackLocalOffers + numHostLocalOffers
+        case TaskLocality.NODE_LOCAL => numNodeLocalOffers
+        case TaskLocality.RACK_LOCAL => numRackLocalOffers + numNodeLocalOffers
         case TaskLocality.ANY => offersPriorityList.size
       }
@@ -302,8 +343,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
             // prevent more looping
             launchedTask = false
           } else if (!lastLoop && !launchedTask) {
-            // Do this only if TASK_SCHEDULING_AGGRESSION != HOST_LOCAL
-            if (TASK_SCHEDULING_AGGRESSION != TaskLocality.HOST_LOCAL) {
+            // Do this only if TASK_SCHEDULING_AGGRESSION != NODE_LOCAL
+            if (TASK_SCHEDULING_AGGRESSION != TaskLocality.NODE_LOCAL) {
               // fudge launchedTask to ensure we loop once more
               launchedTask = true
               // dont loop anymore
@@ -477,6 +518,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   }
 
   def getExecutorsAliveOnHost(host: String): Option[Set[String]] = {
+    Utils.checkHost(host)
+
     val retval = hostToAliveHostPorts.get(host)
     if (retval.isDefined) {
       return Some(retval.get.toSet)
@@ -485,6 +528,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     None
   }
 
+  def isExecutorAliveOnHostPort(hostPort: String): Boolean = {
+    // Even if hostPort is a host, it does not matter - it is just a specific check.
+    // But we do have to ensure that only hostPort get into hostPortsAlive !
+    // So no check against Utils.checkHostPort
+    hostPortsAlive.contains(hostPort)
+  }
+
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None

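A toy rendering (invented offer indices) of the priority list the scheduler builds above: process-local offers first, then node-local, then rack-local, then the rest, with TASK_SCHEDULING_AGGRESSION deciding how far down that list the final scheduling pass may reach.

object OfferPrioritySketch {
  def main(args: Array[String]) {
    val processLocal = Seq(0)     // offer indices with process-local work
    val nodeLocal    = Seq(1, 2)  // offer indices with node-local work
    val rackLocal    = Seq(3)
    val other        = Seq(4)

    val priorityList = processLocal ++ nodeLocal ++ rackLocal ++ other
    val numNodeLocalOffers = processLocal.size + nodeLocal.size

    // NODE_LOCAL aggression stops here; RACK_LOCAL extends by rackLocal.size; ANY uses the whole list
    println(priorityList.take(numNodeLocalOffers))                   // List(0, 1, 2)
    println(priorityList.take(numNodeLocalOffers + rackLocal.size))  // List(0, 1, 2, 3)
  }
}
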
View file

@@ -13,17 +13,21 @@ import spark.scheduler._
 import spark.TaskState.TaskState
 import java.nio.ByteBuffer
 
-private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
+private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
 
-  val HOST_LOCAL, RACK_LOCAL, ANY = Value
+  // process local is expected to be used ONLY within tasksetmanager for now.
+  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
 
   type TaskLocality = Value
 
   def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
 
+    // Must not be the constraint.
+    assert (constraint != TaskLocality.PROCESS_LOCAL)
+
     constraint match {
-      case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL
-      case TaskLocality.RACK_LOCAL => condition == TaskLocality.HOST_LOCAL || condition == TaskLocality.RACK_LOCAL
+      case TaskLocality.NODE_LOCAL => condition == TaskLocality.NODE_LOCAL
+      case TaskLocality.RACK_LOCAL => condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
       // For anything else, allow
       case _ => true
     }
@@ -32,12 +36,16 @@ private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL
   def parse(str: String): TaskLocality = {
     // better way to do this ?
     try {
-      TaskLocality.withName(str)
+      val retval = TaskLocality.withName(str)
+      // Must not specify PROCESS_LOCAL !
+      assert (retval != TaskLocality.PROCESS_LOCAL)
+
+      retval
     } catch {
       case nEx: NoSuchElementException => {
-        logWarning("Invalid task locality specified '" + str + "', defaulting to HOST_LOCAL");
+        logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL");
         // default to preserve earlier behavior
-        HOST_LOCAL
+        NODE_LOCAL
       }
     }
   }
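A standalone copy of the locality hierarchy check introduced above, runnable outside the project: a NODE_LOCAL constraint only admits node-local placement, RACK_LOCAL also admits node-local, and ANY admits everything; PROCESS_LOCAL is never used as a constraint.

object LocalitySketch extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value

  def isAllowed(constraint: Value, condition: Value): Boolean = constraint match {
    case NODE_LOCAL => condition == NODE_LOCAL
    case RACK_LOCAL => condition == NODE_LOCAL || condition == RACK_LOCAL
    case _ => true  // ANY (and anything else) allows every condition
  }

  def main(args: Array[String]) {
    println(isAllowed(RACK_LOCAL, NODE_LOCAL))  // true: rack-local constraint admits node-local tasks
    println(isAllowed(NODE_LOCAL, RACK_LOCAL))  // false: node-local constraint excludes rack-only tasks
  }
}
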
@@ -76,7 +84,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
 
-  // List of pending tasks for each node (hyper local to container). These collections are actually
+  // List of pending tasks for each node (process local to container). These collections are actually
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task failed, it is put
@@ -133,35 +141,55 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
       addPendingTask(i)
     }
 
-  private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler, rackLocal: Boolean = false): ArrayBuffer[String] = {
-    // DEBUG code
-    _taskPreferredLocations.foreach(h => Utils.checkHost(h, "taskPreferredLocation " + _taskPreferredLocations))
-
-    val taskPreferredLocations = if (! rackLocal) _taskPreferredLocations else {
-      // Expand set to include all 'seen' rack local hosts.
-      // This works since container allocation/management happens within master - so any rack locality information is updated in msater.
-      // Best case effort, and maybe sort of kludge for now ... rework it later ?
-      val hosts = new HashSet[String]
-      _taskPreferredLocations.foreach(h => {
-        val rackOpt = scheduler.getRackForHost(h)
-        if (rackOpt.isDefined) {
-          val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
-          if (hostsOpt.isDefined) {
-            hosts ++= hostsOpt.get
-          }
-        }
-
-        // Ensure that irrespective of what scheduler says, host is always added !
-        hosts += h
-      })
-      hosts
-    }
-
-    val retval = new ArrayBuffer[String]
+  // Note that it follows the hierarchy.
+  // if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
+  // if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
+  private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
+                                     taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
+
+    if (TaskLocality.PROCESS_LOCAL == taskLocality) {
+      // straight forward comparison ! Special case it.
+      val retval = new HashSet[String]()
+      scheduler.synchronized {
+        for (location <- _taskPreferredLocations) {
+          if (scheduler.isExecutorAliveOnHostPort(location)) {
+            retval += location
+          }
+        }
+      }
+
+      return retval
+    }
+
+    val taskPreferredLocations =
+      if (TaskLocality.NODE_LOCAL == taskLocality) {
+        _taskPreferredLocations
+      } else {
+        assert (TaskLocality.RACK_LOCAL == taskLocality)
+        // Expand set to include all 'seen' rack local hosts.
+        // This works since container allocation/management happens within master - so any rack locality information is updated in msater.
+        // Best case effort, and maybe sort of kludge for now ... rework it later ?
+        val hosts = new HashSet[String]
+        _taskPreferredLocations.foreach(h => {
+          val rackOpt = scheduler.getRackForHost(h)
+          if (rackOpt.isDefined) {
+            val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
+            if (hostsOpt.isDefined) {
+              hosts ++= hostsOpt.get
+            }
+          }
+
+          // Ensure that irrespective of what scheduler says, host is always added !
+          hosts += h
+        })
+        hosts
+      }
+
+    val retval = new HashSet[String]
     scheduler.synchronized {
       for (prefLocation <- taskPreferredLocations) {
-        val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(prefLocation)
+        val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(Utils.parseHostPort(prefLocation)._1)
         if (aliveLocationsOpt.isDefined) {
          retval ++= aliveLocationsOpt.get
        }
@@ -175,29 +203,37 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   private def addPendingTask(index: Int) {
     // We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
     // hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it.
-    val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched)
-    val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, true)
+    val processLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.PROCESS_LOCAL)
+    val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
+    val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
 
     if (rackLocalLocations.size == 0) {
       // Current impl ensures this.
+      assert (processLocalLocations.size == 0)
       assert (hostLocalLocations.size == 0)
       pendingTasksWithNoPrefs += index
     } else {
 
-      // host locality
-      for (hostPort <- hostLocalLocations) {
+      // process local locality
+      for (hostPort <- processLocalLocations) {
         // DEBUG Code
         Utils.checkHostPort(hostPort)
 
         val hostPortList = pendingTasksForHostPort.getOrElseUpdate(hostPort, ArrayBuffer())
         hostPortList += index
+      }
+
+      // host locality (includes process local)
+      for (hostPort <- hostLocalLocations) {
+        // DEBUG Code
+        Utils.checkHostPort(hostPort)
 
         val host = Utils.parseHostPort(hostPort)._1
         val hostList = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
         hostList += index
       }
 
-      // rack locality
+      // rack locality (includes process local and host local)
       for (rackLocalHostPort <- rackLocalLocations) {
         // DEBUG Code
         Utils.checkHostPort(rackLocalHostPort)
@@ -211,7 +247,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     allPendingTasks += index
   }
 
-  // Return the pending tasks list for a given host port (hyper local), or an empty list if
+  // Return the pending tasks list for a given host port (process local), or an empty list if
   // there is no map entry for that host
   private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
     // DEBUG Code
@@ -233,6 +269,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
   }
 
+  // Number of pending tasks for a given host Port (which would be process local)
+  def numPendingTasksForHostPort(hostPort: String): Int = {
+    getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
+  }
+
   // Number of pending tasks for a given host (which would be data local)
   def numPendingTasksForHost(hostPort: String): Int = {
     getPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
@@ -264,13 +305,13 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   // task must have a preference for this host/rack/no preferred locations at all.
   private def findSpeculativeTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
 
-    assert (TaskLocality.isAllowed(locality, TaskLocality.HOST_LOCAL))
+    assert (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL))
 
     speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
     if (speculatableTasks.size > 0) {
       val localTask = speculatableTasks.find {
         index =>
-          val locations = findPreferredLocations(tasks(index).preferredLocations, sched)
+          val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
           val attemptLocs = taskAttempts(index).map(_.hostPort)
           (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
       }
@@ -284,7 +325,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
       val rackTask = speculatableTasks.find {
         index =>
-          val locations = findPreferredLocations(tasks(index).preferredLocations, sched, true)
+          val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
           val attemptLocs = taskAttempts(index).map(_.hostPort)
           locations.contains(hostPort) && !attemptLocs.contains(hostPort)
       }
@@ -311,6 +352,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   // Dequeue a pending task for a given node and return its index.
   // If localOnly is set to false, allow non-local tasks as well.
   private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
+    val processLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
+    if (processLocalTask != None) {
+      return processLocalTask
+    }
+
     val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
     if (localTask != None) {
       return localTask
@@ -341,30 +387,31 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return findSpeculativeTask(hostPort, locality)
   }
 
-  // Does a host count as a preferred location for a task? This is true if
-  // either the task has preferred locations and this host is one, or it has
-  // no preferred locations (in which we still count the launch as preferred).
-  private def isPreferredLocation(task: Task[_], hostPort: String): Boolean = {
-    val locs = task.preferredLocations
-    // DEBUG code
-    locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs))
-
-    if (locs.contains(hostPort) || locs.isEmpty) return true
+  private def isProcessLocalLocation(task: Task[_], hostPort: String): Boolean = {
+    Utils.checkHostPort(hostPort)
+
+    val locs = task.preferredLocations
+
+    locs.contains(hostPort)
+  }
+
+  private def isHostLocalLocation(task: Task[_], hostPort: String): Boolean = {
+    val locs = task.preferredLocations
+
+    // If no preference, consider it as host local
+    if (locs.isEmpty) return true
+
     val host = Utils.parseHostPort(hostPort)._1
-    locs.contains(host)
+    locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
   }
 
   // Does a host count as a rack local preferred location for a task? (assumes host is NOT preferred location).
   // This is true if either the task has preferred locations and this host is one, or it has
   // no preferred locations (in which we still count the launch as preferred).
-  def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
+  private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
 
     val locs = task.preferredLocations
 
-    // DEBUG code
-    locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs))
-
     val preferredRacks = new HashSet[String]()
     for (preferredHost <- locs) {
       val rack = sched.getRackForHost(preferredHost)
@@ -386,7 +433,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     val locality = if (overrideLocality != null) overrideLocality else {
       // expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
       val time = System.currentTimeMillis
-      if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.HOST_LOCAL else TaskLocality.ANY
+      if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.NODE_LOCAL else TaskLocality.ANY
     }
 
     findTask(hostPort, locality) match {
@@ -395,8 +442,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
         val task = tasks(index)
         val taskId = sched.newTaskId()
         // Figure out whether this should count as a preferred launch
-        val taskLocality = if (isPreferredLocation(task, hostPort)) TaskLocality.HOST_LOCAL else
-          if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else TaskLocality.ANY
+        val taskLocality =
+          if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL else
+          if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL else
+          if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
+            TaskLocality.ANY
         val prefStr = taskLocality.toString
         logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
           taskSet.id, index, taskId, execId, hostPort, prefStr))
@@ -406,7 +456,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
         val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality)
         taskInfos(taskId) = info
         taskAttempts(index) = info :: taskAttempts(index)
-        if (TaskLocality.HOST_LOCAL == taskLocality) {
+        if (TaskLocality.NODE_LOCAL == taskLocality) {
           lastPreferredLaunchTime = time
         }
         // Serialize and return the task
@@ -493,7 +543,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
         return
 
       case ef: ExceptionFailure =>
-        val key = ef.exception.toString
+        val key = ef.description
         val now = System.currentTimeMillis
         val (printFull, dupCount) = {
           if (recentExceptions.contains(key)) {
@@ -511,10 +561,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
           }
         }
         if (printFull) {
-          val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString))
-          logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n")))
+          val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+          logInfo("Loss was due to %s\n%s\n%s".format(
+            ef.className, ef.description, locs.mkString("\n")))
         } else {
-          logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount))
+          logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
         }
 
       case _ => {}
@@ -552,15 +603,22 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   def executorLost(execId: String, hostPort: String) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
+
     // If some task has preferred locations only on hostname, and there are no more executors there,
     // put it in the no-prefs list to avoid the wait from delay scheduling
-    for (index <- getPendingTasksForHostPort(hostPort)) {
-      val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, true)
+
+    // host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
+    // no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc.
+    // Note: NOT checking process local list - since host local list is super set of that. We need to ad to no prefs only if
+    // there is no host local node for the task (not if there is no process local node for the task)
+    for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
+      // val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
+      val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
       if (newLocs.isEmpty) {
-        assert (findPreferredLocations(tasks(index).preferredLocations, sched).isEmpty)
         pendingTasksWithNoPrefs += index
       }
     }
 
     // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
     if (tasks(0).isInstanceOf[ShuffleMapTask]) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {

View file

@@ -53,7 +53,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
   def runTask(task: Task[_], idInJob: Int, attemptId: Int) {
     logInfo("Running " + task)
-    val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local:1", TaskLocality.HOST_LOCAL)
+    val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
     // Set the Spark execution environment for the worker thread
     SparkEnv.set(env)
     try {
@@ -101,8 +101,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
           submitTask(task, idInJob)
         } else {
           // TODO: Do something nicer here to return all the way to the user
-          if (!Thread.currentThread().isInterrupted)
-            listener.taskEnded(task, new ExceptionFailure(t), null, null, info, null)
+          if (!Thread.currentThread().isInterrupted) {
+            val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
+            listener.taskEnded(task, failure, null, null, info, null)
+          }
         }
       }
     }

View file

@ -4,7 +4,7 @@ import java.io.{InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer} import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet, Queue}
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import akka.actor.{ActorSystem, Cancellable, Props} import akka.actor.{ActorSystem, Cancellable, Props}
@ -269,23 +269,12 @@ class BlockManager(
} }
/**
* Get locations of the block.
*/
def getLocations(blockId: String): Seq[String] = {
val startTimeMs = System.currentTimeMillis
var managers = master.getLocations(blockId)
val locations = managers.map(_.hostPort)
logDebug("Got block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
/** /**
* Get locations of an array of blocks. * Get locations of an array of blocks.
*/ */
def getLocations(blockIds: Array[String]): Array[Seq[String]] = { def getLocationBlockIds(blockIds: Array[String]): Array[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis val startTimeMs = System.currentTimeMillis
val locations = master.getLocations(blockIds).map(_.map(_.hostPort).toSeq).toArray val locations = master.getLocations(blockIds).toArray
logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs)) logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
return locations return locations
} }
@ -969,6 +958,46 @@ object BlockManager extends Logging {
} }
} }
} }
def blockIdsToExecutorLocations(blockIds: Array[String], env: SparkEnv, blockManagerMaster: BlockManagerMaster = null): HashMap[String, List[String]] = {
// env == null and blockManagerMaster != null is used in tests
assert (env != null || blockManagerMaster != null)
val locationBlockIds: Seq[Seq[BlockManagerId]] =
if (env != null) {
val blockManager = env.blockManager
blockManager.getLocationBlockIds(blockIds)
} else {
blockManagerMaster.getLocations(blockIds)
}
// Convert from block master locations to executor locations (we need that for task scheduling)
val executorLocations = new HashMap[String, List[String]]()
for (i <- 0 until blockIds.length) {
val blockId = blockIds(i)
val blockLocations = locationBlockIds(i)
val executors = new HashSet[String]()
if (env != null) {
for (bkLocation <- blockLocations) {
val executorHostPort = env.resolveExecutorIdToHostPort(bkLocation.executorId, bkLocation.host)
executors += executorHostPort
// logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort)
}
} else {
// Typically only hit in tests - fall back to simply using the host.
for (bkLocation <- blockLocations) {
executors += bkLocation.host
// logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort)
}
}
executorLocations.put(blockId, executors.toSeq.toList)
}
executorLocations
}
} }
class BlockFetcherIterator( class BlockFetcherIterator(
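
(For context on blockIdsToExecutorLocations above: the map it builds is essentially block id -> distinct executor "host:port" strings that hold a replica. A self-contained sketch of that grouping is below; BlockLocation is a hypothetical stand-in for BlockManagerId, and the code is an illustration rather than the actual BlockManager logic.)

    import scala.collection.mutable.{HashMap, HashSet}

    // Hypothetical stand-in for BlockManagerId, carrying just what the grouping needs.
    case class BlockLocation(executorId: String, host: String, port: Int)

    object BlockLocationDemo {
      // For each block id, collect the distinct "host:port" strings of executors holding it.
      def groupByExecutor(
          blockIds: Array[String],
          locations: Seq[Seq[BlockLocation]]): HashMap[String, List[String]] = {
        val result = new HashMap[String, List[String]]()
        for (i <- 0 until blockIds.length) {
          val executors = new HashSet[String]()
          for (loc <- locations(i)) {
            executors += loc.host + ":" + loc.port
          }
          result.put(blockIds(i), executors.toList)
        }
        result
      }

      def main(args: Array[String]) {
        val blockIds = Array("rdd_0_0", "rdd_0_1")
        val locations = Seq(
          Seq(BlockLocation("exec-1", "host-a", 7000), BlockLocation("exec-2", "host-b", 7000)),
          Seq(BlockLocation("exec-2", "host-b", 7000)))
        println(groupByExecutor(blockIds, locations))
      }
    }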

View file

@@ -18,6 +18,9 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._ import SparkContext._
import storage.{GetBlock, BlockManagerWorker, StorageLevel} import storage.{GetBlock, BlockManagerWorker, StorageLevel}
class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext { class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
val clusterUrl = "local-cluster[2,1,512]" val clusterUrl = "local-cluster[2,1,512]"
@@ -27,6 +30,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
System.clearProperty("spark.storage.memoryFraction") System.clearProperty("spark.storage.memoryFraction")
} }
test("task throws not serializable exception") {
// Ensures that executors do not crash when an exception is not serializable. If executors crash,
// this test will hang. Correct behavior is that executors don't crash but fail tasks
// and the scheduler throws a SparkException.
// numSlaves must be less than numPartitions
val numSlaves = 3
val numPartitions = 10
sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
val data = sc.parallelize(1 to 100, numPartitions).
map(x => throw new NotSerializableExn(new NotSerializableClass))
intercept[SparkException] {
data.count()
}
resetSparkContext()
}
test("local-cluster format") { test("local-cluster format") {
sc = new SparkContext("local-cluster[2,1,512]", "test") sc = new SparkContext("local-cluster[2,1,512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2) assert(sc.parallelize(1 to 2, 2).count() == 2)
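
(A standalone illustration of the situation the new test above sets up: a Throwable is serializable by default, but wrapping a field whose class is not serializable makes Java serialization of the exception fail at runtime. The class names below are illustrative, not the ones from the suite.)

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    object NotSerializableDemo {
      class PlainClass                                          // does not extend Serializable
      class WrappedExn(val payload: PlainClass) extends Throwable

      def main(args: Array[String]) {
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        try {
          // Throwable itself is Serializable, but the payload field is not, so this throws.
          out.writeObject(new WrappedExn(new PlainClass))
          println("serialized (unexpected)")
        } catch {
          case e: NotSerializableException => println("not serializable, as expected: " + e)
        }
      }
    }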

View file

@@ -80,15 +80,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
} }
test("remote fetch") { test("remote fetch") {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0) val hostname = "localhost"
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", "localhost:" + boundPort) System.setProperty("spark.hostPort", hostname + ":" + boundPort)
val masterTracker = new MapOutputTracker() val masterTracker = new MapOutputTracker()
masterTracker.trackerActor = actorSystem.actorOf( masterTracker.trackerActor = actorSystem.actorOf(
Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker") Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker")
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", "localhost", 0) val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
val slaveTracker = new MapOutputTracker() val slaveTracker = new MapOutputTracker()
slaveTracker.trackerActor = slaveSystem.actorFor( slaveTracker.trackerActor = slaveSystem.actorFor(
"akka://spark@localhost:" + boundPort + "/user/MapOutputTracker") "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")

View file

@@ -385,12 +385,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(results === Map(0 -> 42)) assert(results === Map(0 -> 42))
} }
/** Assert that the supplied TaskSet has exactly the given preferredLocations. */ /** Assert that the supplied TaskSet has exactly the given preferredLocations. Note: the taskSet's locations are reduced to host names before comparison. */
private def assertLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) { private def assertLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) {
assert(locations.size === taskSet.tasks.size) assert(locations.size === taskSet.tasks.size)
for ((expectLocs, taskLocs) <- for ((expectLocs, taskLocs) <-
taskSet.tasks.map(_.preferredLocations).zip(locations)) { taskSet.tasks.map(_.preferredLocations).zip(locations)) {
assert(expectLocs === taskLocs) assert(expectLocs.map(loc => spark.Utils.parseHostPort(loc)._1) === taskLocs)
} }
} }
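
(For reference, the host-only comparison above hinges on reducing a "host:port" location to its host part. A minimal hypothetical helper with the same shape as spark.Utils.parseHostPort, not the actual implementation, is sketched below.)

    object HostPortDemo {
      // Split "host:port" into (host, port); a bare host is returned with port 0.
      def parseHostPort(hostPort: String): (String, Int) = {
        val idx = hostPort.lastIndexOf(':')
        if (idx < 0) (hostPort, 0)
        else (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
      }

      def main(args: Array[String]) {
        assert(parseHostPort("worker-1:45678")._1 == "worker-1")
        assert(parseHostPort("worker-1")._1 == "worker-1")
      }
    }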

View file

@@ -47,6 +47,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true") oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize) val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize() SizeEstimator invokePrivate initialize()
// Set spark.hostPort to a placeholder value so that code which requires it does not fail in tests
System.setProperty("spark.hostPort", spark.Utils.localHostName() + ":" + 1111)
} }
after { after {

View file

@@ -30,6 +30,9 @@ If you want to test out the YARN deployment mode, you can use the current Spark
# Launching Spark on YARN # Launching Spark on YARN
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster.
These are used to connect to the cluster, write to the DFS, and submit jobs to the ResourceManager.
The command to launch the YARN Client is as follows: The command to launch the YARN Client is as follows:
SPARK_JAR=<SPARK_JAR_FILE> ./run spark.deploy.yarn.Client \ SPARK_JAR=<SPARK_JAR_FILE> ./run spark.deploy.yarn.Client \

View file

@@ -103,7 +103,7 @@ def parse_args():
parser.print_help() parser.print_help()
sys.exit(1) sys.exit(1)
(action, cluster_name) = args (action, cluster_name) = args
if opts.identity_file == None and action in ['launch', 'login']: if opts.identity_file == None and action in ['launch', 'login', 'start']:
print >> stderr, ("ERROR: The -i or --identity-file argument is " + print >> stderr, ("ERROR: The -i or --identity-file argument is " +
"required for " + action) "required for " + action)
sys.exit(1) sys.exit(1)

run
View file

@@ -22,7 +22,7 @@ fi
# values for that; it doesn't need a lot # values for that; it doesn't need a lot
if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
SPARK_DAEMON_JAVA_OPTS+=" -Dspark.akka.logLifecycleEvents=true" SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default
fi fi
@@ -30,19 +30,19 @@ fi
# Add java opts for master, worker, executor. The opts maybe null # Add java opts for master, worker, executor. The opts maybe null
case "$1" in case "$1" in
'spark.deploy.master.Master') 'spark.deploy.master.Master')
SPARK_JAVA_OPTS+=" $SPARK_MASTER_OPTS" SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_MASTER_OPTS"
;; ;;
'spark.deploy.worker.Worker') 'spark.deploy.worker.Worker')
SPARK_JAVA_OPTS+=" $SPARK_WORKER_OPTS" SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_WORKER_OPTS"
;; ;;
'spark.executor.StandaloneExecutorBackend') 'spark.executor.StandaloneExecutorBackend')
SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;; ;;
'spark.executor.MesosExecutorBackend') 'spark.executor.MesosExecutorBackend')
SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;; ;;
'spark.repl.Main') 'spark.repl.Main')
SPARK_JAVA_OPTS+=" $SPARK_REPL_OPTS" SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
;; ;;
esac esac
@@ -85,11 +85,11 @@ export SPARK_MEM
# Set JAVA_OPTS to be able to load native libraries and to set heap size # Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS" JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH" JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS+=" -Xms$SPARK_MEM -Xmx$SPARK_MEM" JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists # Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e $FWDIR/conf/java-opts ] ; then if [ -e $FWDIR/conf/java-opts ] ; then
JAVA_OPTS+=" `cat $FWDIR/conf/java-opts`" JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi fi
export JAVA_OPTS export JAVA_OPTS
@@ -110,30 +110,30 @@ fi
# Build up classpath # Build up classpath
CLASSPATH="$SPARK_CLASSPATH" CLASSPATH="$SPARK_CLASSPATH"
CLASSPATH+=":$FWDIR/conf" CLASSPATH="$CLASSPATH:$FWDIR/conf"
CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes"
if [ -n "$SPARK_TESTING" ] ; then if [ -n "$SPARK_TESTING" ] ; then
CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
fi fi
CLASSPATH+=":$CORE_DIR/src/main/resources" CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources"
CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
if [ -e "$FWDIR/lib_managed" ]; then if [ -e "$FWDIR/lib_managed" ]; then
CLASSPATH+=":$FWDIR/lib_managed/jars/*" CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*"
CLASSPATH+=":$FWDIR/lib_managed/bundles/*" CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*"
fi fi
CLASSPATH+=":$REPL_DIR/lib/*" CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
if [ -e $REPL_BIN_DIR/target ]; then if [ -e $REPL_BIN_DIR/target ]; then
for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
CLASSPATH+=":$jar" CLASSPATH="$CLASSPATH:$jar"
done done
fi fi
CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH+=":$jar" CLASSPATH="$CLASSPATH:$jar"
done done
# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
@@ -147,6 +147,17 @@ if [ -e "$EXAMPLES_DIR/target/spark-examples-"*hadoop[12].jar ]; then
export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples-"*hadoop[12].jar` export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples-"*hadoop[12].jar`
fi fi
# Add the Hadoop conf dir - otherwise FileSystem.*, etc. fail!
# Note: this assumes that either HADOOP_CONF_DIR or YARN_CONF_DIR hosts
# the configuration files.
if [ "x" != "x$HADOOP_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR"
fi
if [ "x" != "x$YARN_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
fi
# Figure out whether to run our class with java or with the scala launcher. # Figure out whether to run our class with java or with the scala launcher.
# In most cases, we'd prefer to execute our process with java because scala # In most cases, we'd prefer to execute our process with java because scala
# creates a shell script as the parent of its Java process, which makes it # creates a shell script as the parent of its Java process, which makes it
@@ -156,9 +167,9 @@ fi
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS
else else
CLASSPATH+=":$SCALA_LIBRARY_PATH/scala-library.jar" CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar"
CLASSPATH+=":$SCALA_LIBRARY_PATH/scala-compiler.jar" CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar"
CLASSPATH+=":$SCALA_LIBRARY_PATH/jline.jar" CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar"
# The JVM doesn't read JAVA_OPTS by default so we need to pass it in # The JVM doesn't read JAVA_OPTS by default so we need to pass it in
EXTRA_ARGS="$JAVA_OPTS" EXTRA_ARGS="$JAVA_OPTS"
fi fi

View file

@@ -63,6 +63,19 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\*
set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\* set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
rem Add the Hadoop conf dir - otherwise FileSystem.*, etc. fail
rem Note: this assumes that either HADOOP_CONF_DIR or YARN_CONF_DIR hosts
rem the configuration files.
if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
:no_hadoop_conf_dir
if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
:no_yarn_conf_dir
rem Figure out the JAR file that our examples were packaged into. rem Figure out the JAR file that our examples were packaged into.
rem First search in the build path from SBT: rem First search in the build path from SBT:
for %%d in ("examples/target/scala-%SCALA_VERSION%/spark-examples*.jar") do ( for %%d in ("examples/target/scala-%SCALA_VERSION%/spark-examples*.jar") do (