SPARK-3428. TaskMetrics for running tasks is missing GC time metrics

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3684 from sryza/sandy-spark-3428 and squashes the following commits:

cb827fe [Sandy Ryza] SPARK-3428. TaskMetrics for running tasks is missing GC time metrics
This commit is contained in:
Sandy Ryza 2014-12-18 22:40:44 -08:00 committed by Josh Rosen
parent d7fc69a8b5
commit 283263ffaa

View file

@ -145,6 +145,8 @@ private[spark] class Executor(
} }
} }
private def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
class TaskRunner( class TaskRunner(
execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer) execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
extends Runnable { extends Runnable {
@ -152,6 +154,7 @@ private[spark] class Executor(
@volatile private var killed = false @volatile private var killed = false
@volatile var task: Task[Any] = _ @volatile var task: Task[Any] = _
@volatile var attemptedTask: Option[Task[Any]] = None @volatile var attemptedTask: Option[Task[Any]] = None
@volatile var startGCTime: Long = _
def kill(interruptThread: Boolean) { def kill(interruptThread: Boolean) {
logInfo(s"Executor is trying to kill $taskName (TID $taskId)") logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
@ -168,8 +171,7 @@ private[spark] class Executor(
logInfo(s"Running $taskName (TID $taskId)") logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0 var taskStart: Long = 0
def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum startGCTime = gcTime
val startGCTime = gcTime
try { try {
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
@ -376,10 +378,13 @@ private[spark] class Executor(
while (!isStopped) { while (!isStopped) {
val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
val curGCTime = gcTime
for (taskRunner <- runningTasks.values()) { for (taskRunner <- runningTasks.values()) {
if (!taskRunner.attemptedTask.isEmpty) { if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics => Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics metrics.updateShuffleReadMetrics
metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
if (isLocal) { if (isLocal) {
// JobProgressListener will hold an reference of it during // JobProgressListener will hold an reference of it during
// onExecutorMetricsUpdate(), then JobProgressListener can not see // onExecutorMetricsUpdate(), then JobProgressListener can not see