diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index 951fe1ae3b..f97164669e 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -16,17 +16,24 @@ case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents class StatsReportListener extends SparkListener with Logging { def onStageCompleted(stageCompleted: StageCompleted) { import spark.scheduler.StatsReportListener._ - logInfo("Finished stage: " + stageCompleted.stageInfo) + this.logInfo("Finished stage: " + stageCompleted.stageInfo) showMillisDistribution("task runtime:", stageCompleted.stageInfo.getTaskRuntimeDistribution) showBytesDistribution("shuffle bytes written:", stageCompleted.stageInfo.getShuffleBytesWrittenDistribution) + + //fetch & some io info showMillisDistribution("fetch wait time:",stageCompleted.stageInfo.getRemoteFetchWaitTimeDistribution) showBytesDistribution("remote bytes read:", stageCompleted.stageInfo.getRemoteBytesReadDistribution) showBytesDistribution("task result size:", stageCompleted.stageInfo.getTaskResultSizeDistribution) + + //runtime breakdown + showDistribution("executor (non-fetch) time pct: ", stageCompleted.stageInfo.getExecutorRuntimePercentage, "%2.0f \\%") + showDistribution("fetch wait time pct: ", stageCompleted.stageInfo.getFetchRuntimePercentage, "%2.0f \\%") + showDistribution("other time pct: ", stageCompleted.stageInfo.getOtherRuntimePercentage, "%2.0f \\%") } } -object StatsReportListener { +object StatsReportListener extends Logging { //for profiling, the extremes are more interesting val percentiles = Array[Int](0,5,10,25,50,75,90,95,100) diff --git a/core/src/main/scala/spark/scheduler/StageInfo.scala b/core/src/main/scala/spark/scheduler/StageInfo.scala index 299f43d1c5..ac02d3445c 100644 --- a/core/src/main/scala/spark/scheduler/StageInfo.scala +++ 
b/core/src/main/scala/spark/scheduler/StageInfo.scala @@ -30,6 +30,43 @@ case class StageInfo( } def getTaskResultSizeDistribution = { - Distribution(taskMetrics.flatMap{_.resultSize.map{_.toDouble}}) + Distribution(taskMetrics.map{_.resultSize.toDouble}) + } + + lazy val runtimePercentages = taskMetrics.zip(taskInfos).map{ + case (metrics, info) => RuntimePercentage(info.duration, metrics) + } + + /** + * distribution of the percentage of task runtime of the executor itself, excluding time spent waiting on a fetch + */ + def getExecutorRuntimePercentage = { + Distribution(runtimePercentages.map{_.executorPct}) + } + + /** + * distribution of the percentage of task runtime spent waiting on a fetch + */ + def getFetchRuntimePercentage = { + Distribution(runtimePercentages.flatMap{_.fetchPct}) + } + + /** + * distribution of the percentage of task runtime spent not waiting on fetch, and not actively executing on + * a remote machine (e.g., serializing task, sending it over network, sending results back over network) + */ + def getOtherRuntimePercentage = { + Distribution(runtimePercentages.map{_.other}) + } +} + +private[spark] case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double) +private[spark] object RuntimePercentage { + def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { + val denom = totalTime.toDouble + val fetch = metrics.remoteFetchWaitTime.map{_ / denom} + val exec = (metrics.executorRunTime - metrics.remoteFetchWaitTime.getOrElse(0l)) / denom + val other = 1.0 - (exec + fetch.getOrElse(0d)) + RuntimePercentage(exec, fetch, other) } }