Incorporate pwendell's code review suggestions.
parent 882d544856
commit 963d6f065a
@@ -222,9 +222,9 @@ private[spark] class Executor(
           return
         }
 
-        val objectSer = SparkEnv.get.serializer.newInstance()
+        val resultSer = SparkEnv.get.serializer.newInstance()
         val beforeSerialization = System.currentTimeMillis()
-        val valueBytes = objectSer.serialize(value)
+        val valueBytes = resultSer.serialize(value)
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
@@ -232,7 +232,7 @@ private[spark] class Executor(
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
-          m.serializationTime = (afterSerialization - beforeSerialization).toInt
+          m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
         }
 
         val accumUpdates = Accumulators.values
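The two Executor hunks above rename objectSer to resultSer and serializationTime to resultSerializationTime, making clear that the timing brackets only the serialization of the task's result value. For illustration, a minimal self-contained sketch of that same bracketing pattern (plain Scala with Java serialization standing in for SparkEnv.get.serializer; SimpleMetrics and serialize are hypothetical stand-ins, not Spark APIs):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for TaskMetrics, with only the field this commit renames.
class SimpleMetrics extends Serializable {
  var resultSerializationTime: Long = _
}

object ResultTimingSketch {
  // Plain Java serialization as a stand-in for SparkEnv.get.serializer.newInstance().
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val metrics = new SimpleMetrics
    val value = Vector.fill(100000)("result")  // stand-in for the task's return value

    // Same bracketing as the Executor change: time only the result serialization.
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = serialize(value)
    val afterSerialization = System.currentTimeMillis()
    metrics.resultSerializationTime = afterSerialization - beforeSerialization

    println(s"serialized ${valueBytes.length} bytes in ${metrics.resultSerializationTime} ms")
  }
}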
@@ -44,9 +44,9 @@ class TaskMetrics extends Serializable {
   var jvmGCTime: Long = _
 
   /**
-   * Amount of time spent serializing the result of the task
+   * Amount of time spent serializing the task result
    */
-  var serializationTime: Long = _
+  var resultSerializationTime: Long = _
 
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
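Since resultSerializationTime is just a mutable field on a serializable metrics class, consumers read it directly once the executor has filled it in. A tiny sketch of aggregating it across several finished tasks, reusing the hypothetical SimpleMetrics stand-in from the sketch above:

object MetricsAggregationSketch {
  def main(args: Array[String]): Unit = {
    // Three finished tasks, each with its own (hypothetical) SimpleMetrics.
    val perTask = Seq(3L, 5L, 2L).map { ms =>
      val m = new SimpleMetrics
      m.resultSerializationTime = ms
      m
    }
    val total = perTask.map(_.resultSerializationTime).sum
    println(s"total result serialization time: $total ms")
  }
}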
@@ -102,8 +102,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
         }
         else {
           val serializationTimes = validTasks.map{case (info, metrics, exception) =>
-            metrics.get.serializationTime.toDouble}
-          val serializationQuantiles = "Serialization Time" +: Distribution(serializationTimes).get.getQuantiles().map(
+            metrics.get.resultSerializationTime.toDouble}
+          val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
             ms => parent.formatDuration(ms.toLong))
 
           val serviceTimes = validTasks.map{case (info, metrics, exception) =>
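For context, the StagePage hunk feeds each task's resultSerializationTime into Spark's Distribution helper to build a row of quantile columns for the stage UI. A rough self-contained sketch of that computation; quantiles here is a hypothetical stand-in for Distribution(...).getQuantiles(), assuming its default probabilities are min/25th/median/75th/max:

object QuantileSketch {
  // Rough stand-in for Distribution(...).getQuantiles(): nearest-rank
  // quantiles over a sorted sample.
  def quantiles(data: Seq[Double],
                probabilities: Seq[Double] = Seq(0.0, 0.25, 0.5, 0.75, 1.0)): Seq[Double] = {
    require(data.nonEmpty, "need at least one sample")
    val sorted = data.sorted
    probabilities.map { p =>
      val idx = math.min((p * sorted.length).toInt, sorted.length - 1)
      sorted(idx)
    }
  }

  def main(args: Array[String]): Unit = {
    // Per-task result serialization times in ms, as StagePage collects them.
    val serializationTimes = Seq(3.0, 5.0, 2.0, 8.0, 4.0, 6.0, 1.0)
    val row = "Result serialization time" +: quantiles(serializationTimes).map(ms => s"${ms.toLong} ms")
    println(row.mkString(" | "))
  }
}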
@@ -314,7 +314,6 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val objectSer = SparkEnv.get.serializer.newInstance()
-    new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty,
-      new TaskMetrics)
+    new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty, new TaskMetrics)
   }
 }
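The test helper now constructs the result on one line, passing a fresh TaskMetrics inline. As a sketch of the shape this helper produces, using hypothetical stand-ins (DirectTaskResultLike and TaskMetricsStub, since the real DirectTaskResult and TaskMetrics live in Spark's internal packages):

import scala.collection.mutable

// Hypothetical stand-ins mirroring the shapes used by the test helper above.
class TaskMetricsStub extends Serializable
class DirectTaskResultLike[T](
    val valueBytes: Array[Byte],
    val accumUpdates: mutable.Map[Long, Any],
    val metrics: TaskMetricsStub)

object CreateTaskResultSketch {
  def serialize(id: Int): Array[Byte] = BigInt(id).toByteArray  // toy serializer

  def createTaskResult(id: Int): DirectTaskResultLike[Int] =
    new DirectTaskResultLike[Int](serialize(id), mutable.Map.empty, new TaskMetricsStub)

  def main(args: Array[String]): Unit = {
    val result = createTaskResult(42)
    println(s"task result payload: ${result.valueBytes.length} bytes")
  }
}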