refactor code to less than 100 characters per line

Andrew xia 2013-07-30 11:41:38 +08:00
parent 614ee16cc4
commit 5406013997
9 changed files with 39 additions and 24 deletions


@@ -44,9 +44,10 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable,
SchedulingMode}
import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener,
SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}


@@ -473,7 +473,8 @@ class DAGScheduler(
}
if (tasks.size > 0) {
val properties = idToActiveJob(stage.priority).properties
sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size, properties)))
sparkListeners.foreach(_.onStageSubmitted(
SparkListenerStageSubmitted(stage, tasks.size, properties)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)


@@ -8,17 +8,18 @@ import spark.executor.TaskMetrics
sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties) extends SparkListenerEvents
case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
extends SparkListenerEvents
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents
trait SparkListener {
@@ -26,12 +27,12 @@ trait SparkListener {
* Called when a stage is completed, with information on the completed stage
*/
def onStageCompleted(stageCompleted: StageCompleted) { }
/**
* Called when a stage is submitted
*/
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
/**
* Called when a task ends
*/
@@ -41,12 +42,12 @@ trait SparkListener {
* Called when a job starts
*/
def onJobStart(jobStart: SparkListenerJobStart) { }
/**
* Called when a job ends
*/
def onJobEnd(jobEnd: SparkListenerJobEnd) { }
}
/**
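For reference, every callback in the trait above has an empty default body, so a listener only overrides the events it cares about. The following is a minimal, hypothetical sketch (not part of this commit) of a listener consuming these events; the class name and log output are invented, and registering it with the DAGScheduler's sparkListeners is assumed to happen elsewhere.

import java.util.Properties

import spark.scheduler._

// Hypothetical listener, shown only to illustrate the callbacks above.
class PoolLoggingListener extends SparkListener {

  // SparkListenerStageSubmitted carries the submitting job's Properties,
  // so per-job settings such as the fair-scheduler pool can be read here.
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
    val props: Properties = stageSubmitted.properties
    val pool =
      if (props == null) "default"
      else props.getProperty("spark.scheduler.cluster.fair.pool", "default")
    println("Submitted " + stageSubmitted.stage + " with " +
      stageSubmitted.taskSize + " tasks to pool " + pool)
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
    println("Job ended with result " + jobEnd.jobResult)
  }
}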


@@ -9,10 +9,10 @@ import scala.collection.mutable.HashSet
import spark._
import spark.TaskState.TaskState
import spark.scheduler._
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -257,7 +257,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
for (manager <- sortedTaskSetQueue) {
logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
logInfo("parentName:%s, name:%s, runningTasks:%s".format(
manager.parent.name, manager.name, manager.runningTasks))
}
for (manager <- sortedTaskSetQueue) {


@@ -644,7 +644,8 @@ private[spark] class ClusterTaskSetManager(
}
}
// TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
// TODO: for now we just find Pool not TaskSetManager
// we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}


@@ -24,7 +24,8 @@ private[spark] trait SchedulableBuilder {
def addTaskSetManager(manager: Schedulable, properties: Properties)
}
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
override def buildPools() {
// nothing
@@ -35,7 +36,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
@@ -88,7 +90,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
// finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
@@ -102,8 +105,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
parentPool = rootPool.getSchedulableByName(poolName)
if (parentPool == null) {
// we will create a new pool that user has configured in app instead of being defined in xml file
parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
// we will create a new pool that user has configured in app
// instead of being defined in xml file
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
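
As the hunk above shows, FairSchedulableBuilder.addTaskSetManager resolves the target pool through the spark.scheduler.cluster.fair.pool key of the job's Properties, creating the pool with the default scheduling mode, minimum share, and weight when it does not exist yet. Below is a rough sketch of the properties a job would carry to land in a named pool (hypothetical usage, not from this diff; how the properties are attached to the job is outside this file).

import java.util.Properties

// Hypothetical per-job properties; the key matches FAIR_SCHEDULER_PROPERTIES above.
val jobProperties = new Properties()
jobProperties.setProperty("spark.scheduler.cluster.fair.pool", "analytics")

// When these properties reach addTaskSetManager, the builder either finds an
// "analytics" pool under rootPool or creates one with DEFAULT_SCHEDULING_MODE,
// DEFAULT_MINIMUM_SHARE and DEFAULT_WEIGHT, then parents the task set manager there.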


@@ -1,7 +1,8 @@
package spark.scheduler.cluster
/**
* "FAIR" and "FIFO" determines which policy is used to order tasks amongst a Schedulable's sub-queues
* "FAIR" and "FIFO" determines which policy is used
* to order tasks amongst a Schedulable's sub-queues
* "NONE" is used when a Schedulable has no sub-queues.
*/
object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
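
The enumeration above is what the schedulers switch on when ordering a pool's children. Assuming it declares FAIR, FIFO, and NONE values matching the constructor names, callers can pattern match on it as in the hypothetical sketch below (illustration only, not from this commit).

import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode

// Hypothetical helper describing how a pool with the given mode orders its sub-queues.
def describe(mode: SchedulingMode): String = mode match {
  case SchedulingMode.FAIR => "children ordered by fair share (minShare, then weight)"
  case SchedulingMode.FIFO => "children ordered by submission priority"
  case SchedulingMode.NONE => "leaf schedulable with no sub-queues to order"
}

// e.g. describe(SchedulingMode.withName("FAIR"))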


@@ -40,9 +40,12 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</div>
</div> ++
<h3>Pools </h3> ++ poolTable.toNodeSeq ++
<h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq++
<h3>Completed Stages : {completedStages.size}</h3> ++ completedStagesTable.toNodeSeq++
<h3>Failed Stages : {failedStages.size}</h3> ++ failedStagesTable.toNodeSeq
<h3>Active Stages : {activeStages.size}</h3> ++
activeStagesTable.toNodeSeq++
<h3>Completed Stages : {completedStages.size}</h3> ++
completedStagesTable.toNodeSeq++
<h3>Failed Stages : {failedStages.size}</h3> ++
failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages/Pools", Jobs)
}


@@ -53,7 +53,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
activeStages += stage
var poolName = DEFAULT_POOL_NAME
if (stageSubmitted.properties != null) {
poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool",
DEFAULT_POOL_NAME)
}
stageToPool(stage) = poolName
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())