[SPARK-15317][CORE] Don't store accumulators for every task in listeners

## What changes were proposed in this pull request?

In general, the Web UI doesn't need to keep the Accumulator/AccumulableInfo objects for every task; it only needs the accumulator values.

This PR creates new UIData classes that hold only the necessary fields and makes `JobProgressListener` store these classes instead, so that `JobProgressListener` no longer retains Accumulator/AccumulableInfo objects and its memory footprint becomes much smaller. It also eliminates `AccumulableInfo` from `SQLListener` so that we don't keep references to unused `AccumulableInfo`s.
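
To illustrate the idea, here is a minimal, self-contained sketch (toy stand-ins rather than Spark's actual classes, so all names below are only illustrative): instead of holding the full mutable `TaskMetrics` and its `AccumulableInfo` list, the listener keeps an immutable, fixed-size snapshot of the metric values and drops internal/SQL accumulables, whose values are shown elsewhere in the UI.

```
// Toy sketch of the approach: keep a slim immutable snapshot per task instead of
// the full mutable metrics object. Names below are illustrative, not Spark's.
object SlimSnapshotSketch {
  final case class AccumulableInfo(
      id: Long, name: String, internal: Boolean, metadata: Option[String])

  // Stand-in for the heavyweight, mutable per-task metrics object.
  final class TaskMetrics(
      var executorRunTime: Long,
      var resultSize: Long,
      val accumulables: Seq[AccumulableInfo])

  // Slim, immutable snapshot that the listener retains instead.
  final case class TaskMetricsUIData(executorRunTime: Long, resultSize: Long)

  def toUIData(m: TaskMetrics): TaskMetricsUIData =
    TaskMetricsUIData(m.executorRunTime, m.resultSize)

  // Internal and SQL accumulables are displayed elsewhere, so drop them here.
  def dropInternalAndSQL(accums: Seq[AccumulableInfo]): Seq[AccumulableInfo] =
    accums.filter(a => !a.internal && a.metadata != Some("sql"))

  def main(args: Array[String]): Unit = {
    val metrics = new TaskMetrics(42L, 1024L, Seq(
      AccumulableInfo(id = 1, name = "internal.metrics.x", internal = true, metadata = None),
      AccumulableInfo(id = 2, name = "number of output rows", internal = false, metadata = Some("sql")),
      AccumulableInfo(id = 3, name = "user counter", internal = false, metadata = None)))
    println(toUIData(metrics))                                    // TaskMetricsUIData(42,1024)
    println(dropInternalAndSQL(metrics.accumulables).map(_.name)) // List(user counter)
  }
}
```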

## How was this patch tested?

I ran the two tests reported in the JIRA ticket locally:

The first one is:
```
val data = spark.range(0, 10000, 1, 10000)
data.cache().count()
```
The retained size of `JobProgressListener` decreases from 60.7 MB to 6.9 MB.
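
For reference, one way to check the retained size (a sketch of a possible workflow, not necessarily how the numbers above were measured) is to trigger a heap dump from the same spark-shell session after running the workload, then look at the retained heap of `JobProgressListener` in a heap analyzer such as Eclipse MAT:

```
// Run the workload above first, then dump the driver heap from within spark-shell.
// HotSpotDiagnosticMXBean is HotSpot-specific; the dump path is arbitrary.
import java.lang.management.ManagementFactory
import com.sun.management.HotSpotDiagnosticMXBean

val diag = ManagementFactory.newPlatformMXBeanProxy(
  ManagementFactory.getPlatformMBeanServer,
  "com.sun.management:type=HotSpotDiagnostic",
  classOf[HotSpotDiagnosticMXBean])

// `true` restricts the dump to live objects (forces a GC first).
diag.dumpHeap("/tmp/driver-heap.hprof", true)

// Open /tmp/driver-heap.hprof in a heap analyzer and inspect the retained heap
// of org.apache.spark.ui.jobs.JobProgressListener.
```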

The second one is:
```
import org.apache.spark.ml.CC
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
CC.runTest(sqlContext)
```

This test no longer causes an OOM after applying this patch.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13153 from zsxwing/memory.
Commit 4e3cb7a5d9 (parent 6ac1c3a040), authored by Shixiong Zhu on 2016-05-19 12:05:17 -07:00 and committed by Andrew Or.
11 changed files with 186 additions and 32 deletions


@@ -20,10 +20,10 @@ import java.util.{Arrays, Date, List => JList}
import javax.ws.rs.{GET, Produces, QueryParam}
import javax.ws.rs.core.MediaType
import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
import org.apache.spark.ui.jobs.UIData.{InputMetricsUIData => InternalInputMetrics, OutputMetricsUIData => InternalOutputMetrics, ShuffleReadMetricsUIData => InternalShuffleReadMetrics, ShuffleWriteMetricsUIData => InternalShuffleWriteMetrics, TaskMetricsUIData => InternalTaskMetrics}
import org.apache.spark.util.Distribution
@Produces(Array(MediaType.APPLICATION_JSON))


@@ -332,7 +332,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
new StageUIData
})
stageData.numActiveTasks += 1
stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo, Some(metrics)))
stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics)))
}
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
@@ -395,9 +395,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
taskData.taskInfo = info
taskData.metrics = taskMetrics
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None))
taskData.updateTaskInfo(info)
taskData.updateTaskMetrics(taskMetrics)
taskData.errorMessage = errorMessage
for (
@@ -425,7 +425,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData: StageUIData,
execId: String,
taskMetrics: TaskMetrics,
oldMetrics: Option[TaskMetrics]) {
oldMetrics: Option[TaskMetricsUIData]) {
val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)
val shuffleWriteDelta =
@@ -503,7 +503,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
if (!t.taskInfo.finished) {
updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
// Overwrite task metrics
t.metrics = Some(metrics)
t.updateTaskMetrics(Some(metrics))
}
}
}


@@ -768,7 +768,7 @@ private[ui] object StagePage {
}
private[ui] def getSchedulerDelay(
info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
info: TaskInfo, metrics: TaskMetricsUIData, currentTime: Long): Long = {
if (info.finished) {
val totalExecutionTime = info.finishTime - info.launchTime
val executorOverhead = (metrics.executorDeserializeTime +


@@ -21,8 +21,10 @@ import scala.collection.mutable
import scala.collection.mutable.HashMap
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.collection.OpenHashSet
private[spark] object UIData {
@@ -105,13 +107,137 @@
/**
* These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
*/
class TaskUIData(
var taskInfo: TaskInfo,
var metrics: Option[TaskMetrics] = None,
var errorMessage: Option[String] = None)
class TaskUIData private(
private var _taskInfo: TaskInfo,
private var _metrics: Option[TaskMetricsUIData]) {
var errorMessage: Option[String] = None
def taskInfo: TaskInfo = _taskInfo
def metrics: Option[TaskMetricsUIData] = _metrics
def updateTaskInfo(taskInfo: TaskInfo): Unit = {
_taskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
}
def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
_metrics = TaskUIData.toTaskMetricsUIData(metrics)
}
}
object TaskUIData {
def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = {
new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics))
}
private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = {
metrics.map { m =>
TaskMetricsUIData(
executorDeserializeTime = m.executorDeserializeTime,
executorRunTime = m.executorRunTime,
resultSize = m.resultSize,
jvmGCTime = m.jvmGCTime,
resultSerializationTime = m.resultSerializationTime,
memoryBytesSpilled = m.memoryBytesSpilled,
diskBytesSpilled = m.diskBytesSpilled,
peakExecutionMemory = m.peakExecutionMemory,
updatedBlockStatuses = m.updatedBlockStatuses.toList,
inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead),
outputMetrics =
OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten),
shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
}
}
/**
* We don't need to store internal or SQL accumulables as their values will be shown in other
* places, so drop them to reduce the memory usage.
*/
private[spark] def dropInternalAndSQLAccumulables(taskInfo: TaskInfo): TaskInfo = {
val newTaskInfo = new TaskInfo(
taskId = taskInfo.taskId,
index = taskInfo.index,
attemptNumber = taskInfo.attemptNumber,
launchTime = taskInfo.launchTime,
executorId = taskInfo.executorId,
host = taskInfo.host,
taskLocality = taskInfo.taskLocality,
speculative = taskInfo.speculative
)
newTaskInfo.gettingResultTime = taskInfo.gettingResultTime
newTaskInfo.accumulables ++= taskInfo.accumulables.filter {
accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
}
newTaskInfo.finishTime = taskInfo.finishTime
newTaskInfo.failed = taskInfo.failed
newTaskInfo
}
}
class ExecutorUIData(
val startTime: Long,
var finishTime: Option[Long] = None,
var finishReason: Option[String] = None)
case class TaskMetricsUIData(
executorDeserializeTime: Long,
executorRunTime: Long,
resultSize: Long,
jvmGCTime: Long,
resultSerializationTime: Long,
memoryBytesSpilled: Long,
diskBytesSpilled: Long,
peakExecutionMemory: Long,
updatedBlockStatuses: Seq[(BlockId, BlockStatus)],
inputMetrics: InputMetricsUIData,
outputMetrics: OutputMetricsUIData,
shuffleReadMetrics: ShuffleReadMetricsUIData,
shuffleWriteMetrics: ShuffleWriteMetricsUIData)
case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
case class ShuffleReadMetricsUIData(
remoteBlocksFetched: Long,
localBlocksFetched: Long,
remoteBytesRead: Long,
localBytesRead: Long,
fetchWaitTime: Long,
recordsRead: Long,
totalBytesRead: Long,
totalBlocksFetched: Long)
object ShuffleReadMetricsUIData {
def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
new ShuffleReadMetricsUIData(
remoteBlocksFetched = metrics.remoteBlocksFetched,
localBlocksFetched = metrics.localBlocksFetched,
remoteBytesRead = metrics.remoteBytesRead,
localBytesRead = metrics.localBytesRead,
fetchWaitTime = metrics.fetchWaitTime,
recordsRead = metrics.recordsRead,
totalBytesRead = metrics.totalBytesRead,
totalBlocksFetched = metrics.totalBlocksFetched
)
}
}
case class ShuffleWriteMetricsUIData(
bytesWritten: Long,
recordsWritten: Long,
writeTime: Long)
object ShuffleWriteMetricsUIData {
def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
new ShuffleWriteMetricsUIData(
bytesWritten = metrics.bytesWritten,
recordsWritten = metrics.recordsWritten,
writeTime = metrics.writeTime
)
}
}
}


@@ -255,6 +255,9 @@ private[spark] object AccumulatorContext {
def clear(): Unit = {
originals.clear()
}
// Identifier for distinguishing SQL metrics from other accumulators
private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
}


@@ -30,8 +30,8 @@ class AllStagesResourceSuite extends SparkFunSuite {
def getFirstTaskLaunchTime(taskLaunchTimes: Seq[Long]): Option[Date] = {
val tasks = new HashMap[Long, TaskUIData]
taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
tasks(idx.toLong) = new TaskUIData(
new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None, None)
tasks(idx.toLong) = TaskUIData(
new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None)
}
val stageUiData = new StageUIData()


@@ -25,7 +25,8 @@ import org.apache.spark._
import org.apache.spark.{LocalSparkContext, SparkConf, Success}
import org.apache.spark.executor._
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
import org.apache.spark.ui.jobs.UIData.TaskUIData
import org.apache.spark.util.{AccumulatorContext, Utils}
class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
@@ -359,4 +360,30 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(
stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 402)
}
test("drop internal and sql accumulators") {
val taskInfo = new TaskInfo(0, 0, 0, 0, "", "", TaskLocality.ANY, false)
val internalAccum =
AccumulableInfo(id = 1, name = Some("internal"), None, None, internal = true, false)
val sqlAccum = AccumulableInfo(
id = 2,
name = Some("sql"),
None,
None,
internal = false,
countFailedValues = false,
metadata = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
val userAccum = AccumulableInfo(
id = 3,
name = Some("user"),
None,
None,
internal = false,
countFailedValues = false,
metadata = None)
taskInfo.accumulables ++= Seq(internalAccum, sqlAccum, userAccum)
val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
assert(newTaskInfo.accumulables === Seq(userAccum))
}
}


@@ -21,7 +21,7 @@ import java.text.NumberFormat
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.util.{AccumulatorV2, Utils}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
@@ -56,15 +56,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
// Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
new AccumulableInfo(id, name, update, value, true, true, Some(SQLMetrics.ACCUM_IDENTIFIER))
new AccumulableInfo(
id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
}
}
private[sql] object SQLMetrics {
// Identifier for distinguishing SQL metrics from other accumulators
private[sql] val ACCUM_IDENTIFIER = "sql"
private[sql] val SUM_METRIC = "sum"
private[sql] val SIZE_METRIC = "size"
private[sql] val TIMING_METRIC = "timing"


@@ -26,6 +26,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.AccumulatorContext
@DeveloperApi
case class SparkListenerSQLExecutionStart(
@@ -177,8 +178,10 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
taskId: Long,
stageId: Int,
stageAttemptID: Int,
accumulatorUpdates: Seq[AccumulableInfo],
_accumulatorUpdates: Seq[AccumulableInfo],
finishTask: Boolean): Unit = {
val accumulatorUpdates =
_accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get))
_stageIdToStageMetrics.get(stageId) match {
case Some(stageMetrics) =>
@@ -290,9 +293,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
assert(accumulatorUpdate.update.isDefined, s"accumulator update from " +
s"task did not have a partial value: ${accumulatorUpdate.name}")
(accumulatorUpdate.id, accumulatorUpdate.update.get)
(accumulatorUpdate._1, accumulatorUpdate._2)
}
}.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
@@ -336,7 +337,7 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
taskEnd.taskInfo.accumulables.flatMap { a =>
// Filter out accumulators that are not SQL metrics
// For now we assume all SQL metrics are Long's that have been JSON serialized as String's
if (a.metadata == Some(SQLMetrics.ACCUM_IDENTIFIER)) {
if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) {
val newValue = a.update.map(_.toString.toLong).getOrElse(0L)
Some(a.copy(update = Some(newValue)))
} else {
@@ -418,4 +419,4 @@ private[ui] class SQLStageMetrics(
private[ui] class SQLTaskMetrics(
val attemptId: Long, // TODO not used yet
var finished: Boolean,
var accumulatorUpdates: Seq[AccumulableInfo])
var accumulatorUpdates: Seq[(Long, Any)])


@@ -29,9 +29,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.{JsonProtocol, Utils}
import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils}
class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
@@ -308,7 +307,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
case Some(v) => fail(s"metric value was not a Long: ${v.getClass.getName}")
case _ => fail("metric update is missing")
}
assert(metricInfo.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
assert(metricInfo.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
// After serializing to JSON, the original value type is lost, but we can still
// identify that it's a SQL metric from the metadata
val metricInfoJson = JsonProtocol.accumulableInfoToJson(metricInfo)
@@ -318,7 +317,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
case Some(v) => fail(s"deserialized metric value was not a string: ${v.getClass.getName}")
case _ => fail("deserialized metric update is missing")
}
assert(metricInfoDeser.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
}
}


@@ -383,7 +383,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
}
// Listener tracks only SQL metrics, not other accumulators
assert(trackedAccums.size === 1)
assert(trackedAccums.head === sqlMetricInfo)
assert(trackedAccums.head === (sqlMetricInfo.id, sqlMetricInfo.update.get))
}
}