SparkContext.addSparkListener; "std" listener in StatsReportListener

This commit is contained in:
Imran Rashid 2013-02-10 14:19:37 -08:00
parent b7d9e24394
commit 383af599bb
2 changed files with 39 additions and 0 deletions

View file

@ -469,6 +469,10 @@ class SparkContext(
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
}
def addSparkListener(listener: SparkListener) {
dagScheduler.sparkListeners += listener
}
/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.

View file

@ -1,5 +1,8 @@
package spark.scheduler
import spark.util.Distribution
import spark.{Utils, Logging}
trait SparkListener {
def onStageCompleted(stageCompleted: StageCompleted)
}
@ -7,3 +10,35 @@ trait SparkListener {
sealed trait SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
class StatsReportListener extends SparkListener with Logging {
def onStageCompleted(stageCompleted: StageCompleted) {
println("Finished stage: " + stageCompleted.stageInfo)
showDistribution("task runtime:", stageCompleted.stageInfo.getTaskRuntimeDistribution, "%4.0f")
showDistribution("shuffle bytes written:", stageCompleted.stageInfo.getShuffleBytesWrittenDistribution, d => Utils.memoryBytesToString(d.toLong))
showDistribution("fetch wait time:",stageCompleted.stageInfo.getRemoteFetchWaitTimeDistribution, "%4.0f")
showDistribution("remote bytes read:", stageCompleted.stageInfo.getRemoteBytesReadDistribution, d => Utils.memoryBytesToString(d.toLong))
}
//for profiling, the extremes are more interesting
val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
val probabilities = percentiles.map{_ / 100.0}
val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
def showDistribution(heading: String, dOpt: Option[Distribution], format:String) {
def f(d:Double) = format.format(d)
showDistribution(heading, dOpt, f _)
}
def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String) {
dOpt.foreach { d =>
val stats = d.statCounter
logInfo(heading + stats)
val quantiles = d.getQuantiles(probabilities).map{formatNumber}
logInfo(percentilesHeader)
logInfo("\t" + quantiles.mkString("\t"))
}
}
}