SPARK-658: Adding logging of stage duration

This commit is contained in:
Patrick Wendell 2013-01-24 15:27:29 -08:00
parent f03d9760fd
commit 07f568e1bf

View file

@ -86,6 +86,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val activeJobs = new HashSet[ActiveJob] val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob] val resultStageToJob = new HashMap[Stage, ActiveJob]
val stageSubmissionTimes = new HashMap[Stage, Long]
val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup) val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
@ -393,6 +394,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
logDebug("New pending tasks: " + myPending) logDebug("New pending tasks: " + myPending)
taskSched.submitTasks( taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority)) new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
if (!stageSubmissionTimes.contains(stage)) {
stageSubmissionTimes.put(stage, System.currentTimeMillis())
}
} else { } else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format( logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions)) stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@ -407,6 +411,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def handleTaskCompletion(event: CompletionEvent) { def handleTaskCompletion(event: CompletionEvent) {
val task = event.task val task = event.task
val stage = idToStage(task.stageId) val stage = idToStage(task.stageId)
/**
 * Marks `stage` as finished: logs how long it took and removes it from `running`.
 *
 * The duration is measured from the timestamp stored in `stageSubmissionTimes`,
 * which is recorded only the first time the stage's tasks are submitted. The
 * entry is removed here so the map does not keep growing with finished stages.
 *
 * @param stage the stage that has just completed
 */
def stageFinished(stage: Stage) = {
  // remove() returns None when no submission time was ever recorded for this
  // stage; fall back to a placeholder rather than failing the completion path.
  val serviceTime = stageSubmissionTimes.remove(stage)
    .map(submittedAt => (System.currentTimeMillis() - submittedAt).toString)
    .getOrElse("Unknown")
  logInfo("%s (%s) finished in %s ms".format(stage, stage.origin, serviceTime))
  running -= stage
}
event.reason match { event.reason match {
case Success => case Success =>
logInfo("Completed " + task) logInfo("Completed " + task)
@ -421,13 +434,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (!job.finished(rt.outputId)) { if (!job.finished(rt.outputId)) {
job.finished(rt.outputId) = true job.finished(rt.outputId) = true
job.numFinished += 1 job.numFinished += 1
job.listener.taskSucceeded(rt.outputId, event.result)
// If the whole job has finished, remove it // If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) { if (job.numFinished == job.numPartitions) {
activeJobs -= job activeJobs -= job
resultStageToJob -= stage resultStageToJob -= stage
running -= stage stageFinished(stage)
} }
job.listener.taskSucceeded(rt.outputId, event.result)
} }
case None => case None =>
logInfo("Ignoring result from " + rt + " because its job has finished") logInfo("Ignoring result from " + rt + " because its job has finished")
@ -444,8 +457,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
stage.addOutputLoc(smt.partition, status) stage.addOutputLoc(smt.partition, status)
} }
if (running.contains(stage) && pendingTasks(stage).isEmpty) { if (running.contains(stage) && pendingTasks(stage).isEmpty) {
logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages") stageFinished(stage)
running -= stage logInfo("looking for newly runnable stages")
logInfo("running: " + running) logInfo("running: " + running)
logInfo("waiting: " + waiting) logInfo("waiting: " + waiting)
logInfo("failed: " + failed) logInfo("failed: " + failed)