From ad18410427190572f90754624469a7e806c78971 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Fri, 16 Aug 2013 12:14:52 -0700 Subject: [PATCH 1/2] Renamed 'priority' to 'jobId' and assorted minor changes --- .../scala/spark/scheduler/ActiveJob.scala | 2 +- .../scala/spark/scheduler/DAGScheduler.scala | 84 +++++++++---------- .../spark/scheduler/DAGSchedulerSource.scala | 2 +- .../scala/spark/scheduler/JobLogger.scala | 24 +++--- .../main/scala/spark/scheduler/Stage.scala | 7 +- 5 files changed, 60 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala index 71cc94edb6..fecc3e9648 100644 --- a/core/src/main/scala/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala @@ -25,7 +25,7 @@ import java.util.Properties * Tracks information about an active job in the DAGScheduler. */ private[spark] class ActiveJob( - val runId: Int, + val jobId: Int, val finalStage: Stage, val func: (TaskContext, Iterator[_]) => _, val partitions: Array[Int], diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 35b31f45a7..7823d0c8cf 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -104,11 +104,11 @@ class DAGScheduler( private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent] - val nextRunId = new AtomicInteger(0) + val nextJobId = new AtomicInteger(0) val nextStageId = new AtomicInteger(0) - val idToStage = new TimeStampedHashMap[Int, Stage] + val stageIdToStage = new TimeStampedHashMap[Int, Stage] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage] @@ -171,14 +171,14 @@ class DAGScheduler( /** * Get or create a shuffle map stage for the given shuffle dependency's map side. - * The priority value passed in will be used if the stage doesn't already exist with - * a lower priority (we assume that priorities always increase across jobs for now). + * The jobId value passed in will be used if the stage doesn't already exist with + * a lower jobId (jobId always increases across jobs.) */ - private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = { + private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = { shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => - val stage = newStage(shuffleDep.rdd, Some(shuffleDep), priority) + val stage = newStage(shuffleDep.rdd, Some(shuffleDep), jobId) shuffleToMapStage(shuffleDep.shuffleId) = stage stage } @@ -186,13 +186,13 @@ class DAGScheduler( /** * Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or - * as a result stage for the final RDD used directly in an action. The stage will also be given - * the provided priority. + * as a result stage for the final RDD used directly in an action. The stage will also be + * associated with the provided jobId. */ private def newStage( rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], - priority: Int, + jobId: Int, callSite: Option[String] = None) : Stage = { @@ -203,17 +203,17 @@ class DAGScheduler( mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size) } val id = nextStageId.getAndIncrement() - val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite) - idToStage(id) = stage + val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) + stageIdToStage(id) = stage stageToInfos(stage) = StageInfo(stage) stage } /** * Get or create the list of parent stages for a given RDD. The stages will be assigned the - * provided priority if they haven't already been created with a lower priority. + * provided jobId if they haven't already been created with a lower jobId. */ - private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = { + private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(r: RDD[_]) { @@ -224,7 +224,7 @@ class DAGScheduler( for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => - parents += getShuffleMapStage(shufDep, priority) + parents += getShuffleMapStage(shufDep, jobId) case _ => visit(dep.rdd) } @@ -245,7 +245,7 @@ class DAGScheduler( for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => - val mapStage = getShuffleMapStage(shufDep, stage.priority) + val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { missing += mapStage } @@ -282,7 +282,7 @@ class DAGScheduler( val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties) - return (toSubmit, waiter) + (toSubmit, waiter) } def runJob[T, U: ClassManifest]( @@ -329,8 +329,8 @@ class DAGScheduler( val listener = new ApproximateActionListener(rdd, func, evaluator, timeout) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val partitions = (0 until rdd.partitions.size).toArray - eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener, properties)) - return listener.awaitResult() // Will throw an exception if the job fails + eventQueue.put(JobSubmitted(rdd, func2, partitions, allowLocal = false, callSite, listener, properties)) + listener.awaitResult() // Will throw an exception if the job fails } /** @@ -340,11 +340,11 @@ class DAGScheduler( private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { event match { case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) => - val runId = nextRunId.getAndIncrement() - val finalStage = newStage(finalRDD, None, runId, Some(callSite)) - val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties) + val jobId = nextJobId.getAndIncrement() + val finalStage = newStage(finalRDD, None, jobId, Some(callSite)) + val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) clearCacheLocs() - logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + + logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length + " output partitions (allowLocal=" + allowLocal + ")") logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) @@ -354,7 +354,7 @@ class DAGScheduler( runLocally(job) } else { listenerBus.post(SparkListenerJobStart(job, properties)) - idToActiveJob(runId) = job + idToActiveJob(jobId) = job activeJobs += job resultStageToJob(finalStage) = job submitStage(finalStage) @@ -375,7 +375,7 @@ class DAGScheduler( handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => - abortStage(idToStage(taskSet.stageId), reason) + abortStage(stageIdToStage(taskSet.stageId), reason) case StopDAGScheduler => // Cancel any active jobs @@ -386,7 +386,7 @@ class DAGScheduler( } return true } - return false + false } /** @@ -398,7 +398,7 @@ class DAGScheduler( clearCacheLocs() val failed2 = failed.toArray failed.clear() - for (stage <- failed2.sortBy(_.priority)) { + for (stage <- failed2.sortBy(_.jobId)) { submitStage(stage) } } @@ -416,7 +416,7 @@ class DAGScheduler( logTrace("failed: " + failed) val waiting2 = waiting.toArray waiting.clear() - for (stage <- waiting2.sortBy(_.priority)) { + for (stage <- waiting2.sortBy(_.jobId)) { submitStage(stage) } } @@ -463,7 +463,7 @@ class DAGScheduler( */ protected def runLocally(job: ActiveJob) { logInfo("Computing the requested partition locally") - new Thread("Local computation of job " + job.runId) { + new Thread("Local computation of job " + job.jobId) { override def run() { runLocallyWithinThread(job) } @@ -531,7 +531,7 @@ class DAGScheduler( } // must be run listener before possible NotSerializableException // should be "StageSubmitted" first and then "JobEnded" - val properties = idToActiveJob(stage.priority).properties + val properties = idToActiveJob(stage.jobId).properties listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties)) if (tasks.size > 0) { @@ -552,7 +552,7 @@ class DAGScheduler( myPending ++= tasks logDebug("New pending tasks: " + myPending) taskSched.submitTasks( - new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties)) + new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) if (!stage.submissionTime.isDefined) { stage.submissionTime = Some(System.currentTimeMillis()) } @@ -569,7 +569,7 @@ class DAGScheduler( */ private def handleTaskCompletion(event: CompletionEvent) { val task = event.task - val stage = idToStage(task.stageId) + val stage = stageIdToStage(task.stageId) def markStageAsFinished(stage: Stage) = { val serviceTime = stage.submissionTime match { @@ -598,7 +598,7 @@ class DAGScheduler( job.numFinished += 1 // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { - idToActiveJob -= stage.priority + idToActiveJob -= stage.jobId activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) @@ -635,7 +635,7 @@ class DAGScheduler( mapOutputTracker.registerMapOutputs( stage.shuffleDep.get.shuffleId, stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray, - true) + changeGeneration = true) } clearCacheLocs() if (stage.outputLocs.count(_ == Nil) != 0) { @@ -669,7 +669,7 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => // Mark the stage that the reducer was in as unrunnable - val failedStage = idToStage(task.stageId) + val failedStage = stageIdToStage(task.stageId) running -= failedStage failed += failedStage // TODO: Cancel running tasks in the stage @@ -697,7 +697,7 @@ class DAGScheduler( case other => // Unrecognized failure - abort all jobs depending on this stage - abortStage(idToStage(task.stageId), task + " failed: " + other) + abortStage(stageIdToStage(task.stageId), task + " failed: " + other) } } @@ -718,7 +718,7 @@ class DAGScheduler( for ((shuffleId, stage) <- shuffleToMapStage) { stage.removeOutputsOnExecutor(execId) val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray - mapOutputTracker.registerMapOutputs(shuffleId, locs, true) + mapOutputTracker.registerMapOutputs(shuffleId, locs, changeGeneration = true) } if (shuffleToMapStage.isEmpty) { mapOutputTracker.incrementEpoch() @@ -750,7 +750,7 @@ class DAGScheduler( val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) - idToActiveJob -= resultStage.priority + idToActiveJob -= resultStage.jobId activeJobs -= job resultStageToJob -= resultStage } @@ -774,7 +774,7 @@ class DAGScheduler( for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => - val mapStage = getShuffleMapStage(shufDep, stage.priority) + val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { visitedStages += mapStage visit(mapStage.rdd) @@ -812,13 +812,13 @@ class DAGScheduler( } case _ => }) - return Nil + Nil } private def cleanup(cleanupTime: Long) { - var sizeBefore = idToStage.size - idToStage.clearOldValues(cleanupTime) - logInfo("idToStage " + sizeBefore + " --> " + idToStage.size) + var sizeBefore = stageIdToStage.size + stageIdToStage.clearOldValues(cleanupTime) + logInfo("stageIdToStage " + sizeBefore + " --> " + stageIdToStage.size) sizeBefore = shuffleToMapStage.size shuffleToMapStage.clearOldValues(cleanupTime) diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala index 87d27cc70d..98c4fb7e59 100644 --- a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala @@ -21,7 +21,7 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends }) metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] { - override def getValue: Int = dagScheduler.nextRunId.get() + override def getValue: Int = dagScheduler.nextJobId.get() }) metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] { diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index 7194fcaa49..1bc9fabdff 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -102,7 +102,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime)) protected def buildJobDep(jobID: Int, stage: Stage) { - if (stage.priority == jobID) { + if (stage.jobId == jobID) { jobIDToStages.get(jobID) match { case Some(stageList) => stageList += stage case None => val stageList = new ListBuffer[Stage] @@ -178,12 +178,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { }else{ stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE" } - if (stage.priority == jobID) { + if (stage.jobId == jobID) { jobLogInfo(jobID, indentString(indent) + stageInfo, false) recordRddInStageGraph(jobID, stage.rdd, indent) stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2)) } else - jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false) + jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false) } // Record task metrics into job log files @@ -260,7 +260,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { override def onJobEnd(jobEnd: SparkListenerJobEnd) { val job = jobEnd.job - var info = "JOB_ID=" + job.runId + var info = "JOB_ID=" + job.jobId jobEnd.jobResult match { case JobSucceeded => info += " STATUS=SUCCESS" case JobFailed(exception, _) => @@ -268,8 +268,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { exception.getMessage.split("\\s+").foreach(info += _ + "_") case _ => } - jobLogInfo(job.runId, info.substring(0, info.length - 1).toUpperCase) - closeLogWriter(job.runId) + jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase) + closeLogWriter(job.jobId) } protected def recordJobProperties(jobID: Int, properties: Properties) { @@ -282,11 +282,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { override def onJobStart(jobStart: SparkListenerJobStart) { val job = jobStart.job val properties = jobStart.properties - createLogWriter(job.runId) - recordJobProperties(job.runId, properties) - buildJobDep(job.runId, job.finalStage) - recordStageDep(job.runId) - recordStageDepGraph(job.runId, job.finalStage) - jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED") + createLogWriter(job.jobId) + recordJobProperties(job.jobId, properties) + buildJobDep(job.jobId, job.finalStage) + recordStageDep(job.jobId) + recordStageDepGraph(job.jobId, job.finalStage) + jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED") } } diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index 5428daeb94..c599c00ac4 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -33,15 +33,16 @@ import spark.storage.BlockManagerId * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes * that each output partition is on. * - * Each Stage also has a priority, which is (by default) based on the job it was submitted in. - * This allows Stages from earlier jobs to be computed first or recovered faster on failure. + * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO + * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered + * faster on failure. */ private[spark] class Stage( val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage val parents: List[Stage], - val priority: Int, + val jobId: Int, callSite: Option[String]) extends Logging { From 1630fbf838fff101e36324934a165f95fb324482 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 20 Aug 2013 00:17:16 -0700 Subject: [PATCH 2/2] changeGeneration --> changeEpoch renaming --- core/src/main/scala/spark/scheduler/DAGScheduler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 7823d0c8cf..9402f18a0f 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -635,7 +635,7 @@ class DAGScheduler( mapOutputTracker.registerMapOutputs( stage.shuffleDep.get.shuffleId, stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray, - changeGeneration = true) + changeEpoch = true) } clearCacheLocs() if (stage.outputLocs.count(_ == Nil) != 0) { @@ -718,7 +718,7 @@ class DAGScheduler( for ((shuffleId, stage) <- shuffleToMapStage) { stage.removeOutputsOnExecutor(execId) val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray - mapOutputTracker.registerMapOutputs(shuffleId, locs, changeGeneration = true) + mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true) } if (shuffleToMapStage.isEmpty) { mapOutputTracker.incrementEpoch()