Renamed 'priority' to 'jobId' and assorted minor changes

This commit is contained in:
Mark Hamstra 2013-08-16 12:14:52 -07:00
parent 8cae72e94e
commit ad18410427
5 changed files with 60 additions and 59 deletions

View file

@ -25,7 +25,7 @@ import java.util.Properties
* Tracks information about an active job in the DAGScheduler. * Tracks information about an active job in the DAGScheduler.
*/ */
private[spark] class ActiveJob( private[spark] class ActiveJob(
val runId: Int, val jobId: Int,
val finalStage: Stage, val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _, val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int], val partitions: Array[Int],

View file

@ -104,11 +104,11 @@ class DAGScheduler(
private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent] private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
val nextRunId = new AtomicInteger(0) val nextJobId = new AtomicInteger(0)
val nextStageId = new AtomicInteger(0) val nextStageId = new AtomicInteger(0)
val idToStage = new TimeStampedHashMap[Int, Stage] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
val shuffleToMapStage = new TimeStampedHashMap[Int, Stage] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
@ -171,14 +171,14 @@ class DAGScheduler(
/** /**
* Get or create a shuffle map stage for the given shuffle dependency's map side. * Get or create a shuffle map stage for the given shuffle dependency's map side.
* The priority value passed in will be used if the stage doesn't already exist with * The jobId value passed in will be used if the stage doesn't already exist with
* a lower priority (we assume that priorities always increase across jobs for now). * a lower jobId (jobId always increases across jobs.)
*/ */
private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = { private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match { shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage case Some(stage) => stage
case None => case None =>
val stage = newStage(shuffleDep.rdd, Some(shuffleDep), priority) val stage = newStage(shuffleDep.rdd, Some(shuffleDep), jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage shuffleToMapStage(shuffleDep.shuffleId) = stage
stage stage
} }
@ -186,13 +186,13 @@ class DAGScheduler(
/** /**
* Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or * Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or
* as a result stage for the final RDD used directly in an action. The stage will also be given * as a result stage for the final RDD used directly in an action. The stage will also be
* the provided priority. * associated with the provided jobId.
*/ */
private def newStage( private def newStage(
rdd: RDD[_], rdd: RDD[_],
shuffleDep: Option[ShuffleDependency[_,_]], shuffleDep: Option[ShuffleDependency[_,_]],
priority: Int, jobId: Int,
callSite: Option[String] = None) callSite: Option[String] = None)
: Stage = : Stage =
{ {
@ -203,17 +203,17 @@ class DAGScheduler(
mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size) mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
} }
val id = nextStageId.getAndIncrement() val id = nextStageId.getAndIncrement()
val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite) val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
idToStage(id) = stage stageIdToStage(id) = stage
stageToInfos(stage) = StageInfo(stage) stageToInfos(stage) = StageInfo(stage)
stage stage
} }
/** /**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the * Get or create the list of parent stages for a given RDD. The stages will be assigned the
* provided priority if they haven't already been created with a lower priority. * provided jobId if they haven't already been created with a lower jobId.
*/ */
private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = { private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage] val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]] val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]) { def visit(r: RDD[_]) {
@ -224,7 +224,7 @@ class DAGScheduler(
for (dep <- r.dependencies) { for (dep <- r.dependencies) {
dep match { dep match {
case shufDep: ShuffleDependency[_,_] => case shufDep: ShuffleDependency[_,_] =>
parents += getShuffleMapStage(shufDep, priority) parents += getShuffleMapStage(shufDep, jobId)
case _ => case _ =>
visit(dep.rdd) visit(dep.rdd)
} }
@ -245,7 +245,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) { for (dep <- rdd.dependencies) {
dep match { dep match {
case shufDep: ShuffleDependency[_,_] => case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority) val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) { if (!mapStage.isAvailable) {
missing += mapStage missing += mapStage
} }
@ -282,7 +282,7 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter, val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter,
properties) properties)
return (toSubmit, waiter) (toSubmit, waiter)
} }
def runJob[T, U: ClassManifest]( def runJob[T, U: ClassManifest](
@ -329,8 +329,8 @@ class DAGScheduler(
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout) val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray val partitions = (0 until rdd.partitions.size).toArray
eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener, properties)) eventQueue.put(JobSubmitted(rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
return listener.awaitResult() // Will throw an exception if the job fails listener.awaitResult() // Will throw an exception if the job fails
} }
/** /**
@ -340,11 +340,11 @@ class DAGScheduler(
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match { event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) => case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
val runId = nextRunId.getAndIncrement() val jobId = nextJobId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId, Some(callSite)) val finalStage = newStage(finalRDD, None, jobId, Some(callSite))
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties) val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs() clearCacheLocs()
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")") " output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents) logInfo("Parents of final stage: " + finalStage.parents)
@ -354,7 +354,7 @@ class DAGScheduler(
runLocally(job) runLocally(job)
} else { } else {
listenerBus.post(SparkListenerJobStart(job, properties)) listenerBus.post(SparkListenerJobStart(job, properties))
idToActiveJob(runId) = job idToActiveJob(jobId) = job
activeJobs += job activeJobs += job
resultStageToJob(finalStage) = job resultStageToJob(finalStage) = job
submitStage(finalStage) submitStage(finalStage)
@ -375,7 +375,7 @@ class DAGScheduler(
handleTaskCompletion(completion) handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) => case TaskSetFailed(taskSet, reason) =>
abortStage(idToStage(taskSet.stageId), reason) abortStage(stageIdToStage(taskSet.stageId), reason)
case StopDAGScheduler => case StopDAGScheduler =>
// Cancel any active jobs // Cancel any active jobs
@ -386,7 +386,7 @@ class DAGScheduler(
} }
return true return true
} }
return false false
} }
/** /**
@ -398,7 +398,7 @@ class DAGScheduler(
clearCacheLocs() clearCacheLocs()
val failed2 = failed.toArray val failed2 = failed.toArray
failed.clear() failed.clear()
for (stage <- failed2.sortBy(_.priority)) { for (stage <- failed2.sortBy(_.jobId)) {
submitStage(stage) submitStage(stage)
} }
} }
@ -416,7 +416,7 @@ class DAGScheduler(
logTrace("failed: " + failed) logTrace("failed: " + failed)
val waiting2 = waiting.toArray val waiting2 = waiting.toArray
waiting.clear() waiting.clear()
for (stage <- waiting2.sortBy(_.priority)) { for (stage <- waiting2.sortBy(_.jobId)) {
submitStage(stage) submitStage(stage)
} }
} }
@ -463,7 +463,7 @@ class DAGScheduler(
*/ */
protected def runLocally(job: ActiveJob) { protected def runLocally(job: ActiveJob) {
logInfo("Computing the requested partition locally") logInfo("Computing the requested partition locally")
new Thread("Local computation of job " + job.runId) { new Thread("Local computation of job " + job.jobId) {
override def run() { override def run() {
runLocallyWithinThread(job) runLocallyWithinThread(job)
} }
@ -531,7 +531,7 @@ class DAGScheduler(
} }
// must be run listener before possible NotSerializableException // must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded" // should be "StageSubmitted" first and then "JobEnded"
val properties = idToActiveJob(stage.priority).properties val properties = idToActiveJob(stage.jobId).properties
listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties)) listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
if (tasks.size > 0) { if (tasks.size > 0) {
@ -552,7 +552,7 @@ class DAGScheduler(
myPending ++= tasks myPending ++= tasks
logDebug("New pending tasks: " + myPending) logDebug("New pending tasks: " + myPending)
taskSched.submitTasks( taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties)) new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
if (!stage.submissionTime.isDefined) { if (!stage.submissionTime.isDefined) {
stage.submissionTime = Some(System.currentTimeMillis()) stage.submissionTime = Some(System.currentTimeMillis())
} }
@ -569,7 +569,7 @@ class DAGScheduler(
*/ */
private def handleTaskCompletion(event: CompletionEvent) { private def handleTaskCompletion(event: CompletionEvent) {
val task = event.task val task = event.task
val stage = idToStage(task.stageId) val stage = stageIdToStage(task.stageId)
def markStageAsFinished(stage: Stage) = { def markStageAsFinished(stage: Stage) = {
val serviceTime = stage.submissionTime match { val serviceTime = stage.submissionTime match {
@ -598,7 +598,7 @@ class DAGScheduler(
job.numFinished += 1 job.numFinished += 1
// If the whole job has finished, remove it // If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) { if (job.numFinished == job.numPartitions) {
idToActiveJob -= stage.priority idToActiveJob -= stage.jobId
activeJobs -= job activeJobs -= job
resultStageToJob -= stage resultStageToJob -= stage
markStageAsFinished(stage) markStageAsFinished(stage)
@ -635,7 +635,7 @@ class DAGScheduler(
mapOutputTracker.registerMapOutputs( mapOutputTracker.registerMapOutputs(
stage.shuffleDep.get.shuffleId, stage.shuffleDep.get.shuffleId,
stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray, stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
true) changeGeneration = true)
} }
clearCacheLocs() clearCacheLocs()
if (stage.outputLocs.count(_ == Nil) != 0) { if (stage.outputLocs.count(_ == Nil) != 0) {
@ -669,7 +669,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable // Mark the stage that the reducer was in as unrunnable
val failedStage = idToStage(task.stageId) val failedStage = stageIdToStage(task.stageId)
running -= failedStage running -= failedStage
failed += failedStage failed += failedStage
// TODO: Cancel running tasks in the stage // TODO: Cancel running tasks in the stage
@ -697,7 +697,7 @@ class DAGScheduler(
case other => case other =>
// Unrecognized failure - abort all jobs depending on this stage // Unrecognized failure - abort all jobs depending on this stage
abortStage(idToStage(task.stageId), task + " failed: " + other) abortStage(stageIdToStage(task.stageId), task + " failed: " + other)
} }
} }
@ -718,7 +718,7 @@ class DAGScheduler(
for ((shuffleId, stage) <- shuffleToMapStage) { for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnExecutor(execId) stage.removeOutputsOnExecutor(execId)
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
mapOutputTracker.registerMapOutputs(shuffleId, locs, true) mapOutputTracker.registerMapOutputs(shuffleId, locs, changeGeneration = true)
} }
if (shuffleToMapStage.isEmpty) { if (shuffleToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch() mapOutputTracker.incrementEpoch()
@ -750,7 +750,7 @@ class DAGScheduler(
val error = new SparkException("Job failed: " + reason) val error = new SparkException("Job failed: " + reason)
job.listener.jobFailed(error) job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
idToActiveJob -= resultStage.priority idToActiveJob -= resultStage.jobId
activeJobs -= job activeJobs -= job
resultStageToJob -= resultStage resultStageToJob -= resultStage
} }
@ -774,7 +774,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) { for (dep <- rdd.dependencies) {
dep match { dep match {
case shufDep: ShuffleDependency[_,_] => case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority) val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) { if (!mapStage.isAvailable) {
visitedStages += mapStage visitedStages += mapStage
visit(mapStage.rdd) visit(mapStage.rdd)
@ -812,13 +812,13 @@ class DAGScheduler(
} }
case _ => case _ =>
}) })
return Nil Nil
} }
private def cleanup(cleanupTime: Long) { private def cleanup(cleanupTime: Long) {
var sizeBefore = idToStage.size var sizeBefore = stageIdToStage.size
idToStage.clearOldValues(cleanupTime) stageIdToStage.clearOldValues(cleanupTime)
logInfo("idToStage " + sizeBefore + " --> " + idToStage.size) logInfo("stageIdToStage " + sizeBefore + " --> " + stageIdToStage.size)
sizeBefore = shuffleToMapStage.size sizeBefore = shuffleToMapStage.size
shuffleToMapStage.clearOldValues(cleanupTime) shuffleToMapStage.clearOldValues(cleanupTime)

View file

@ -21,7 +21,7 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends
}) })
metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] { metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.nextRunId.get() override def getValue: Int = dagScheduler.nextJobId.get()
}) })
metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] { metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {

View file

@ -102,7 +102,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime)) stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
protected def buildJobDep(jobID: Int, stage: Stage) { protected def buildJobDep(jobID: Int, stage: Stage) {
if (stage.priority == jobID) { if (stage.jobId == jobID) {
jobIDToStages.get(jobID) match { jobIDToStages.get(jobID) match {
case Some(stageList) => stageList += stage case Some(stageList) => stageList += stage
case None => val stageList = new ListBuffer[Stage] case None => val stageList = new ListBuffer[Stage]
@ -178,12 +178,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}else{ }else{
stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE" stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
} }
if (stage.priority == jobID) { if (stage.jobId == jobID) {
jobLogInfo(jobID, indentString(indent) + stageInfo, false) jobLogInfo(jobID, indentString(indent) + stageInfo, false)
recordRddInStageGraph(jobID, stage.rdd, indent) recordRddInStageGraph(jobID, stage.rdd, indent)
stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2)) stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
} else } else
jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false) jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
} }
// Record task metrics into job log files // Record task metrics into job log files
@ -260,7 +260,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
override def onJobEnd(jobEnd: SparkListenerJobEnd) { override def onJobEnd(jobEnd: SparkListenerJobEnd) {
val job = jobEnd.job val job = jobEnd.job
var info = "JOB_ID=" + job.runId var info = "JOB_ID=" + job.jobId
jobEnd.jobResult match { jobEnd.jobResult match {
case JobSucceeded => info += " STATUS=SUCCESS" case JobSucceeded => info += " STATUS=SUCCESS"
case JobFailed(exception, _) => case JobFailed(exception, _) =>
@ -268,8 +268,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
exception.getMessage.split("\\s+").foreach(info += _ + "_") exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ => case _ =>
} }
jobLogInfo(job.runId, info.substring(0, info.length - 1).toUpperCase) jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
closeLogWriter(job.runId) closeLogWriter(job.jobId)
} }
protected def recordJobProperties(jobID: Int, properties: Properties) { protected def recordJobProperties(jobID: Int, properties: Properties) {
@ -282,11 +282,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
override def onJobStart(jobStart: SparkListenerJobStart) { override def onJobStart(jobStart: SparkListenerJobStart) {
val job = jobStart.job val job = jobStart.job
val properties = jobStart.properties val properties = jobStart.properties
createLogWriter(job.runId) createLogWriter(job.jobId)
recordJobProperties(job.runId, properties) recordJobProperties(job.jobId, properties)
buildJobDep(job.runId, job.finalStage) buildJobDep(job.jobId, job.finalStage)
recordStageDep(job.runId) recordStageDep(job.jobId)
recordStageDepGraph(job.runId, job.finalStage) recordStageDepGraph(job.jobId, job.finalStage)
jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED") jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
} }
} }

View file

@ -33,15 +33,16 @@ import spark.storage.BlockManagerId
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
* that each output partition is on. * that each output partition is on.
* *
* Each Stage also has a priority, which is (by default) based on the job it was submitted in. * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
* This allows Stages from earlier jobs to be computed first or recovered faster on failure. * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*/ */
private[spark] class Stage( private[spark] class Stage(
val id: Int, val id: Int,
val rdd: RDD[_], val rdd: RDD[_],
val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
val parents: List[Stage], val parents: List[Stage],
val priority: Int, val jobId: Int,
callSite: Option[String]) callSite: Option[String])
extends Logging { extends Logging {