Avoid a copy in ShuffleMapTask by creating an iterator that will be used by the
block manager.
This commit is contained in:
parent
88b016db2a
commit
f4aaec7a48
|
@ -124,14 +124,18 @@ class ShuffleMapTask(
|
|||
val blockManager = SparkEnv.get.blockManager
|
||||
for (i <- 0 until numOutputSplits) {
|
||||
val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i
|
||||
val arr = new ArrayBuffer[Any]
|
||||
val iter = buckets(i).entrySet().iterator()
|
||||
while (iter.hasNext()) {
|
||||
// Create an iterator to avoid a copy. blockManager will traverse this
|
||||
// to create an block
|
||||
val entryIter = new Iterator[Any] {
|
||||
override def hasNext() = iter.hasNext()
|
||||
override def next() = {
|
||||
val entry = iter.next()
|
||||
arr += ((entry.getKey(), entry.getValue()))
|
||||
(entry.getKey(), entry.getValue())
|
||||
}
|
||||
}
|
||||
// TODO: This should probably be DISK_ONLY
|
||||
blockManager.put(blockId, arr.iterator, StorageLevel.MEMORY_ONLY, false)
|
||||
blockManager.put(blockId, entryIter, StorageLevel.MEMORY_ONLY, false)
|
||||
}
|
||||
return SparkEnv.get.blockManager.blockManagerId
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue