Add toggle for ExternalAppendOnlyMap in Aggregator and CoGroupedRDD

This commit is contained in:
Andrew Or 2013-12-25 17:32:01 -08:00
parent 28685a4820
commit 2a2ca2a661
3 changed files with 66 additions and 25 deletions

View file

@ -32,22 +32,45 @@ case class Aggregator[K, V, C] (
mergeCombiners: (C, C) => C) {
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
//val combiners = new AppendOnlyMap[K, C]
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val kv =
combiners.insert(kv._1, kv._2)
val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
while (iter.hasNext) {
kv =
combiners.changeValue(kv._1, update)
} else {
// Spilling
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
iter.foreach { case(k, v) => combiners.insert(k, v) }
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
//val combiners = new AppendOnlyMap[K, C]
val combiners = new ExternalAppendOnlyMap[K, C, C]((c:C) => c, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
val kc =
combiners.insert(kc._1, kc._2)
val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
while (iter.hasNext) {
kc =
combiners.changeValue(kc._1, update)
} else {
// Spilling
def combinerIdentity(combiner:C) = combiner
val combiners = new ExternalAppendOnlyMap[K, C, C](combinerIdentity, mergeCombiners, mergeCombiners)
iter.foreach { case(k, c) => combiners.insert(k, c) }

View file

@ -102,28 +102,49 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
// e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
//val combiners = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
val combiners = createExternalMap(numRdds)
val ser = SparkEnv.get.serializerManager.get(serializerClass)
// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach {
kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
val v = (rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]], depNum)
rddIterators += v
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
rddIterators += v
new InterruptibleIterator(context, combiners.iterator)
if (!externalSorting) {
val map = new AppendOnlyMap[K, CoGroupCombiner]
val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
val getSeq = (k: K) => map.changeValue(k, update)
rddIterators.foreach { case(iter, depNum) =>
iter.foreach {
case(k, v) => getSeq(k)(depNum) += v
new InterruptibleIterator(context, map.iterator)
} else {
// Spilling
val map = createExternalMap(numRdds)
rddIterators.foreach { case(iter, depNum) =>
iter.foreach {
case(k, v) => map.insert(k, new CoGroupValue(v, depNum))
new InterruptibleIterator(context, map.iterator)
private def createExternalMap(numRdds:Int): ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] = {

View file

@ -39,11 +39,9 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
private val map: SpillableAppendOnlyMap[K, V, _, C] = {
if (mergeBeforeSpill) {
println("* Merge before spill *")
new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
mergeValue, mergeCombiners, combinerIdentity, memoryThresholdMB)
} else {
println("* Merge after spill *")
new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
@ -103,7 +101,6 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
def spill(): Unit = {
println("> SPILL <")
val file = File.createTempFile("external_append_only_map", "") // Add spill location
val out = new ObjectOutputStream(new FileOutputStream(file))
val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())