Add number of bytes spilled to Web UI
This commit is contained in:
parent
e6447152b3
commit
bb8098f203
|
@ -32,9 +32,9 @@ case class Aggregator[K, V, C] (
|
||||||
mergeCombiners: (C, C) => C) {
|
mergeCombiners: (C, C) => C) {
|
||||||
|
|
||||||
private val sparkConf = SparkEnv.get.conf
|
private val sparkConf = SparkEnv.get.conf
|
||||||
private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
|
private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false)
|
||||||
|
|
||||||
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
|
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext) : Iterator[(K, C)] = {
|
||||||
if (!externalSorting) {
|
if (!externalSorting) {
|
||||||
val combiners = new AppendOnlyMap[K,C]
|
val combiners = new AppendOnlyMap[K,C]
|
||||||
var kv: Product2[K, V] = null
|
var kv: Product2[K, V] = null
|
||||||
|
@ -53,11 +53,12 @@ case class Aggregator[K, V, C] (
|
||||||
val (k, v) = iter.next()
|
val (k, v) = iter.next()
|
||||||
combiners.insert(k, v)
|
combiners.insert(k, v)
|
||||||
}
|
}
|
||||||
|
combiners.registerBytesSpilled(context.attemptId)
|
||||||
combiners.iterator
|
combiners.iterator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
|
def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
|
||||||
if (!externalSorting) {
|
if (!externalSorting) {
|
||||||
val combiners = new AppendOnlyMap[K,C]
|
val combiners = new AppendOnlyMap[K,C]
|
||||||
var kc: Product2[K, C] = null
|
var kc: Product2[K, C] = null
|
||||||
|
@ -75,6 +76,7 @@ case class Aggregator[K, V, C] (
|
||||||
val (k, c) = iter.next()
|
val (k, c) = iter.next()
|
||||||
combiners.insert(k, c)
|
combiners.insert(k, c)
|
||||||
}
|
}
|
||||||
|
combiners.registerBytesSpilled(context.attemptId)
|
||||||
combiners.iterator
|
combiners.iterator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,6 +60,9 @@ class SparkEnv private[spark] (
|
||||||
// All accesses should be manually synchronized
|
// All accesses should be manually synchronized
|
||||||
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
|
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
|
||||||
|
|
||||||
|
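// Maps a task ID to the number of bytes that task has spilled; kept mainly for book-keeping.
// NOTE(review): unlike shuffleMemoryMap, the accesses visible in this change are not wrapped
// in `synchronized` - confirm whether this map needs the same manual synchronization.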
|
||||||
|
val bytesSpilledMap = mutable.HashMap[Long, Long]()
|
||||||
|
|
||||||
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
|
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
|
||||||
|
|
||||||
// A general, soft-reference map for metadata needed during HadoopRDD split computation
|
// A general, soft-reference map for metadata needed during HadoopRDD split computation
|
||||||
|
|
|
@ -229,6 +229,7 @@ private[spark] class Executor(
|
||||||
m.executorRunTime = (taskFinish - taskStart).toInt
|
m.executorRunTime = (taskFinish - taskStart).toInt
|
||||||
m.jvmGCTime = gcTime - startGCTime
|
m.jvmGCTime = gcTime - startGCTime
|
||||||
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
|
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
|
||||||
|
m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
val accumUpdates = Accumulators.values
|
val accumUpdates = Accumulators.values
|
||||||
|
@ -279,11 +280,12 @@ private[spark] class Executor(
|
||||||
//System.exit(1)
|
//System.exit(1)
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// TODO: Unregister shuffle memory only for ShuffleMapTask
|
// TODO: Unregister shuffle memory only for ResultTask
|
||||||
val shuffleMemoryMap = env.shuffleMemoryMap
|
val shuffleMemoryMap = env.shuffleMemoryMap
|
||||||
shuffleMemoryMap.synchronized {
|
shuffleMemoryMap.synchronized {
|
||||||
shuffleMemoryMap.remove(Thread.currentThread().getId)
|
shuffleMemoryMap.remove(Thread.currentThread().getId)
|
||||||
}
|
}
|
||||||
|
env.bytesSpilledMap.remove(taskId)
|
||||||
runningTasks.remove(taskId)
|
runningTasks.remove(taskId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,11 @@ class TaskMetrics extends Serializable {
|
||||||
*/
|
*/
|
||||||
var resultSerializationTime: Long = _
|
var resultSerializationTime: Long = _
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of bytes spilled to disk by this task
|
||||||
|
*/
|
||||||
|
var bytesSpilled: Long = _
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
|
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -106,7 +106,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
|
||||||
override val partitioner = Some(part)
|
override val partitioner = Some(part)
|
||||||
|
|
||||||
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
|
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
|
||||||
val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
|
|
||||||
|
val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false)
|
||||||
val split = s.asInstanceOf[CoGroupPartition]
|
val split = s.asInstanceOf[CoGroupPartition]
|
||||||
val numRdds = split.deps.size
|
val numRdds = split.deps.size
|
||||||
|
|
||||||
|
@ -150,6 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
|
||||||
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
|
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
map.registerBytesSpilled(context.attemptId)
|
||||||
new InterruptibleIterator(context, map.iterator)
|
new InterruptibleIterator(context, map.iterator)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,20 +88,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
|
||||||
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
|
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
|
||||||
if (self.partitioner == Some(partitioner)) {
|
if (self.partitioner == Some(partitioner)) {
|
||||||
self.mapPartitionsWithContext((context, iter) => {
|
self.mapPartitionsWithContext((context, iter) => {
|
||||||
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
|
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
|
||||||
}, preservesPartitioning = true)
|
}, preservesPartitioning = true)
|
||||||
} else if (mapSideCombine) {
|
} else if (mapSideCombine) {
|
||||||
val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
|
val combined = self.mapPartitionsWithContext((context, iter) => {
|
||||||
|
aggregator.combineValuesByKey(iter, context)
|
||||||
|
}, preservesPartitioning = true)
|
||||||
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
|
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
|
||||||
.setSerializer(serializerClass)
|
.setSerializer(serializerClass)
|
||||||
partitioned.mapPartitionsWithContext((context, iter) => {
|
partitioned.mapPartitionsWithContext((context, iter) => {
|
||||||
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
|
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
|
||||||
}, preservesPartitioning = true)
|
}, preservesPartitioning = true)
|
||||||
} else {
|
} else {
|
||||||
// Don't apply map-side combiner.
|
// Don't apply map-side combiner.
|
||||||
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
|
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
|
||||||
values.mapPartitionsWithContext((context, iter) => {
|
values.mapPartitionsWithContext((context, iter) => {
|
||||||
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
|
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
|
||||||
}, preservesPartitioning = true)
|
}, preservesPartitioning = true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,4 +24,5 @@ private[spark] class ExecutorSummary {
|
||||||
var succeededTasks : Int = 0
|
var succeededTasks : Int = 0
|
||||||
var shuffleRead : Long = 0
|
var shuffleRead : Long = 0
|
||||||
var shuffleWrite : Long = 0
|
var shuffleWrite : Long = 0
|
||||||
|
var bytesSpilled : Long = 0
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
|
||||||
<th>Succeeded Tasks</th>
|
<th>Succeeded Tasks</th>
|
||||||
<th>Shuffle Read</th>
|
<th>Shuffle Read</th>
|
||||||
<th>Shuffle Write</th>
|
<th>Shuffle Write</th>
|
||||||
|
<th>Bytes Spilled</th>
|
||||||
</thead>
|
</thead>
|
||||||
<tbody>
|
<tbody>
|
||||||
{createExecutorTable()}
|
{createExecutorTable()}
|
||||||
|
@ -80,6 +81,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
|
||||||
<td>{v.succeededTasks}</td>
|
<td>{v.succeededTasks}</td>
|
||||||
<td>{Utils.bytesToString(v.shuffleRead)}</td>
|
<td>{Utils.bytesToString(v.shuffleRead)}</td>
|
||||||
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
|
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
|
||||||
|
<td>{Utils.bytesToString(v.bytesSpilled)}</td>
|
||||||
</tr>
|
</tr>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
|
||||||
val stageIdToTime = HashMap[Int, Long]()
|
val stageIdToTime = HashMap[Int, Long]()
|
||||||
val stageIdToShuffleRead = HashMap[Int, Long]()
|
val stageIdToShuffleRead = HashMap[Int, Long]()
|
||||||
val stageIdToShuffleWrite = HashMap[Int, Long]()
|
val stageIdToShuffleWrite = HashMap[Int, Long]()
|
||||||
|
val stageIdToBytesSpilled = HashMap[Int, Long]()
|
||||||
val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
|
val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
|
||||||
val stageIdToTasksComplete = HashMap[Int, Int]()
|
val stageIdToTasksComplete = HashMap[Int, Int]()
|
||||||
val stageIdToTasksFailed = HashMap[Int, Int]()
|
val stageIdToTasksFailed = HashMap[Int, Int]()
|
||||||
|
@ -78,6 +79,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
|
||||||
stageIdToTime.remove(s.stageId)
|
stageIdToTime.remove(s.stageId)
|
||||||
stageIdToShuffleRead.remove(s.stageId)
|
stageIdToShuffleRead.remove(s.stageId)
|
||||||
stageIdToShuffleWrite.remove(s.stageId)
|
stageIdToShuffleWrite.remove(s.stageId)
|
||||||
|
stageIdToBytesSpilled.remove(s.stageId)
|
||||||
stageIdToTasksActive.remove(s.stageId)
|
stageIdToTasksActive.remove(s.stageId)
|
||||||
stageIdToTasksComplete.remove(s.stageId)
|
stageIdToTasksComplete.remove(s.stageId)
|
||||||
stageIdToTasksFailed.remove(s.stageId)
|
stageIdToTasksFailed.remove(s.stageId)
|
||||||
|
@ -149,6 +151,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
|
||||||
Option(taskEnd.taskMetrics).foreach { taskMetrics =>
|
Option(taskEnd.taskMetrics).foreach { taskMetrics =>
|
||||||
taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
|
taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
|
||||||
taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
|
taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
|
||||||
|
y.bytesSpilled += taskMetrics.bytesSpilled
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case _ => {}
|
case _ => {}
|
||||||
|
@ -184,6 +187,10 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
|
||||||
stageIdToShuffleWrite(sid) += shuffleWrite
|
stageIdToShuffleWrite(sid) += shuffleWrite
|
||||||
totalShuffleWrite += shuffleWrite
|
totalShuffleWrite += shuffleWrite
|
||||||
|
|
||||||
|
stageIdToBytesSpilled.getOrElseUpdate(sid, 0L)
|
||||||
|
val bytesSpilled = metrics.map(m => m.bytesSpilled).getOrElse(0L)
|
||||||
|
stageIdToBytesSpilled(sid) += bytesSpilled
|
||||||
|
|
||||||
val taskList = stageIdToTaskInfos.getOrElse(
|
val taskList = stageIdToTaskInfos.getOrElse(
|
||||||
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
|
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
|
||||||
taskList -= ((taskEnd.taskInfo, None, None))
|
taskList -= ((taskEnd.taskInfo, None, None))
|
||||||
|
|
|
@ -56,6 +56,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
||||||
val hasShuffleRead = shuffleReadBytes > 0
|
val hasShuffleRead = shuffleReadBytes > 0
|
||||||
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
|
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
|
||||||
val hasShuffleWrite = shuffleWriteBytes > 0
|
val hasShuffleWrite = shuffleWriteBytes > 0
|
||||||
|
val bytesSpilled = listener.stageIdToBytesSpilled.getOrElse(stageId, 0L)
|
||||||
|
val hasBytesSpilled = bytesSpilled > 0
|
||||||
|
|
||||||
var activeTime = 0L
|
var activeTime = 0L
|
||||||
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
|
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
|
||||||
|
@ -81,6 +83,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
||||||
{Utils.bytesToString(shuffleWriteBytes)}
|
{Utils.bytesToString(shuffleWriteBytes)}
|
||||||
</li>
|
</li>
|
||||||
}
|
}
|
||||||
|
{if (hasBytesSpilled)
|
||||||
|
<li>
|
||||||
|
<strong>Bytes spilled: </strong>
|
||||||
|
{Utils.bytesToString(bytesSpilled)}
|
||||||
|
</li>
|
||||||
|
}
|
||||||
</ul>
|
</ul>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
@ -89,9 +97,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
||||||
Seq("Duration", "GC Time", "Result Ser Time") ++
|
Seq("Duration", "GC Time", "Result Ser Time") ++
|
||||||
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
|
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
|
||||||
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
|
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
|
||||||
|
{if (hasBytesSpilled) Seq("Bytes Spilled") else Nil} ++
|
||||||
Seq("Errors")
|
Seq("Errors")
|
||||||
|
|
||||||
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
|
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
|
||||||
|
|
||||||
// Excludes tasks which failed and have incomplete metrics
|
// Excludes tasks which failed and have incomplete metrics
|
||||||
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
|
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
|
||||||
|
@ -153,13 +162,20 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
||||||
}
|
}
|
||||||
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
|
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
|
||||||
|
|
||||||
|
val bytesSpilledSizes = validTasks.map {
|
||||||
|
case(info, metrics, exception) =>
|
||||||
|
metrics.get.bytesSpilled.toDouble
|
||||||
|
}
|
||||||
|
val bytesSpilledQuantiles = "Bytes Spilled" +: getQuantileCols(bytesSpilledSizes)
|
||||||
|
|
||||||
val listings: Seq[Seq[String]] = Seq(
|
val listings: Seq[Seq[String]] = Seq(
|
||||||
serializationQuantiles,
|
serializationQuantiles,
|
||||||
serviceQuantiles,
|
serviceQuantiles,
|
||||||
gettingResultQuantiles,
|
gettingResultQuantiles,
|
||||||
schedulerDelayQuantiles,
|
schedulerDelayQuantiles,
|
||||||
if (hasShuffleRead) shuffleReadQuantiles else Nil,
|
if (hasShuffleRead) shuffleReadQuantiles else Nil,
|
||||||
if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
|
if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
|
||||||
|
if (hasBytesSpilled) bytesSpilledQuantiles else Nil)
|
||||||
|
|
||||||
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
|
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
|
||||||
"Median", "75th percentile", "Max")
|
"Median", "75th percentile", "Max")
|
||||||
|
@ -178,8 +194,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
|
||||||
def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
|
|
||||||
(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
|
(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
|
||||||
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
|
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
|
||||||
trace.map(e => <span style="display:block;">{e.toString}</span>)
|
trace.map(e => <span style="display:block;">{e.toString}</span>)
|
||||||
|
@ -205,6 +220,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
||||||
val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
|
val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
|
||||||
if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
|
if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
|
||||||
|
|
||||||
|
val maybeBytesSpilled = metrics.map{m => m.bytesSpilled}
|
||||||
|
val bytesSpilledSortable = maybeBytesSpilled.map(_.toString).getOrElse("")
|
||||||
|
val bytesSpilledReadable = maybeBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
|
||||||
|
|
||||||
<tr>
|
<tr>
|
||||||
<td>{info.index}</td>
|
<td>{info.index}</td>
|
||||||
<td>{info.taskId}</td>
|
<td>{info.taskId}</td>
|
||||||
|
@ -234,6 +253,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
||||||
{shuffleWriteReadable}
|
{shuffleWriteReadable}
|
||||||
</td>
|
</td>
|
||||||
}}
|
}}
|
||||||
|
{if (bytesSpilled) {
|
||||||
|
<td sorttable_customkey={bytesSpilledSortable}>
|
||||||
|
{bytesSpilledReadable}
|
||||||
|
</td>
|
||||||
|
}}
|
||||||
<td>{exception.map(e =>
|
<td>{exception.map(e =>
|
||||||
<span>
|
<span>
|
||||||
{e.className} ({e.description})<br/>
|
{e.className} ({e.description})<br/>
|
||||||
|
|
|
@ -77,7 +77,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
|
||||||
}
|
}
|
||||||
|
|
||||||
// Number of pairs in the in-memory map
|
// Number of pairs in the in-memory map
|
||||||
private var numPairsInMemory = 0
|
private var numPairsInMemory = 0L
|
||||||
|
|
||||||
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
|
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
|
||||||
private val trackMemoryThreshold = 1000
|
private val trackMemoryThreshold = 1000
|
||||||
|
@ -85,6 +85,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
|
||||||
// How many times we have spilled so far
|
// How many times we have spilled so far
|
||||||
private var spillCount = 0
|
private var spillCount = 0
|
||||||
|
|
||||||
|
// Running total of bytes spilled by this map (incremented by mapSize where it is updated)
|
||||||
|
private var bytesSpilled = 0L
|
||||||
|
|
||||||
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
|
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
|
||||||
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
|
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
|
||||||
private val comparator = new KCComparator[K, C]
|
private val comparator = new KCComparator[K, C]
|
||||||
|
@ -161,6 +164,14 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
|
||||||
shuffleMemoryMap(Thread.currentThread().getId) = 0
|
shuffleMemoryMap(Thread.currentThread().getId) = 0
|
||||||
}
|
}
|
||||||
numPairsInMemory = 0
|
numPairsInMemory = 0
|
||||||
|
bytesSpilled += mapSize
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
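* Record this map's running bytesSpilled total under the given task ID in
* SparkEnv's bytesSpilledMap. The entry is assigned, not added to, so any value
* already stored for that task ID is overwritten.
* NOTE(review): if one task uses more than one map, only the last registration
* survives - confirm whether this should accumulate instead.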
|
||||||
|
*/
|
||||||
|
def registerBytesSpilled(taskId: Long) {
|
||||||
|
SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue