From 59e53fa21caa202a57093c74ada128fca2be5bac Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Tue, 17 Dec 2013 17:57:27 +0800 Subject: [PATCH] spark-968, changes for avoid a NPE --- .../apache/spark/ui/exec/ExecutorsUI.scala | 30 ++++++++++--------- .../spark/ui/jobs/JobProgressListener.scala | 24 ++++++++------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 808bbe8c8f..f62ae37466 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -150,7 +150,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { activeTasks += taskStart.taskInfo } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val eid = taskEnd.taskInfo.executorId val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration @@ -168,20 +168,22 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { } // update shuffle read/write - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead - executorToShuffleRead.put(eid, newShuffleRead) - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten - executorToShuffleWrite.put(eid, newShuffleWrite) + if (null != taskEnd.taskMetrics) { + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead + executorToShuffleRead.put(eid, newShuffleRead) + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten + executorToShuffleWrite.put(eid, newShuffleWrite) + } + case _ => {} } - case _ => {} } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 8c92ff19a6..64ce715993 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -147,18 +147,20 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList y.duration += taskEnd.taskInfo.duration // update shuffle read/write - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - y.shuffleRead += s.remoteBytesRead - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - y.shuffleWrite += s.shuffleBytesWritten + if (null != taskEnd.taskMetrics) { + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + y.shuffleRead += s.remoteBytesRead + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + y.shuffleWrite += s.shuffleBytesWritten + } + case _ => {} } - case _ => {} } } case _ => {}