Improvements to external sorting
1. Adds the option of compressing outputs. 2. Adds batching to the serialization to prevent OOM on the read side. 3. Slight renaming of config options. 4. Use Spark's buffer size for reads in addition to writes.
This commit is contained in:
parent
e6ed13f255
commit
5d61e051c2
|
@ -32,7 +32,7 @@ case class Aggregator[K, V, C] (
|
|||
mergeCombiners: (C, C) => C) {
|
||||
|
||||
private val sparkConf = SparkEnv.get.conf
|
||||
private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
|
||||
private val externalSorting = sparkConf.getBoolean("spark.shuffle.external", true)
|
||||
|
||||
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
|
||||
if (!externalSorting) {
|
||||
|
|
|
@ -80,6 +80,8 @@ private[spark] class BlockManager(
|
|||
val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
|
||||
// Whether to compress RDD partitions that are stored serialized
|
||||
val compressRdds = conf.getBoolean("spark.rdd.compress", false)
|
||||
// Whether to compress shuffle output temporarily spilled to disk
|
||||
val compressExternalShuffle = conf.getBoolean("spark.shuffle.external.compress", false)
|
||||
|
||||
val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
|
||||
|
||||
|
@ -790,6 +792,7 @@ private[spark] class BlockManager(
|
|||
case ShuffleBlockId(_, _, _) => compressShuffle
|
||||
case BroadcastBlockId(_) => compressBroadcast
|
||||
case RDDBlockId(_, _) => compressRdds
|
||||
case TempBlockId(_) => compressExternalShuffle
|
||||
case _ => false
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ import scala.collection.mutable.ArrayBuffer
|
|||
|
||||
import org.apache.spark.{Logging, SparkEnv}
|
||||
import org.apache.spark.serializer.Serializer
|
||||
import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
|
||||
import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
|
||||
|
||||
/**
|
||||
* An append-only map that spills sorted content to disk when there is insufficient space for it
|
||||
|
@ -60,7 +60,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
|
|||
mergeValue: (C, V) => C,
|
||||
mergeCombiners: (C, C) => C,
|
||||
serializer: Serializer = SparkEnv.get.serializerManager.default,
|
||||
diskBlockManager: DiskBlockManager = SparkEnv.get.blockManager.diskBlockManager)
|
||||
blockManager: BlockManager = SparkEnv.get.blockManager)
|
||||
extends Iterable[(K, C)] with Serializable with Logging {
|
||||
|
||||
import ExternalAppendOnlyMap._
|
||||
|
@ -68,6 +68,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
|
|||
private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
|
||||
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
|
||||
private val sparkConf = SparkEnv.get.conf
|
||||
private val diskBlockManager = blockManager.diskBlockManager
|
||||
|
||||
// Collective memory threshold shared across all running tasks
|
||||
private val maxMemoryThreshold = {
|
||||
|
@ -82,6 +83,14 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
|
|||
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
|
||||
private val trackMemoryThreshold = 1000
|
||||
|
||||
// Size of object batches when reading/writing from serializers. Objects are written in
|
||||
// batches, with each batch using its own serialization stream. This cuts down on the size
|
||||
// of reference-tracking maps constructed when deserializing a stream.
|
||||
//
|
||||
// NOTE: Setting this too low can cause excess copying when serializing, since some serailizers
|
||||
// grow internal data structures by growing + copying every time the number of objects doubles.
|
||||
private val serializerBatchSize = sparkConf.getLong("spark.shuffle.external.batchSize", 10000)
|
||||
|
||||
// How many times we have spilled so far
|
||||
private var spillCount = 0
|
||||
|
||||
|
@ -139,21 +148,34 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
|
|||
logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
|
||||
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
|
||||
val (blockId, file) = diskBlockManager.createTempBlock()
|
||||
val writer =
|
||||
new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
|
||||
|
||||
val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
|
||||
def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
|
||||
compressStream, syncWrites)
|
||||
|
||||
var writer = getNewWriter
|
||||
var objectsWritten = 0
|
||||
try {
|
||||
val it = currentMap.destructiveSortedIterator(comparator)
|
||||
while (it.hasNext) {
|
||||
val kv = it.next()
|
||||
writer.write(kv)
|
||||
objectsWritten += 1
|
||||
|
||||
if (objectsWritten == serializerBatchSize) {
|
||||
writer.commit()
|
||||
writer = getNewWriter
|
||||
objectsWritten = 0
|
||||
}
|
||||
}
|
||||
writer.commit()
|
||||
|
||||
if (objectsWritten > 0) writer.commit()
|
||||
} finally {
|
||||
// Partial failures cannot be tolerated; do not revert partial writes
|
||||
writer.close()
|
||||
}
|
||||
currentMap = new SizeTrackingAppendOnlyMap[K, C]
|
||||
spilledMaps.append(new DiskMapIterator(file))
|
||||
spilledMaps.append(new DiskMapIterator(file, blockId))
|
||||
|
||||
// Reset the amount of shuffle memory used by this map in the global pool
|
||||
val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
|
||||
|
@ -297,16 +319,24 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
|
|||
/**
|
||||
* An iterator that returns (K, C) pairs in sorted order from an on-disk map
|
||||
*/
|
||||
private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
|
||||
private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
|
||||
val fileStream = new FileInputStream(file)
|
||||
val bufferedStream = new FastBufferedInputStream(fileStream)
|
||||
val deserializeStream = ser.deserializeStream(bufferedStream)
|
||||
val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
|
||||
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
|
||||
var deserializeStream = ser.deserializeStream(compressedStream)
|
||||
var objectsRead = 0
|
||||
|
||||
var nextItem: (K, C) = null
|
||||
var eof = false
|
||||
|
||||
def readNextItem(): (K, C) = {
|
||||
if (!eof) {
|
||||
try {
|
||||
if (objectsRead == serializerBatchSize) {
|
||||
deserializeStream = ser.deserializeStream(compressedStream)
|
||||
objectsRead = 0
|
||||
}
|
||||
objectsRead += 1
|
||||
return deserializeStream.readObject().asInstanceOf[(K, C)]
|
||||
} catch {
|
||||
case e: EOFException =>
|
||||
|
|
|
@ -116,7 +116,7 @@ Apart from these, the following properties are also available, and may be useful
|
|||
<td>0.3</td>
|
||||
<td>
|
||||
Fraction of Java heap to use for aggregation and cogroups during shuffles, if
|
||||
<code>spark.shuffle.externalSorting</code> is enabled. At any given time, the collective size of
|
||||
<code>spark.shuffle.external</code> is true. At any given time, the collective size of
|
||||
all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
|
||||
begin to spill to disk. If spills are often, consider increasing this value at the expense of
|
||||
<code>spark.storage.memoryFraction</code>.
|
||||
|
@ -154,6 +154,13 @@ Apart from these, the following properties are also available, and may be useful
|
|||
Whether to compress map output files. Generally a good idea.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>spark.shuffle.external.compress</td>
|
||||
<td>false</td>
|
||||
<td>
|
||||
Whether to compress data spilled during shuffles.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>spark.broadcast.compress</td>
|
||||
<td>true</td>
|
||||
|
@ -388,7 +395,7 @@ Apart from these, the following properties are also available, and may be useful
|
|||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>spark.shuffle.externalSorting</td>
|
||||
<td>spark.shuffle.external</td>
|
||||
<td>true</td>
|
||||
<td>
|
||||
If set to "true", limits the amount of memory used during reduces by spilling data out to disk. This spilling
|
||||
|
|
Loading…
Reference in a new issue