[SPARK-7058] Include RDD deserialization time in "task deserialization time" metric

The web UI's "task deserialization time" metric is slightly misleading because it does not capture the time taken to deserialize the broadcasted RDD.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #5635 from JoshRosen/SPARK-7058 and squashes the following commits:

ed90f75 [Josh Rosen] Update UI tooltip
a3743b4 [Josh Rosen] Update comments.
4f52910 [Josh Rosen] Roll back whitespace change
e9cf9f4 [Josh Rosen] Remove unused variable
9f32e55 [Josh Rosen] Expose executorDeserializeTime on Task instead of pushing runtime calculation into Task.
21f5b47 [Josh Rosen] Don't double-count the broadcast deserialization time in task runtime
1752f0e [Josh Rosen] [SPARK-7058] Incorporate RDD deserialization time in task deserialization time metric
Authored by Josh Rosen on 2015-04-23 13:19:03 -07:00, committed by Kay Ousterhout
parent c1213e6a92
commit 6afde2c781
5 changed files with 20 additions and 3 deletions
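
The accounting this patch introduces is easiest to see end to end. The following is a minimal, runnable Scala sketch of the two-phase measurement; DeserializeTimingSketch, SketchTask, and the sleep durations are hypothetical stand-ins for illustration, not Spark's actual Executor and Task classes.

// A minimal, runnable sketch (hypothetical names and timings, not Spark's real
// classes) of the two-phase deserialization accounting described above.
object DeserializeTimingSketch {

  // Phase 2: the task body measures its own RDD/closure deserialization time,
  // mirroring what ResultTask.runTask and ShuffleMapTask.runTask do below.
  class SketchTask {
    var executorDeserializeTime: Long = 0L
    def run(): Unit = {
      val deserializeStart = System.currentTimeMillis()
      Thread.sleep(40)  // stand-in: deserialize the broadcasted RDD and function
      executorDeserializeTime = System.currentTimeMillis() - deserializeStart
      Thread.sleep(950) // stand-in: the actual computation
    }
  }

  def main(args: Array[String]): Unit = {
    // Phase 1: the executor deserializes the Task object (including the Partition).
    val deserializeStartTime = System.currentTimeMillis()
    Thread.sleep(10)    // stand-in: deserialize the Task object itself
    val taskStart = System.currentTimeMillis()

    val task = new SketchTask
    task.run()
    val taskFinish = System.currentTimeMillis()

    // The corrected metrics: fold phase 2 into deserialization time, and
    // subtract it from run time so it is not counted twice.
    val deserializeTime = (taskStart - deserializeStartTime) + task.executorDeserializeTime
    val runTime = (taskFinish - taskStart) - task.executorDeserializeTime
    println(s"deserialize: $deserializeTime ms, run: $runTime ms")
  }
}

Running the sketch prints roughly "deserialize: 50 ms, run: 950 ms"; without the subtraction, the 40 ms of RDD deserialization would be attributed to run time instead.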

core/src/main/scala/org/apache/spark/executor/Executor.scala

@@ -220,8 +220,12 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()

         for (m <- task.metrics) {
-          m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
-          m.setExecutorRunTime(taskFinish - taskStart)
+          // Deserialization happens in two parts: first, we deserialize a Task object, which
+          // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
+          m.setExecutorDeserializeTime(
+            (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+          // We need to subtract Task.run()'s deserialization time to avoid double-counting
+          m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
         }
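
As a worked example with hypothetical numbers: if deserializing the Task object takes 10 ms, Task.run() spends 40 ms deserializing the RDD and closure, and the remaining computation takes 950 ms, the old code reported 10 ms of deserialization and 990 ms of run time; the new code reports 50 ms and 950 ms, so the 40 ms is counted exactly once.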

core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala

@@ -53,9 +53,11 @@ private[spark] class ResultTask[T, U](
   override def runTask(context: TaskContext): U = {
     // Deserialize the RDD and the func using the broadcast variables.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

     metrics = Some(context.taskMetrics)
     func(context, rdd.iterator(partition, context))
   }

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

@@ -56,9 +56,11 @@ private[spark] class ShuffleMapTask(
   override def runTask(context: TaskContext): MapStatus = {
     // Deserialize the RDD using the broadcast variable.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

     metrics = Some(context.taskMetrics)
     var writer: ShuffleWriter[Any, Any] = null

core/src/main/scala/org/apache/spark/scheduler/Task.scala

@@ -87,11 +87,18 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   // initialized when kill() is invoked.
   @volatile @transient private var _killed = false

+  protected var _executorDeserializeTime: Long = 0
+
   /**
    * Whether the task has been killed.
    */
   def killed: Boolean = _killed

+  /**
+   * Returns the amount of time spent deserializing the RDD and function to be run.
+   */
+  def executorDeserializeTime: Long = _executorDeserializeTime
+
   /**
    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
    * code and user code to properly handle the flag. This function should be idempotent so it can
core/src/main/scala/org/apache/spark/ui/ToolTips.scala

@@ -24,7 +24,9 @@ private[spark] object ToolTips {
       scheduler delay is large, consider decreasing the size of tasks or decreasing the size
       of task results."""

-  val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor."
+  val TASK_DESERIALIZATION_TIME =
+    """Time spent deserializing the task closure on the executor, including the time to read the
+       broadcasted task."""

   val SHUFFLE_READ_BLOCKED_TIME =
     "Time that the task spent blocked waiting for shuffle data to be read from remote machines."