diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala
index b6d3c2c089..105eaecb22 100644
--- a/core/src/main/scala/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala
@@ -13,7 +13,7 @@ private[spark] class ActiveJob(
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],
     val callSite: String,
-    val listener: JobListener, 
+    val listener: JobListener,
     val properties: Properties) {
 
   val numPartitions = partitions.length
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 717cc27739..0a64a4f041 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -241,7 +241,7 @@ class DAGScheduler(
       partitions: Seq[Int],
       callSite: String,
       allowLocal: Boolean,
-      resultHandler: (Int, U) => Unit, 
+      resultHandler: (Int, U) => Unit,
       properties: Properties = null)
   {
     if (partitions.size == 0) {
@@ -263,7 +263,7 @@ class DAGScheduler(
       func: (TaskContext, Iterator[T]) => U,
       evaluator: ApproximateEvaluator[U, R],
       callSite: String,
-      timeout: Long, 
+      timeout: Long,
       properties: Properties = null)
     : PartialResult[R] =
   {
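The `properties` parameter added to `runJob`/`runApproximateJob` and stored on `ActiveJob` is how per-job scheduling metadata (such as a fair-scheduler pool name) can travel from job submission toward the cluster scheduler; the later hunks read it back from `taskSet.properties`. A minimal sketch of building such a bag is below. It is only an illustration: the property key is a placeholder, because the real key is the `POOL_FAIR_SCHEDULER_PROPERTIES` constant used later in this patch, whose string value is not visible in the excerpt, and the wiring from `ActiveJob` to `TaskSet` is also not shown here.

```scala
import java.util.Properties

object JobPropertiesSketch {
  // Hypothetical helper: attach a pool name to a job submission.
  // "pool.name.placeholder" stands in for the real property key
  // (the value of POOL_FAIR_SCHEDULER_PROPERTIES, not shown in this diff).
  def poolProperties(poolName: String): Properties = {
    val props = new Properties()
    props.setProperty("pool.name.placeholder", poolName)
    props
  }
  // Such a Properties object would be passed as the trailing argument to
  // DAGScheduler.runJob(..., resultHandler, poolProperties("production")).
}
```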
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 092b0a0cfc..be0d480aa0 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -60,7 +60,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   var backend: SchedulerBackend = null
 
   val mapOutputTracker = SparkEnv.get.mapOutputTracker
-  
+
   var taskSetQueuesManager: TaskSetQueuesManager = null
 
   override def setListener(listener: TaskSchedulerListener) {
@@ -131,11 +131,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  def taskFinished(manager: TaskSetManager) { 
+  def taskFinished(manager: TaskSetManager) {
     this.synchronized {
-      taskSetQueuesManager.taskFinished(manager) 
+      taskSetQueuesManager.taskFinished(manager)
     }
-  } 
+  }
 
   /**
    * Called by cluster manager to offer resources on slaves. We respond by asking our active task
@@ -144,7 +144,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
    */
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
     synchronized {
-      SparkEnv.set(sc.env)
 
       // Mark each slave as alive and remember its hostname
       for (o <- offers) {
@@ -228,7 +227,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
       }
       if (failedExecutor != None) {
-        listener.executorLost(failedExecutor.get) 
+        listener.executorLost(failedExecutor.get)
         backend.reviveOffers()
       }
       if (taskFailed) {
@@ -299,7 +298,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
     // Call listener.executorLost without holding the lock on this to prevent deadlock
     if (failedExecutor != None) {
-      listener.executorLost(failedExecutor.get) 
+      listener.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
   }
diff --git a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
index 868b11c8d6..5949ee773f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
@@ -8,26 +8,26 @@ import spark.Logging
  * A FIFO Implementation of the TaskSetQueuesManager
  */
 private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
-  
+
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
   val tasksetSchedulingAlgorithm = new FIFOSchedulingAlgorithm()
-  
+
   override def addTaskSetManager(manager: TaskSetManager) {
     activeTaskSetsQueue += manager
   }
-  
+
   override def removeTaskSetManager(manager: TaskSetManager) {
     activeTaskSetsQueue -= manager
   }
-  
+
   override def taskFinished(manager: TaskSetManager) {
     //do nothing
   }
-  
+
   override def removeExecutor(executorId: String, host: String) {
     activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
   }
-  
+
   override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] =
   {
     for(manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator))
@@ -48,5 +48,4 @@ private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with
     }
     return shouldRevive
   }
-  
 }
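To make the FIFO policy concrete, here is a standalone sketch (not part of the patch) of what `receiveOffer` does: the queue is sorted with the FIFO comparator -- lower priority value first, ties broken by stage id, the same ordering the `FIFOSchedulingAlgorithm` later in this patch encodes -- and the first task set that can launch something on the offered slot wins. `MiniTaskSet` and the `Option[String]` result are simplified stand-ins for `TaskSetManager` and `TaskDescription`.

```scala
object FifoOfferSketch {
  // Simplified stand-in for TaskSetManager: a priority, a stageId and an
  // optional task it could launch on the offered slot.
  case class MiniTaskSet(priority: Int, stageId: Int, pendingTask: Option[String])

  // Lower priority value (older job) first, ties broken by stageId.
  def fifoBefore(a: MiniTaskSet, b: MiniTaskSet): Boolean =
    if (a.priority != b.priority) a.priority < b.priority else a.stageId < b.stageId

  // Walk the queue in FIFO order and hand the offer to the first task set that
  // has something to run, mirroring FIFOTaskSetQueuesManager.receiveOffer above.
  def receiveOffer(queue: Seq[MiniTaskSet]): Option[String] =
    queue.sortWith(fifoBefore).flatMap(_.pendingTask).headOption

  def main(args: Array[String]) {
    val queue = Seq(
      MiniTaskSet(priority = 2, stageId = 7, pendingTask = Some("task-of-job-2")),
      MiniTaskSet(priority = 1, stageId = 3, pendingTask = Some("task-of-job-1")))
    println(receiveOffer(queue))  // Some(task-of-job-1): the older job is served first
  }
}
```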
diff --git a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
index 4e26cedfda..0609600f35 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
@@ -14,15 +14,14 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
 /**
  * A Fair Implementation of the TaskSetQueuesManager
- * 
+ *
  * Currently we support minShare,weight for fair scheduler between pools
  * Within a pool, it supports FIFO or FS
  * Also, currently we could allocate pools dynamically
- *
  */
 private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
-  
-  val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified") 
+
+  val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
   val poolNameToPool= new HashMap[String, Pool]
   var pools = new ArrayBuffer[Pool]
   val poolScheduleAlgorithm = new FairSchedulingAlgorithm()
@@ -36,9 +35,9 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
   val POOL_DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
   val POOL_DEFAULT_MINIMUM_SHARES = 2
   val POOL_DEFAULT_WEIGHT = 1
-  
+
   loadPoolProperties()
-  
+
   def loadPoolProperties() {
     //first check if the file exists
     val file = new File(schedulerAllocFile)
@@ -51,26 +50,25 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
         var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE
         var minShares = POOL_DEFAULT_MINIMUM_SHARES
         var weight = POOL_DEFAULT_WEIGHT
-        
-        
+
         val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text
         if( xmlSchedulingMode != "")
         {
-          try 
+          try
          {
             schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
           }
          catch{
-            case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode") 
+            case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
           }
         }
-        
+
         val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text
         if(xmlMinShares != "")
         {
           minShares = xmlMinShares.toInt
         }
-        
+
         val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text
         if(xmlWeight != "")
         {
@@ -84,15 +82,15 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
         }
       }
 
-    if(!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME)) 
+    if(!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME))
     {
       val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
       pools += pool
       poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool
       logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
-    } 
+    }
   }
-  
+
   override def addTaskSetManager(manager: TaskSetManager) {
     var poolName = POOL_DEFAULT_POOL_NAME
     if(manager.taskSet.properties != null)
@@ -100,19 +98,19 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
       poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
       if(!poolNameToPool.contains(poolName))
       {
-        //we will create a new pool that user has configured in app,but not contained in xml file
+        //we will create a new pool that user has configured in app instead of being defined in xml file
         val pool = new Pool(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
         pools += pool
         poolNameToPool(poolName) = pool
-        logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)) 
+        logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
       }
     }
     poolNameToPool(poolName).addTaskSetManager(manager)
-    logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName) 
+    logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName)
   }
-  
+
   override def removeTaskSetManager(manager: TaskSetManager) {
-    
+
     var poolName = POOL_DEFAULT_POOL_NAME
     if(manager.taskSet.properties != null)
     {
@@ -121,10 +119,9 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
     logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id,poolName))
     val pool = poolNameToPool(poolName)
     pool.removeTaskSetManager(manager)
-    pool.setRunningTasks(pool.getRunningTasks() - manager.getRunningTasks()) 
-    
+    pool.runningTasks -= manager.runningTasks
   }
-  
+
   override def taskFinished(manager: TaskSetManager) {
     var poolName = POOL_DEFAULT_POOL_NAME
     if(manager.taskSet.properties != null)
@@ -132,40 +129,40 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
       poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
     }
     val pool = poolNameToPool(poolName)
-    pool.setRunningTasks(pool.getRunningTasks() - 1)
-    manager.setRunningTasks(manager.getRunningTasks() - 1)
+    pool.runningTasks -= 1
+    manager.runningTasks -=1
   }
-  
+
   override def removeExecutor(executorId: String, host: String) {
     for (pool <- pools)
     {
-      pool.removeExecutor(executorId,host) 
-    } 
+      pool.removeExecutor(executorId,host)
+    }
   }
-  
+
   override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] =
   {
     val sortedPools = pools.sortWith(poolScheduleAlgorithm.comparator)
     for(pool <- sortedPools)
     {
-      logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(pool.poolName,pool.activeTaskSetsQueue.length,pool.getMinShare(),pool.getRunningTasks()))
+      logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(pool.poolName,pool.activeTaskSetsQueue.length,pool.minShare,pool.runningTasks))
     }
     for (pool <- sortedPools)
     {
       val task = pool.receiveOffer(execId,host,avaiableCpus)
       if(task != None)
       {
-        pool.setRunningTasks(pool.getRunningTasks() + 1)
+        pool.runningTasks += 1
         return task
       }
     }
     return None
   }
-  
-  override def checkSpeculatableTasks(): Boolean = 
+
+  override def checkSpeculatableTasks(): Boolean =
   {
     var shouldRevive = false
-    for (pool <- pools) 
+    for (pool <- pools)
     {
       shouldRevive |= pool.checkSpeculatableTasks()
     }
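`loadPoolProperties()` above reads pool definitions (scheduling mode, minimum share, weight) from the XML file named by `spark.fairscheduler.allocation.file`. A hypothetical file is sketched below. The element and attribute names are guesses, since the actual `POOL_*` property constants are defined outside the hunks shown in this diff; only the system property name, the mode values `FAIR`/`FIFO` (from the `SchedulingMode` enumeration later in the patch), and the defaults (FIFO, minShares 2, weight 1) are taken from the diff itself.

```scala
object AllocationFileSketch {
  // Hypothetical allocation file consumed by loadPoolProperties(); tag and
  // attribute names are assumptions, not confirmed by this excerpt.
  val exampleAllocations =
    """<allocations>
      |  <pool name="production">
      |    <schedulingMode>FAIR</schedulingMode>
      |    <minShares>4</minShares>
      |    <weight>3</weight>
      |  </pool>
      |  <pool name="adhoc">
      |    <schedulingMode>FIFO</schedulingMode>
      |  </pool>
      |</allocations>""".stripMargin

  // The scheduler is pointed at the file through a system property
  // (property name taken from the diff):
  // System.setProperty("spark.fairscheduler.allocation.file", "/path/to/allocations.xml")
}
```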
logDebug("taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(manager.taskSet.id,manager.numTasks,manager.getMinShare(),manager.getWeight(),manager.getRunningTasks())) + logDebug("poolname:%s,taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(poolName,manager.taskSet.id,manager.numTasks,manager.minShare,manager.weight,manager.runningTasks)) } for(manager <- sortedActiveTasksSetQueue) { val task = manager.slaveOffer(execId,host,availableCpus) if (task != None) { - manager.setRunningTasks(manager.getRunningTasks() + 1) + manager.runningTasks += 1 return task } } diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala index 837f9c4983..6f4f104f42 100644 --- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala +++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala @@ -1,21 +1,12 @@ package spark.scheduler.cluster -import scala.collection.mutable.ArrayBuffer - /** - * An interface for schedulable entities, there are two type Schedulable entities(Pools and TaskSetManagers) + * An interface for schedulable entities, there are two type Schedulable entities(Pools and TaskSetManagers) */ private[spark] trait Schedulable { - - def getMinShare(): Int - def getRunningTasks(): Int - def getPriority(): Int = - { - return 0 - } - def getWeight(): Int - def getStageId(): Int = - { - return 0 - } + def weight:Int + def minShare:Int + def runningTasks:Int + def priority:Int + def stageId:Int } diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala index f8919e7374..2f8123587f 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala @@ -1,7 +1,7 @@ package spark.scheduler.cluster /** - * An interface for sort algorithm + * An interface for sort algorithm * FIFO: FIFO algorithm for TaskSetManagers * FS: FS algorithm for Pools, and FIFO or FS for TaskSetManagers */ @@ -13,13 +13,13 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { - val priority1 = s1.getPriority() - val priority2 = s2.getPriority() + val priority1 = s1.priority + val priority2 = s2.priority var res = Math.signum(priority1 - priority2) if (res == 0) { - val stageId1 = s1.getStageId() - val stageId2 = s2.getStageId() + val stageId1 = s1.stageId + val stageId2 = s2.stageId res = Math.signum(stageId1 - stageId2) } if (res < 0) @@ -29,7 +29,7 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm else { return false - } + } } } @@ -37,16 +37,18 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { def comparator(s1: Schedulable, s2:Schedulable): Boolean = { - val minShare1 = s1.getMinShare() - val minShare2 = s2.getMinShare() - val s1Needy = s1.getRunningTasks() < minShare1 - val s2Needy = s2.getRunningTasks() < minShare2 - val minShareRatio1 = s1.getRunningTasks().toDouble / Math.max(minShare1,1.0).toDouble - val minShareRatio2 = s2.getRunningTasks().toDouble / Math.max(minShare2,1.0).toDouble - val taskToWeightRatio1 = s1.getRunningTasks().toDouble / s1.getWeight().toDouble - val taskToWeightRatio2 = s2.getRunningTasks().toDouble / s2.getWeight().toDouble + val minShare1 = s1.minShare + val minShare2 = s2.minShare + val runningTasks1 = s1.runningTasks + val 
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
index f8919e7374..2f8123587f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -1,7 +1,7 @@
 package spark.scheduler.cluster
 
 /**
- * An interface for sort algorithm 
+ * An interface for sort algorithm
  * FIFO: FIFO algorithm for TaskSetManagers
  * FS: FS algorithm for Pools, and FIFO or FS for TaskSetManagers
  */
@@ -13,13 +13,13 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm
 {
   override def comparator(s1: Schedulable, s2: Schedulable): Boolean =
   {
-    val priority1 = s1.getPriority()
-    val priority2 = s2.getPriority()
+    val priority1 = s1.priority
+    val priority2 = s2.priority
     var res = Math.signum(priority1 - priority2)
     if (res == 0)
     {
-      val stageId1 = s1.getStageId()
-      val stageId2 = s2.getStageId()
+      val stageId1 = s1.stageId
+      val stageId2 = s2.stageId
       res = Math.signum(stageId1 - stageId2)
     }
     if (res < 0)
@@ -29,7 +29,7 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm
     else
     {
       return false
-    } 
+    }
   }
 }
 
@@ -37,16 +37,18 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
 {
   def comparator(s1: Schedulable, s2:Schedulable): Boolean =
   {
-    val minShare1 = s1.getMinShare()
-    val minShare2 = s2.getMinShare()
-    val s1Needy = s1.getRunningTasks() < minShare1
-    val s2Needy = s2.getRunningTasks() < minShare2
-    val minShareRatio1 = s1.getRunningTasks().toDouble / Math.max(minShare1,1.0).toDouble
-    val minShareRatio2 = s2.getRunningTasks().toDouble / Math.max(minShare2,1.0).toDouble
-    val taskToWeightRatio1 = s1.getRunningTasks().toDouble / s1.getWeight().toDouble
-    val taskToWeightRatio2 = s2.getRunningTasks().toDouble / s2.getWeight().toDouble
+    val minShare1 = s1.minShare
+    val minShare2 = s2.minShare
+    val runningTasks1 = s1.runningTasks
+    val runningTasks2 = s2.runningTasks
+    val s1Needy = runningTasks1 < minShare1
+    val s2Needy = runningTasks2 < minShare2
+    val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1,1.0).toDouble
+    val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2,1.0).toDouble
+    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
+    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
     var res:Boolean = true
-    
+
     if(s1Needy && !s2Needy)
     {
       res = true
@@ -57,7 +59,7 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
     }
     else if (s1Needy && s2Needy)
     {
-      res = minShareRatio1 <= minShareRatio2 
+      res = minShareRatio1 <= minShareRatio2
     }
     else
     {
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 6be4f3cd84..480af2c1a3 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,4 +1,4 @@
-package spark.scheduler.cluster 
+package spark.scheduler.cluster
 
 object SchedulingMode extends Enumeration("FAIR","FIFO")
 {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 064593f486..ddc4fa6642 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -29,7 +29,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val MAX_TASK_FAILURES = 4
 
   val TASKSET_MINIMUM_SHARES = 1
-  
   val TASKSET_WEIGHT = 1
   // Quantile of tasks at which to start speculation
   val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
@@ -38,7 +37,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   // Serializer for closures and tasks.
   val ser = SparkEnv.get.closureSerializer.newInstance()
 
+  var weight = TASKSET_WEIGHT
+  var minShare = TASKSET_MINIMUM_SHARES
+  var runningTasks = 0
   val priority = taskSet.priority
+  val stageId = taskSet.stageId
+
   val tasks = taskSet.tasks
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
@@ -46,7 +50,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val numFailures = new Array[Int](numTasks)
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksFinished = 0
-  var numRunningTasks =0;
 
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
@@ -100,36 +103,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     addPendingTask(i)
   }
 
-  override def getMinShare(): Int =
-  {
-    return TASKSET_MINIMUM_SHARES
-  }
-
-  override def getRunningTasks(): Int =
-  {
-    return numRunningTasks
-  }
-
-  def setRunningTasks(taskNum :Int)
-  {
-    numRunningTasks = taskNum
-  }
-
-  override def getPriority(): Int =
-  {
-    return priority
-  }
-
-  override def getWeight(): Int =
-  {
-    return TASKSET_WEIGHT
-  }
-
-  override def getStageId(): Int =
-  {
-    return taskSet.stageId
-  }
-
   // Add a task to all the pending-task lists that it should be on.
   private def addPendingTask(index: Int) {
     val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
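A worked example of the fair ordering defined by `FairSchedulingAlgorithm` earlier in this patch, using plain numbers and stand-in types rather than the real classes: a schedulable running fewer tasks than its `minShare` is "needy" and is offered resources before any non-needy one; among needy schedulables the smaller runningTasks/minShare ratio wins, and among satisfied ones the smaller runningTasks/weight ratio wins.

```scala
object FairComparatorSketch {
  case class S(minShare: Int, weight: Int, runningTasks: Int)

  // Same decision procedure as FairSchedulingAlgorithm.comparator,
  // written against the simplified S type above.
  def schedulesBefore(s1: S, s2: S): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    if (s1Needy && !s2Needy) true
    else if (!s1Needy && s2Needy) false
    else if (s1Needy && s2Needy)
      s1.runningTasks.toDouble / math.max(s1.minShare, 1) <=
        s2.runningTasks.toDouble / math.max(s2.minShare, 1)
    else
      s1.runningTasks.toDouble / s1.weight <= s2.runningTasks.toDouble / s2.weight
  }

  def main(args: Array[String]) {
    val starved = S(minShare = 2, weight = 1, runningTasks = 1)  // below its min share
    val busy    = S(minShare = 2, weight = 3, runningTasks = 5)  // above its min share
    println(schedulesBefore(starved, busy))  // true: the needy pool goes first
    println(schedulesBefore(busy, starved))  // false
  }
}
```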
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
index c117ee7a85..86971d47e6 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
@@ -5,7 +5,6 @@ import scala.collection.mutable.ArrayBuffer
 /**
  * An interface for managing TaskSet queue/s that allows plugging different policy for
  * offering tasks to resources
- * 
 */
 private[spark] trait TaskSetQueuesManager {
   def addTaskSetManager(manager: TaskSetManager): Unit
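Only the start of the TaskSetQueuesManager trait is visible above, but the shape of the plug-in point follows from the two implementations in this patch: ClusterScheduler funnels every queue operation through the trait, so a scheduling policy is just another implementation of it. A minimal do-nothing sketch, with the method set inferred from FIFOTaskSetQueuesManager earlier in the diff (the full trait may declare more than is shown here):

```scala
package spark.scheduler.cluster

// Hypothetical no-op policy, for illustration only; signatures are copied from
// the FIFO implementation above and are assumed to match the trait exactly.
private[spark] class NoopTaskSetQueuesManager extends TaskSetQueuesManager {
  override def addTaskSetManager(manager: TaskSetManager) {}
  override def removeTaskSetManager(manager: TaskSetManager) {}
  override def taskFinished(manager: TaskSetManager) {}
  override def removeExecutor(executorId: String, host: String) {}
  override def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = None
  override def checkSpeculatableTasks(): Boolean = false
}
```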