[SPARK-29273][CORE] Save peakExecutionMemory value when writing task end to event log

TaskMetrics exposes a peakExecutionMemory metric, but its value is never set. When a task finishes, Spark generates a SparkListenerTaskEnd event that includes the metrics values; the peakExecutionMemory value is actually stored in the accumulables member of TaskInfo.
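
For reference, here is a minimal sketch of where the value can already be recovered on the driver, using the standard SparkListener API; the `PeakMemoryListener` name is illustrative only:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener: the peak execution memory of a finished task is
// recorded as an internal accumulator update in taskInfo.accumulables; for
// events replayed from an event log, taskMetrics.peakExecutionMemory is not
// populated before this patch.
class PeakMemoryListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val peakBytes = taskEnd.taskInfo.accumulables
      .find(_.name.contains("internal.metrics.peakExecutionMemory"))
      .flatMap(_.update)                 // per-task update for this attempt
      .map(_.asInstanceOf[Long])
      .getOrElse(0L)
    println(s"task ${taskEnd.taskInfo.taskId} peak execution memory: $peakBytes bytes")
  }
}
```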

So when parsing the SparkListenerTaskEnd event, we can read the `internal.metrics.peakExecutionMemory` value from the parsed TaskInfo object and set it back on TaskMetrics by supplying a setPeakExecutionMemory method.
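
A minimal sketch of that round trip, assuming the setPeakExecutionMemory setter added below; the object and method names are illustrative and not part of the patch:

```scala
package org.apache.spark  // setPeakExecutionMemory is private[spark]

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.TaskInfo

// Hypothetical helper: copy the accumulator update recorded in TaskInfo back
// onto the TaskMetrics object through the new setter.
object PeakExecutionMemoryBackfill {
  def copyFromTaskInfo(info: TaskInfo, metrics: TaskMetrics): Unit = {
    info.accumulables
      .find(_.name.contains("internal.metrics.peakExecutionMemory"))
      .flatMap(_.update)
      .foreach(v => metrics.setPeakExecutionMemory(v.asInstanceOf[Long]))
  }
}
```

The JsonProtocol changes below take the simpler route of writing the value into the event log JSON and restoring it via the new setter when the event is parsed back.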

Closes #25949 from 012huang/fix-peakExecutionMemory-metrics-value.

Lead-authored-by: huangweiyi <huangweiyi_2006@qq.com>
Co-authored-by: hwy <huangweiyi_2006@qq.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
huangweiyi 2019-10-02 08:45:12 -07:00 committed by Marcelo Vanzin
parent f2ead4d0b5
commit 85dafabeb4
3 changed files with 11 additions and 1 deletion


@@ -137,6 +137,7 @@ class TaskMetrics private[spark] () extends Serializable {
private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v)
private[spark] def setResultSerializationTime(v: Long): Unit =
_resultSerializationTime.setValue(v)
private[spark] def setPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.setValue(v)
private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)


@@ -391,6 +391,7 @@ private[spark] object JsonProtocol {
("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~
("Executor Run Time" -> taskMetrics.executorRunTime) ~
("Executor CPU Time" -> taskMetrics.executorCpuTime) ~
("Peak Execution Memory" -> taskMetrics.peakExecutionMemory) ~
("Result Size" -> taskMetrics.resultSize) ~
("JVM GC Time" -> taskMetrics.jvmGCTime) ~
("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
@@ -894,6 +895,10 @@ private[spark] object JsonProtocol {
case JNothing => 0
case x => x.extract[Long]
})
metrics.setPeakExecutionMemory((json \ "Peak Execution Memory") match {
case JNothing => 0
case x => x.extract[Long]
})
metrics.setResultSize((json \ "Result Size").extract[Long])
metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long])


@@ -938,6 +938,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
t.setExecutorDeserializeCpuTime(a)
t.setExecutorRunTime(b)
t.setExecutorCpuTime(b)
t.setPeakExecutionMemory(c)
t.setResultSize(c)
t.setJvmGCTime(d)
t.setResultSerializationTime(a + b)
@@ -1243,6 +1244,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Executor Deserialize CPU Time": 300,
| "Executor Run Time": 400,
| "Executor CPU Time": 400,
| "Peak Execution Memory": 500,
| "Result Size": 500,
| "JVM GC Time": 600,
| "Result Serialization Time": 700,
@@ -1366,6 +1368,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Executor Deserialize CPU Time": 300,
| "Executor Run Time": 400,
| "Executor CPU Time": 400,
| "Peak Execution Memory": 500,
| "Result Size": 500,
| "JVM GC Time": 600,
| "Result Serialization Time": 700,
@@ -1489,6 +1492,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Executor Deserialize CPU Time": 300,
| "Executor Run Time": 400,
| "Executor CPU Time": 400,
| "Peak Execution Memory": 500,
| "Result Size": 500,
| "JVM GC Time": 600,
| "Result Serialization Time": 700,
@@ -2052,7 +2056,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| {
| "ID": 9,
| "Name": "$PEAK_EXECUTION_MEMORY",
| "Update": 0,
| "Update": 500,
| "Internal": true,
| "Count Failed Values": true
| },