1. Remove redundant spacing in source code

2. Replace get/set functions with val and var definitions
Andrew xia 2013-03-29 08:20:35 +08:00
parent d1d9bdaabe
commit def3d1c84a
11 changed files with 91 additions and 144 deletions
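
The heart of the second change shows up in the Schedulable trait and its implementations below: abstract getter/setter methods give way to plain val/var members. A minimal sketch of the pattern, with simplified names (the real trait also exposes priority and stageId):

// Before: Java-style accessors on the scheduling entities.
trait SchedulableOld {
  def getMinShare(): Int
  def getRunningTasks(): Int
  def getWeight(): Int
}

// After: the same state exposed directly, so call sites can write
// `pool.runningTasks += 1` instead of
// `pool.setRunningTasks(pool.getRunningTasks() + 1)`.
trait SchedulableNew {
  def minShare: Int
  def weight: Int
  def runningTasks: Int
}

// A val/var in the implementing class satisfies the abstract defs.
class PoolSketch(val minShare: Int, val weight: Int) extends SchedulableNew {
  var runningTasks = 0
}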


@ -13,7 +13,7 @@ private[spark] class ActiveJob(
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
val callSite: String,
val listener: JobListener,
val listener: JobListener,
val properties: Properties) {
val numPartitions = partitions.length


@ -241,7 +241,7 @@ class DAGScheduler(
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
resultHandler: (Int, U) => Unit,
properties: Properties = null)
{
if (partitions.size == 0) {
@ -263,7 +263,7 @@ class DAGScheduler(
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
callSite: String,
timeout: Long,
timeout: Long,
properties: Properties = null)
: PartialResult[R] =
{


@ -60,7 +60,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var backend: SchedulerBackend = null
val mapOutputTracker = SparkEnv.get.mapOutputTracker
var taskSetQueuesManager: TaskSetQueuesManager = null
override def setListener(listener: TaskSchedulerListener) {
@ -131,11 +131,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
def taskFinished(manager: TaskSetManager) {
def taskFinished(manager: TaskSetManager) {
this.synchronized {
taskSetQueuesManager.taskFinished(manager)
taskSetQueuesManager.taskFinished(manager)
}
}
}
/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
@ -144,7 +144,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
*/
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
synchronized {
SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
@ -228,7 +227,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
}
if (failedExecutor != None) {
listener.executorLost(failedExecutor.get)
listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
if (taskFailed) {
@ -299,7 +298,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
// Call listener.executorLost without holding the lock on this to prevent deadlock
if (failedExecutor != None) {
listener.executorLost(failedExecutor.get)
listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
}


@ -8,26 +8,26 @@ import spark.Logging
* A FIFO Implementation of the TaskSetQueuesManager
*/
private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
val tasksetSchedulingAlgorithm = new FIFOSchedulingAlgorithm()
override def addTaskSetManager(manager: TaskSetManager) {
activeTaskSetsQueue += manager
}
override def removeTaskSetManager(manager: TaskSetManager) {
activeTaskSetsQueue -= manager
}
override def taskFinished(manager: TaskSetManager) {
//do nothing
}
override def removeExecutor(executorId: String, host: String) {
activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
}
override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] =
{
for(manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator))
@ -48,5 +48,4 @@ private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with
}
return shouldRevive
}
}


@ -14,15 +14,14 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* A Fair Implementation of the TaskSetQueuesManager
*
*
* Currently we support minShare,weight for fair scheduler between pools
* Within a pool, it supports FIFO or FS
* Also, currently we could allocate pools dynamically
*
*/
private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val poolNameToPool= new HashMap[String, Pool]
var pools = new ArrayBuffer[Pool]
val poolScheduleAlgorithm = new FairSchedulingAlgorithm()
@ -36,9 +35,9 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
val POOL_DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
val POOL_DEFAULT_MINIMUM_SHARES = 2
val POOL_DEFAULT_WEIGHT = 1
loadPoolProperties()
def loadPoolProperties() {
//first check if the file exists
val file = new File(schedulerAllocFile)
@ -51,26 +50,25 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE
var minShares = POOL_DEFAULT_MINIMUM_SHARES
var weight = POOL_DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text
if( xmlSchedulingMode != "")
{
try
try
{
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
}
catch{
case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}
val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text
if(xmlMinShares != "")
{
minShares = xmlMinShares.toInt
}
val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text
if(xmlWeight != "")
{
@ -84,15 +82,15 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
}
}
if(!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME))
if(!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME))
{
val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
pools += pool
poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool
logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
}
}
}
override def addTaskSetManager(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
if(manager.taskSet.properties != null)
@ -100,19 +98,19 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
if(!poolNameToPool.contains(poolName))
{
//we will create a new pool that user has configured in app,but not contained in xml file
//we will create a new pool that user has configured in app instead of being defined in xml file
val pool = new Pool(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
pools += pool
poolNameToPool(poolName) = pool
logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
}
}
poolNameToPool(poolName).addTaskSetManager(manager)
logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName)
logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName)
}
override def removeTaskSetManager(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
if(manager.taskSet.properties != null)
{
@ -121,10 +119,9 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id,poolName))
val pool = poolNameToPool(poolName)
pool.removeTaskSetManager(manager)
pool.setRunningTasks(pool.getRunningTasks() - manager.getRunningTasks())
pool.runningTasks -= manager.runningTasks
}
override def taskFinished(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
if(manager.taskSet.properties != null)
@ -132,40 +129,40 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
}
val pool = poolNameToPool(poolName)
pool.setRunningTasks(pool.getRunningTasks() - 1)
manager.setRunningTasks(manager.getRunningTasks() - 1)
pool.runningTasks -= 1
manager.runningTasks -=1
}
override def removeExecutor(executorId: String, host: String) {
for (pool <- pools) {
pool.removeExecutor(executorId,host)
}
pool.removeExecutor(executorId,host)
}
}
override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] =
{
val sortedPools = pools.sortWith(poolScheduleAlgorithm.comparator)
for(pool <- sortedPools)
{
logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(pool.poolName,pool.activeTaskSetsQueue.length,pool.getMinShare(),pool.getRunningTasks()))
logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(pool.poolName,pool.activeTaskSetsQueue.length,pool.minShare,pool.runningTasks))
}
for (pool <- sortedPools)
{
val task = pool.receiveOffer(execId,host,avaiableCpus)
if(task != None)
{
pool.setRunningTasks(pool.getRunningTasks() + 1)
pool.runningTasks += 1
return task
}
}
return None
}
override def checkSpeculatableTasks(): Boolean =
override def checkSpeculatableTasks(): Boolean =
{
var shouldRevive = false
for (pool <- pools)
for (pool <- pools)
{
shouldRevive |= pool.checkSpeculatableTasks()
}


@ -7,13 +7,21 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* An Schedulable entity that represent collection of TaskSetManager
*/
private[spark] class Pool(val poolName: String, schedulingMode: SchedulingMode,val minShare:Int, val weight:Int) extends Schedulable with Logging {
private[spark] class Pool(val poolName: String,val schedulingMode: SchedulingMode, initMinShare:Int, initWeight:Int) extends Schedulable with Logging
{
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
var numRunningTasks: Int = 0
var taskSetSchedulingAlgorithm: SchedulingAlgorithm =
var weight = initWeight
var minShare = initMinShare
var runningTasks = 0
val priority = 0
val stageId = 0
var taskSetSchedulingAlgorithm: SchedulingAlgorithm =
{
schedulingMode match
schedulingMode match
{
case SchedulingMode.FAIR =>
val schedule = new FairSchedulingAlgorithm()
@ -23,26 +31,6 @@ private[spark] class Pool(val poolName: String, schedulingMode: SchedulingMode,v
schedule
}
}
override def getMinShare():Int =
{
return minShare
}
override def getRunningTasks():Int =
{
return numRunningTasks
}
def setRunningTasks(taskNum : Int)
{
numRunningTasks = taskNum
}
override def getWeight(): Int =
{
return weight
}
def addTaskSetManager(manager:TaskSetManager)
{
@ -74,15 +62,14 @@ private[spark] class Pool(val poolName: String, schedulingMode: SchedulingMode,v
val sortedActiveTasksSetQueue = activeTaskSetsQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
for(manager <- sortedActiveTasksSetQueue)
{
logDebug("taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(manager.taskSet.id,manager.numTasks,manager.getMinShare(),manager.getWeight(),manager.getRunningTasks()))
logDebug("poolname:%s,taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(poolName,manager.taskSet.id,manager.numTasks,manager.minShare,manager.weight,manager.runningTasks))
}
for(manager <- sortedActiveTasksSetQueue)
{
val task = manager.slaveOffer(execId,host,availableCpus)
if (task != None)
{
manager.setRunningTasks(manager.getRunningTasks() + 1)
manager.runningTasks += 1
return task
}
}
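
As a quick usage note, the reworked Pool constructor takes the min share and weight as plain arguments and exposes runningTasks as a var. A hypothetical construction using the defaults seen in FairTaskSetQueuesManager above (the default pool name string is an assumption):

val defaultPool = new Pool("default", SchedulingMode.FIFO, 2, 1)  // minShare = 2, weight = 1
defaultPool.runningTasks += 1  // direct bookkeeping replaces setRunningTasks(getRunningTasks() + 1)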


@ -1,21 +1,12 @@
package spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer
/**
* An interface for schedulable entities, there are two type Schedulable entities(Pools and TaskSetManagers)
* An interface for schedulable entities, there are two type Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
def getMinShare(): Int
def getRunningTasks(): Int
def getPriority(): Int =
{
return 0
}
def getWeight(): Int
def getStageId(): Int =
{
return 0
}
def weight:Int
def minShare:Int
def runningTasks:Int
def priority:Int
def stageId:Int
}


@ -1,7 +1,7 @@
package spark.scheduler.cluster
/**
* An interface for sort algorithm
* An interface for sort algorithm
* FIFO: FIFO algorithm for TaskSetManagers
* FS: FS algorithm for Pools, and FIFO or FS for TaskSetManagers
*/
@ -13,13 +13,13 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm
{
override def comparator(s1: Schedulable, s2: Schedulable): Boolean =
{
val priority1 = s1.getPriority()
val priority2 = s2.getPriority()
val priority1 = s1.priority
val priority2 = s2.priority
var res = Math.signum(priority1 - priority2)
if (res == 0)
{
val stageId1 = s1.getStageId()
val stageId2 = s2.getStageId()
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = Math.signum(stageId1 - stageId2)
}
if (res < 0)
@ -29,7 +29,7 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm
else
{
return false
}
}
}
}
@ -37,16 +37,18 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
{
def comparator(s1: Schedulable, s2:Schedulable): Boolean =
{
val minShare1 = s1.getMinShare()
val minShare2 = s2.getMinShare()
val s1Needy = s1.getRunningTasks() < minShare1
val s2Needy = s2.getRunningTasks() < minShare2
val minShareRatio1 = s1.getRunningTasks().toDouble / Math.max(minShare1,1.0).toDouble
val minShareRatio2 = s2.getRunningTasks().toDouble / Math.max(minShare2,1.0).toDouble
val taskToWeightRatio1 = s1.getRunningTasks().toDouble / s1.getWeight().toDouble
val taskToWeightRatio2 = s2.getRunningTasks().toDouble / s2.getWeight().toDouble
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1,1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2,1.0).toDouble
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var res:Boolean = true
if(s1Needy && !s2Needy)
{
res = true
@ -57,7 +59,7 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
}
else if (s1Needy && s2Needy)
{
res = minShareRatio1 <= minShareRatio2
res = minShareRatio1 <= minShareRatio2
}
else
{
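
To make the fair comparator above concrete, here is a rough standalone sketch of its decision rules with hypothetical numbers. The final else branch is cut off in the hunk, so the fallback to a tasks-to-weight comparison is an assumption mirroring the ratios computed just before it:

case class PoolStats(runningTasks: Int, minShare: Int, weight: Int)

// Returns true if p1 should be offered resources before p2.
def schedulesFirst(p1: PoolStats, p2: PoolStats): Boolean = {
  val p1Needy = p1.runningTasks < p1.minShare
  val p2Needy = p2.runningTasks < p2.minShare
  val minShareRatio1 = p1.runningTasks.toDouble / math.max(p1.minShare, 1)
  val minShareRatio2 = p2.runningTasks.toDouble / math.max(p2.minShare, 1)
  val taskToWeightRatio1 = p1.runningTasks.toDouble / p1.weight
  val taskToWeightRatio2 = p2.runningTasks.toDouble / p2.weight
  if (p1Needy && !p2Needy) true                                   // under min share beats satisfied
  else if (!p1Needy && p2Needy) false
  else if (p1Needy && p2Needy) minShareRatio1 <= minShareRatio2   // least-satisfied min share first
  else taskToWeightRatio1 <= taskToWeightRatio2                   // assumed fallback: weight-proportional
}

// A pool still under its min share wins over one that has met it,
// even if the latter carries a larger weight.
println(schedulesFirst(PoolStats(runningTasks = 1, minShare = 2, weight = 1),
                       PoolStats(runningTasks = 5, minShare = 2, weight = 3)))  // true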


@ -1,4 +1,4 @@
package spark.scheduler.cluster
package spark.scheduler.cluster
object SchedulingMode extends Enumeration("FAIR","FIFO")
{


@ -29,7 +29,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val MAX_TASK_FAILURES = 4
val TASKSET_MINIMUM_SHARES = 1
val TASKSET_WEIGHT = 1
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
@ -38,7 +37,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Serializer for closures and tasks.
val ser = SparkEnv.get.closureSerializer.newInstance()
var weight = TASKSET_WEIGHT
var minShare = TASKSET_MINIMUM_SHARES
var runningTasks = 0
val priority = taskSet.priority
val stageId = taskSet.stageId
val tasks = taskSet.tasks
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
@ -46,7 +50,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val numFailures = new Array[Int](numTasks)
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksFinished = 0
var numRunningTasks =0;
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
@ -100,36 +103,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
addPendingTask(i)
}
override def getMinShare(): Int =
{
return TASKSET_MINIMUM_SHARES
}
override def getRunningTasks(): Int =
{
return numRunningTasks
}
def setRunningTasks(taskNum :Int)
{
numRunningTasks = taskNum
}
override def getPriority(): Int =
{
return priority
}
override def getWeight(): Int =
{
return TASKSET_WEIGHT
}
override def getStageId(): Int =
{
return taskSet.stageId
}
// Add a task to all the pending-task lists that it should be on.
private def addPendingTask(index: Int) {
val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive


@ -5,7 +5,6 @@ import scala.collection.mutable.ArrayBuffer
/**
* An interface for managing TaskSet queue/s that allows plugging different policy for
* offering tasks to resources
*
*/
private[spark] trait TaskSetQueuesManager {
def addTaskSetManager(manager: TaskSetManager): Unit
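
The hunk above is truncated after the first method; the overrides in the FIFO and Fair managers earlier in this diff imply roughly the following plug-in surface (return types and exact parameter names are assumptions, shown as a signature sketch rather than compilable code on its own):

private[spark] trait TaskSetQueuesManager {
  def addTaskSetManager(manager: TaskSetManager): Unit
  def removeTaskSetManager(manager: TaskSetManager): Unit
  def taskFinished(manager: TaskSetManager): Unit
  def removeExecutor(executorId: String, host: String): Unit
  def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription]
  def checkSpeculatableTasks(): Boolean
}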