spark-968, changes for avoid a NPE

This commit is contained in:
wangda.tan 2013-12-17 17:57:27 +08:00
parent 36060f4f50
commit 59e53fa21c
2 changed files with 29 additions and 25 deletions

View file

@ -150,7 +150,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks += taskStart.taskInfo
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
@ -168,20 +168,22 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
// update shuffle read/write
val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
shuffleRead match {
case Some(s) =>
val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
executorToShuffleRead.put(eid, newShuffleRead)
case _ => {}
}
val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
shuffleWrite match {
case Some(s) => {
val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
executorToShuffleWrite.put(eid, newShuffleWrite)
if (null != taskEnd.taskMetrics) {
val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
shuffleRead match {
case Some(s) =>
val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
executorToShuffleRead.put(eid, newShuffleRead)
case _ => {}
}
val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
shuffleWrite match {
case Some(s) => {
val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
executorToShuffleWrite.put(eid, newShuffleWrite)
}
case _ => {}
}
case _ => {}
}
}
}

View file

@ -147,18 +147,20 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
y.duration += taskEnd.taskInfo.duration
// update shuffle read/write
val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
shuffleRead match {
case Some(s) =>
y.shuffleRead += s.remoteBytesRead
case _ => {}
}
val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
shuffleWrite match {
case Some(s) => {
y.shuffleWrite += s.shuffleBytesWritten
if (null != taskEnd.taskMetrics) {
val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
shuffleRead match {
case Some(s) =>
y.shuffleRead += s.remoteBytesRead
case _ => {}
}
val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
shuffleWrite match {
case Some(s) => {
y.shuffleWrite += s.shuffleBytesWritten
}
case _ => {}
}
case _ => {}
}
}
case _ => {}