Merge pull request #844 from markhamstra/priorityRename

Renamed 'priority' to 'jobId' and assorted minor changes
This commit is contained in:
Matei Zaharia 2013-08-20 10:06:06 -07:00
commit d61337f640
5 changed files with 60 additions and 59 deletions

View file

@ -25,7 +25,7 @@ import java.util.Properties
* Tracks information about an active job in the DAGScheduler.
*/
private[spark] class ActiveJob(
val runId: Int,
val jobId: Int,
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],

View file

@ -104,11 +104,11 @@ class DAGScheduler(
private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
val nextRunId = new AtomicInteger(0)
val nextJobId = new AtomicInteger(0)
val nextStageId = new AtomicInteger(0)
val idToStage = new TimeStampedHashMap[Int, Stage]
val stageIdToStage = new TimeStampedHashMap[Int, Stage]
val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
@ -171,14 +171,14 @@ class DAGScheduler(
/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
* The priority value passed in will be used if the stage doesn't already exist with
* a lower priority (we assume that priorities always increase across jobs for now).
* The jobId value passed in will be used if the stage doesn't already exist with
* a lower jobId (jobId always increases across jobs.)
*/
private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
val stage = newStage(shuffleDep.rdd, Some(shuffleDep), priority)
val stage = newStage(shuffleDep.rdd, Some(shuffleDep), jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
@ -186,13 +186,13 @@ class DAGScheduler(
/**
* Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or
* as a result stage for the final RDD used directly in an action. The stage will also be given
* the provided priority.
* as a result stage for the final RDD used directly in an action. The stage will also be
* associated with the provided jobId.
*/
private def newStage(
rdd: RDD[_],
shuffleDep: Option[ShuffleDependency[_,_]],
priority: Int,
jobId: Int,
callSite: Option[String] = None)
: Stage =
{
@ -203,17 +203,17 @@ class DAGScheduler(
mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
}
val id = nextStageId.getAndIncrement()
val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite)
idToStage(id) = stage
val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
stageIdToStage(id) = stage
stageToInfos(stage) = StageInfo(stage)
stage
}
/**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
* provided priority if they haven't already been created with a lower priority.
* provided jobId if they haven't already been created with a lower jobId.
*/
private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = {
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]) {
@ -224,7 +224,7 @@ class DAGScheduler(
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
parents += getShuffleMapStage(shufDep, priority)
parents += getShuffleMapStage(shufDep, jobId)
case _ =>
visit(dep.rdd)
}
@ -245,7 +245,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority)
val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
@ -282,7 +282,7 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter,
properties)
return (toSubmit, waiter)
(toSubmit, waiter)
}
def runJob[T, U: ClassManifest](
@ -329,8 +329,8 @@ class DAGScheduler(
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener, properties))
return listener.awaitResult() // Will throw an exception if the job fails
eventQueue.put(JobSubmitted(rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
listener.awaitResult() // Will throw an exception if the job fails
}
/**
@ -340,11 +340,11 @@ class DAGScheduler(
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId, Some(callSite))
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
val jobId = nextJobId.getAndIncrement()
val finalStage = newStage(finalRDD, None, jobId, Some(callSite))
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
@ -354,7 +354,7 @@ class DAGScheduler(
runLocally(job)
} else {
listenerBus.post(SparkListenerJobStart(job, properties))
idToActiveJob(runId) = job
idToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
submitStage(finalStage)
@ -375,7 +375,7 @@ class DAGScheduler(
handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
abortStage(idToStage(taskSet.stageId), reason)
abortStage(stageIdToStage(taskSet.stageId), reason)
case StopDAGScheduler =>
// Cancel any active jobs
@ -386,7 +386,7 @@ class DAGScheduler(
}
return true
}
return false
false
}
/**
@ -398,7 +398,7 @@ class DAGScheduler(
clearCacheLocs()
val failed2 = failed.toArray
failed.clear()
for (stage <- failed2.sortBy(_.priority)) {
for (stage <- failed2.sortBy(_.jobId)) {
submitStage(stage)
}
}
@ -416,7 +416,7 @@ class DAGScheduler(
logTrace("failed: " + failed)
val waiting2 = waiting.toArray
waiting.clear()
for (stage <- waiting2.sortBy(_.priority)) {
for (stage <- waiting2.sortBy(_.jobId)) {
submitStage(stage)
}
}
@ -463,7 +463,7 @@ class DAGScheduler(
*/
protected def runLocally(job: ActiveJob) {
logInfo("Computing the requested partition locally")
new Thread("Local computation of job " + job.runId) {
new Thread("Local computation of job " + job.jobId) {
override def run() {
runLocallyWithinThread(job)
}
@ -531,7 +531,7 @@ class DAGScheduler(
}
// must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded"
val properties = idToActiveJob(stage.priority).properties
val properties = idToActiveJob(stage.jobId).properties
listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
if (tasks.size > 0) {
@ -552,7 +552,7 @@ class DAGScheduler(
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
if (!stage.submissionTime.isDefined) {
stage.submissionTime = Some(System.currentTimeMillis())
}
@ -569,7 +569,7 @@ class DAGScheduler(
*/
private def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stage = idToStage(task.stageId)
val stage = stageIdToStage(task.stageId)
def markStageAsFinished(stage: Stage) = {
val serviceTime = stage.submissionTime match {
@ -598,7 +598,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
idToActiveJob -= stage.priority
idToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
@ -635,7 +635,7 @@ class DAGScheduler(
mapOutputTracker.registerMapOutputs(
stage.shuffleDep.get.shuffleId,
stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
true)
changeEpoch = true)
}
clearCacheLocs()
if (stage.outputLocs.count(_ == Nil) != 0) {
@ -669,7 +669,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = idToStage(task.stageId)
val failedStage = stageIdToStage(task.stageId)
running -= failedStage
failed += failedStage
// TODO: Cancel running tasks in the stage
@ -697,7 +697,7 @@ class DAGScheduler(
case other =>
// Unrecognized failure - abort all jobs depending on this stage
abortStage(idToStage(task.stageId), task + " failed: " + other)
abortStage(stageIdToStage(task.stageId), task + " failed: " + other)
}
}
@ -718,7 +718,7 @@ class DAGScheduler(
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnExecutor(execId)
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
}
if (shuffleToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
@ -750,7 +750,7 @@ class DAGScheduler(
val error = new SparkException("Job failed: " + reason)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
idToActiveJob -= resultStage.priority
idToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
}
@ -774,7 +774,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority)
val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
visitedStages += mapStage
visit(mapStage.rdd)
@ -812,13 +812,13 @@ class DAGScheduler(
}
case _ =>
})
return Nil
Nil
}
private def cleanup(cleanupTime: Long) {
var sizeBefore = idToStage.size
idToStage.clearOldValues(cleanupTime)
logInfo("idToStage " + sizeBefore + " --> " + idToStage.size)
var sizeBefore = stageIdToStage.size
stageIdToStage.clearOldValues(cleanupTime)
logInfo("stageIdToStage " + sizeBefore + " --> " + stageIdToStage.size)
sizeBefore = shuffleToMapStage.size
shuffleToMapStage.clearOldValues(cleanupTime)

View file

@ -21,7 +21,7 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends
})
metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.nextRunId.get()
override def getValue: Int = dagScheduler.nextJobId.get()
})
metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {

View file

@ -102,7 +102,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
protected def buildJobDep(jobID: Int, stage: Stage) {
if (stage.priority == jobID) {
if (stage.jobId == jobID) {
jobIDToStages.get(jobID) match {
case Some(stageList) => stageList += stage
case None => val stageList = new ListBuffer[Stage]
@ -178,12 +178,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}else{
stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
}
if (stage.priority == jobID) {
if (stage.jobId == jobID) {
jobLogInfo(jobID, indentString(indent) + stageInfo, false)
recordRddInStageGraph(jobID, stage.rdd, indent)
stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
} else
jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
}
// Record task metrics into job log files
@ -260,7 +260,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
val job = jobEnd.job
var info = "JOB_ID=" + job.runId
var info = "JOB_ID=" + job.jobId
jobEnd.jobResult match {
case JobSucceeded => info += " STATUS=SUCCESS"
case JobFailed(exception, _) =>
@ -268,8 +268,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
}
jobLogInfo(job.runId, info.substring(0, info.length - 1).toUpperCase)
closeLogWriter(job.runId)
jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
closeLogWriter(job.jobId)
}
protected def recordJobProperties(jobID: Int, properties: Properties) {
@ -282,11 +282,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
override def onJobStart(jobStart: SparkListenerJobStart) {
val job = jobStart.job
val properties = jobStart.properties
createLogWriter(job.runId)
recordJobProperties(job.runId, properties)
buildJobDep(job.runId, job.finalStage)
recordStageDep(job.runId)
recordStageDepGraph(job.runId, job.finalStage)
jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED")
createLogWriter(job.jobId)
recordJobProperties(job.jobId, properties)
buildJobDep(job.jobId, job.finalStage)
recordStageDep(job.jobId)
recordStageDepGraph(job.jobId, job.finalStage)
jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
}
}

View file

@ -33,15 +33,16 @@ import spark.storage.BlockManagerId
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
* that each output partition is on.
*
* Each Stage also has a priority, which is (by default) based on the job it was submitted in.
* This allows Stages from earlier jobs to be computed first or recovered faster on failure.
* Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*/
private[spark] class Stage(
val id: Int,
val rdd: RDD[_],
val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
val parents: List[Stage],
val priority: Int,
val jobId: Int,
callSite: Option[String])
extends Logging {