diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index c51fb1d630..ecaeb2d9b8 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,22 +32,45 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
-    //val combiners = new AppendOnlyMap[K, C]
-    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    while (iter.hasNext) {
-      val kv = iter.next()
-      combiners.insert(kv._1, kv._2)
+    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
+    if (!externalSorting) {
+      val combiners = new AppendOnlyMap[K,C]
+      var kv: Product2[K, V] = null
+      val update = (hadValue: Boolean, oldValue: C) => {
+        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
+      }
+      while (iter.hasNext) {
+        kv = iter.next()
+        combiners.changeValue(kv._1, update)
+      }
+      combiners.iterator
+    } else {
+      // Spilling
+      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      iter.foreach { case(k, v) => combiners.insert(k, v) }
+      combiners.iterator
     }
-    combiners.iterator
   }
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
-    //val combiners = new AppendOnlyMap[K, C]
-    val combiners = new ExternalAppendOnlyMap[K, C, C]((c:C) => c, mergeCombiners, mergeCombiners)
-    while (iter.hasNext) {
-      val kc = iter.next()
-      combiners.insert(kc._1, kc._2)
+    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
+    if (!externalSorting) {
+      val combiners = new AppendOnlyMap[K,C]
+      var kc: Product2[K, C] = null
+      val update = (hadValue: Boolean, oldValue: C) => {
+        if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
+      }
+      while (iter.hasNext) {
+        kc = iter.next()
+        combiners.changeValue(kc._1, update)
+      }
+      combiners.iterator
+    } else {
+      // Spilling
+      def combinerIdentity(combiner:C) = combiner
+      val combiners = new ExternalAppendOnlyMap[K, C, C](combinerIdentity, mergeCombiners, mergeCombiners)
+      iter.foreach { case(k, c) => combiners.insert(k, c) }
+      combiners.iterator
     }
-    combiners.iterator
   }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a2a3de7d88..a7265f3cac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -102,28 +102,49 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
     // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
+    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
-    //val combiners = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
-    val combiners = createExternalMap(numRdds)
-
     val ser = SparkEnv.get.serializerManager.get(serializerClass)
+
+    // A list of (rdd iterator, dependency number) pairs
+    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
-        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach {
-          kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
-        }
+        val v = (rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]], depNum)
+        rddIterators += v
       }
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
-        fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
-          kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
-        }
+        val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
+        rddIterators += v
       }
     }
-    new InterruptibleIterator(context, combiners.iterator)
+
+    if (!externalSorting) {
+      val map = new AppendOnlyMap[K, CoGroupCombiner]
+      val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
+        if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
+      }
+      val getSeq = (k: K) => map.changeValue(k, update)
+      rddIterators.foreach { case(iter, depNum) =>
+        iter.foreach {
+          case(k, v) => getSeq(k)(depNum) += v
+        }
+      }
+      new InterruptibleIterator(context, map.iterator)
+    } else {
+      // Spilling
+      val map = createExternalMap(numRdds)
+      rddIterators.foreach { case(iter, depNum) =>
+        iter.foreach {
+          case(k, v) => map.insert(k, new CoGroupValue(v, depNum))
+        }
+      }
+      new InterruptibleIterator(context, map.iterator)
+    }
   }
 
   private def createExternalMap(numRdds:Int): ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] = {
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index e2205c6063..790dcf06df 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -39,11 +39,9 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
 
   private val map: SpillableAppendOnlyMap[K, V, _, C] = {
     if (mergeBeforeSpill) {
-      println("* Merge before spill *")
       new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue, mergeCombiners,
         combinerIdentity, memoryThresholdMB)
     } else {
-      println("* Merge after spill *")
       new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup, mergeValueIntoGroup,
         mergeGroups, combineGroup, memoryThresholdMB)
     }
@@ -103,7 +101,6 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
   }
 
   def spill(): Unit = {
-    println("> SPILL <")
    val file = File.createTempFile("external_append_only_map", "")  // Add spill location
    val out = new ObjectOutputStream(new FileOutputStream(file))
    val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
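For context, the diff gates spilling behind the `spark.shuffle.externalSorting` JVM system property, read via `System.getProperty` with a default of `"false"`, and the in-memory branch relies on an update closure passed to `AppendOnlyMap.changeValue`. Below is a minimal, hypothetical sketch of that pattern outside Spark: `SimpleAppendOnlyMap` is an illustrative stand-in for the `changeValue` contract, not Spark's `AppendOnlyMap`, and only the property name and closure shape are taken from the patch.

```scala
import scala.collection.mutable

// Hypothetical stand-in exposing only the changeValue contract used in the patch:
// updateFunc receives (hadValue, oldValue) and returns the value to store.
class SimpleAppendOnlyMap[K, V] {
  private val data = mutable.HashMap[K, V]()

  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = data.get(key) match {
      case Some(old) => updateFunc(true, old)
      case None      => updateFunc(false, null.asInstanceOf[V]) // oldValue ignored when hadValue is false
    }
    data(key) = newValue
    newValue
  }

  def iterator: Iterator[(K, V)] = data.iterator
}

object ExternalSortingToggle {
  def main(args: Array[String]): Unit = {
    // Same property name and default the patch reads; "false" keeps the in-memory path.
    System.setProperty("spark.shuffle.externalSorting", "false")
    val externalSorting =
      System.getProperty("spark.shuffle.externalSorting", "false").toBoolean

    // Word-count style combine mirroring combineValuesByKey's non-spilling branch:
    // a single reused `update` closure reads the current pair from a mutable `kv`.
    val input = Iterator(("a", 1), ("b", 2), ("a", 3))
    val combiners = new SimpleAppendOnlyMap[String, Int]
    var kv: (String, Int) = null
    val update = (hadValue: Boolean, oldValue: Int) =>
      if (hadValue) oldValue + kv._2 else kv._2

    if (!externalSorting) {
      while (input.hasNext) {
        kv = input.next()
        combiners.changeValue(kv._1, update)
      }
    }
    combiners.iterator.foreach(println) // e.g. (a,4), (b,2)
  }
}
```

The reused closure avoids allocating a new function per record, which is presumably why the patch hoists `update` out of the loop on the non-spilling path; the spilling path instead inserts raw records into `ExternalAppendOnlyMap` and lets it merge and spill internally.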