add runtime breakdowns

Imran Rashid 2013-02-11 18:37:07 -08:00
parent 176cb20703
commit 6f62a57858
2 changed files with 47 additions and 3 deletions

@@ -16,17 +16,24 @@ case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
 class StatsReportListener extends SparkListener with Logging {
   def onStageCompleted(stageCompleted: StageCompleted) {
     import spark.scheduler.StatsReportListener._
-    logInfo("Finished stage: " + stageCompleted.stageInfo)
+    this.logInfo("Finished stage: " + stageCompleted.stageInfo)
     showMillisDistribution("task runtime:", stageCompleted.stageInfo.getTaskRuntimeDistribution)
     showBytesDistribution("shuffle bytes written:", stageCompleted.stageInfo.getShuffleBytesWrittenDistribution)
+
+    //fetch & some io info
     showMillisDistribution("fetch wait time:", stageCompleted.stageInfo.getRemoteFetchWaitTimeDistribution)
     showBytesDistribution("remote bytes read:", stageCompleted.stageInfo.getRemoteBytesReadDistribution)
     showBytesDistribution("task result size:", stageCompleted.stageInfo.getTaskResultSizeDistribution)
+
+    //runtime breakdown
+    showDistribution("executor (non-fetch) time pct: ", stageCompleted.stageInfo.getExecutorRuntimePercentage, "%2.0f %%")
+    showDistribution("fetch wait time pct: ", stageCompleted.stageInfo.getFetchRuntimePercentage, "%2.0f %%")
+    showDistribution("other time pct: ", stageCompleted.stageInfo.getOtherRuntimePercentage, "%2.0f %%")
   }
 }

-object StatsReportListener {
+object StatsReportListener extends Logging {
   //for profiling, the extremes are more interesting
   val percentiles = Array[Int](0, 5, 10, 25, 50, 75, 90, 95, 100)
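For context, the three new showDistribution calls report each stage's runtime breakdown at the extremes-heavy percentiles defined just above. Below is a minimal, self-contained sketch of what such a report computes, in plain Scala: the nearest-rank quantile rule is an illustrative stand-in for Spark's Distribution class, and the task data is made up.

// Standalone sketch, no Spark dependencies. PercentileReportSketch and its
// members are hypothetical names; only the percentiles array and the format
// string mirror the code in the diff.
object PercentileReportSketch {
  // the same extremes-heavy percentiles as StatsReportListener
  val percentiles = Array(0, 5, 10, 25, 50, 75, 90, 95, 100)

  // nearest-rank quantile of the sample at each percentile
  def quantiles(data: Seq[Double]): Seq[Double] = {
    val sorted = data.sorted
    percentiles.map { p =>
      sorted(math.min(sorted.length - 1, p * sorted.length / 100))
    }
  }

  def show(heading: String, data: Seq[Double], fmt: String): Unit = {
    // note: a literal percent sign in a java.util.Formatter string is "%%", not "\\%"
    println(heading + "\t" + quantiles(data).map(fmt.format(_)).mkString("\t"))
  }

  def main(args: Array[String]): Unit = {
    val execPct = Seq(55.0, 60.0, 72.0, 80.0, 91.0, 95.0)  // made-up per-task values
    show("executor (non-fetch) time pct:", execPct, "%2.0f %%")
  }
}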

@@ -30,6 +30,43 @@ case class StageInfo(
   }
   def getTaskResultSizeDistribution = {
-    Distribution(taskMetrics.flatMap{_.resultSize.map{_.toDouble}})
+    Distribution(taskMetrics.map{_.resultSize.toDouble})
   }
+
+  lazy val runtimePercentages = taskMetrics.zip(taskInfos).map{
+    case (metrics, info) => RuntimePercentage(info.duration, metrics)
+  }
+
+  /**
+   * distribution of the percentage of task runtime spent in the executor itself, excluding time spent
+   * waiting on a fetch
+   */
+  def getExecutorRuntimePercentage = {
+    Distribution(runtimePercentages.map{_.executorPct})
+  }
+
+  /**
+   * distribution of the percentage of task runtime spent waiting on a fetch
+   */
+  def getFetchRuntimePercentage = {
+    Distribution(runtimePercentages.flatMap{_.fetchPct})
+  }
+
+  /**
+   * distribution of the percentage of task runtime spent neither waiting on a fetch nor actively executing
+   * on a remote machine (e.g., serializing the task, sending it over the network, sending results back over
+   * the network)
+   */
+  def getOtherRuntimePercentage = {
+    Distribution(runtimePercentages.map{_.other})
+  }
 }
+
+private[spark] case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
+
+private[spark] object RuntimePercentage {
+  def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
+    val denom = totalTime.toDouble
+    val fetch = metrics.remoteFetchWaitTime.map{_ / denom}
+    val exec = (metrics.executorRunTime - metrics.remoteFetchWaitTime.getOrElse(0L)) / denom
+    val other = 1.0 - (exec + fetch.getOrElse(0d))
+    RuntimePercentage(exec, fetch, other)
+  }
+}
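To make the arithmetic concrete, here is a self-contained sketch of the RuntimePercentage bucketing with the metrics stripped down to the two fields it reads. MetricsSketch is a hypothetical stand-in for TaskMetrics, and the numbers are made up.

// Plain Scala, no Spark dependencies; mirrors RuntimePercentage.apply above.
case class MetricsSketch(executorRunTime: Long, remoteFetchWaitTime: Option[Long])

object RuntimePercentageSketch {
  def breakdown(totalTime: Long, m: MetricsSketch): (Double, Option[Double], Double) = {
    val denom = totalTime.toDouble
    val fetch = m.remoteFetchWaitTime.map(_ / denom)
    // fetch wait is subtracted from executor time, so the buckets don't double-count
    val exec = (m.executorRunTime - m.remoteFetchWaitTime.getOrElse(0L)) / denom
    val other = 1.0 - (exec + fetch.getOrElse(0.0))
    (exec, fetch, other)
  }

  def main(args: Array[String]): Unit = {
    // a 100 ms task: 80 ms in the executor, 20 ms of that waiting on a fetch
    // exec = (80 - 20) / 100 = 0.6, fetch = 20 / 100 = 0.2, other = 1 - 0.8 = 0.2
    println(breakdown(100L, MetricsSketch(80L, Some(20L))))  // (0.6,Some(0.2),0.2)
  }
}

Keeping fetchPct an Option means tasks that never fetched anything are dropped from the fetch distribution (via the flatMap in getFetchRuntimePercentage) rather than skewing it with zeros.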