Renaming stage finished function
This commit is contained in:
parent
07f568e1bf
commit
c423be7d8e
|
@ -412,7 +412,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
|
|||
val task = event.task
|
||||
val stage = idToStage(task.stageId)
|
||||
|
||||
def stageFinished(stage: Stage) = {
|
||||
def markStageAsFinished(stage: Stage) = {
|
||||
val serviceTime = stageSubmissionTimes.remove(stage) match {
|
||||
case Some(t) => (System.currentTimeMillis() - t).toString
|
||||
case _ => "Unkown"
|
||||
|
@ -438,7 +438,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
|
|||
if (job.numFinished == job.numPartitions) {
|
||||
activeJobs -= job
|
||||
resultStageToJob -= stage
|
||||
stageFinished(stage)
|
||||
markStageAsFinished(stage)
|
||||
}
|
||||
job.listener.taskSucceeded(rt.outputId, event.result)
|
||||
}
|
||||
|
@ -457,7 +457,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
|
|||
stage.addOutputLoc(smt.partition, status)
|
||||
}
|
||||
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
|
||||
stageFinished(stage)
|
||||
markStageAsFinished(stage)
|
||||
logInfo("looking for newly runnable stages")
|
||||
logInfo("running: " + running)
|
||||
logInfo("waiting: " + waiting)
|
||||
|
|
Loading…
Reference in a new issue