From 7ad4408255e37f95e545d9c21a4460cbf98c05dd Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 25 Dec 2013 23:10:53 -0800
Subject: [PATCH] New minor edits

---
 .../scala/org/apache/spark/Aggregator.scala   |  5 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala   | 37 ++++++-----
 .../spark/util/ExternalAppendOnlyMap.scala    | 61 +++++++++----------
 3 files changed, 49 insertions(+), 54 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 582625577f..8863c3175b 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -45,8 +45,8 @@ case class Aggregator[K, V, C] (
       }
       combiners.iterator
     } else {
-      // Spilling
-      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      val combiners =
+        new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
       iter.foreach { case(k, v) => combiners.insert(k, v) }
       combiners.iterator
     }
@@ -66,7 +66,6 @@ case class Aggregator[K, V, C] (
       }
       combiners.iterator
     } else {
-      // Spilling
       val combiners =
         new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
       iter.foreach { case(k, c) => combiners.insert(k, c) }
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 3af0376a4d..113a912f16 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -50,7 +50,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
   override def hashCode(): Int = idx
 }
 
-
 /**
  * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
  * tuple with the list of values for that key.
@@ -108,7 +107,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
-    val ser = SparkEnv.get.serializerManager.get(serializerClass)
 
     // A list of (rdd iterator, dependency number) pairs
     val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
@@ -121,6 +119,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
+        val ser = SparkEnv.get.serializerManager.get(serializerClass)
         val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
         rddIterators += v
       }
@@ -131,39 +130,39 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
         if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
       }
-      val getSeq = (k: K) => map.changeValue(k, update)
-      rddIterators.foreach { case(iter, depNum) =>
-        iter.foreach {
-          case(k, v) => getSeq(k)(depNum) += v
+      rddIterators.foreach { case(it, depNum) =>
+        it.foreach { case(k, v) =>
+          map.changeValue(k, update)(depNum) += v
         }
       }
       new InterruptibleIterator(context, map.iterator)
     } else {
-      // Spilling
       val map = createExternalMap(numRdds)
-      rddIterators.foreach { case(iter, depNum) =>
-        iter.foreach {
-          case(k, v) => map.insert(k, new CoGroupValue(v, depNum))
+      rddIterators.foreach { case(it, depNum) =>
+        it.foreach { case(k, v) =>
+          map.insert(k, new CoGroupValue(v, depNum))
         }
       }
       new InterruptibleIterator(context, map.iterator)
     }
   }
 
-  private def createExternalMap(numRdds: Int)
-    : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+  private def createExternalMap(numRdds: Int):
+      ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
 
-    val createCombiner: (CoGroupValue) => CoGroupCombiner = v => {
+    val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
-      v match { case (value, depNum) => newCombiner(depNum) += value }
+      value match { case(v, depNum) => newCombiner(depNum) += v }
       newCombiner
     }
-    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (c, v) => {
-      v match { case (value, depNum) => c(depNum) += value }
-      c
+    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
+      (combiner, value) => {
+        value match { case(v, depNum) => combiner(depNum) += v }
+        combiner
     }
-    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (c1, c2) => {
-      c1.zipAll(c2, new CoGroup, new CoGroup).map {
+    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
+      (combiner1, combiner2) => {
+        combiner1.zipAll(combiner2, new CoGroup, new CoGroup).map {
         case (v1, v2) => v1 ++ v2
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index c8c053460c..413f83862d 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -28,11 +28,11 @@ import scala.util.Random
 /**
  * A wrapper for SpillableAppendOnlyMap that handles two cases:
  *
- * (1) If a mergeCombiners function is specified, merge values into combiners before
- *     disk spill, as it is possible to merge the resulting combiners later.
+ * (1) If a mergeCombiners function is specified, merge values into combiners before disk
+ *     spill, as it is possible to merge the resulting combiners later.
  *
- * (2) Otherwise, group values of the same key together before disk spill, and merge
- *     them into combiners only after reading them back from disk.
+ * (2) Otherwise, group values of the same key together before disk spill, and merge them
+ *     into combiners only after reading them back from disk.
  */
 class ExternalAppendOnlyMap[K, V, C](
     createCombiner: V => C,
@@ -48,8 +48,25 @@ class ExternalAppendOnlyMap[K, V, C](
       new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
         mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
     } else {
+      // Use ArrayBuffer[V] as the intermediate combiner
       val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
-      new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
+      val mergeValueIntoGroup: (ArrayBuffer[V], V) => ArrayBuffer[V] = (group, value) => {
+        group += value
+      }
+      val mergeGroups: (ArrayBuffer[V], ArrayBuffer[V]) => ArrayBuffer[V] = (group1, group2) => {
+        group1 ++= group2
+      }
+      val combineGroup: (ArrayBuffer[V] => C) = group => {
+        var combiner : Option[C] = None
+        group.foreach { v =>
+          combiner match {
+            case None => combiner = Some(createCombiner(v))
+            case Some(c) => combiner = Some(mergeValue(c, v))
+          }
+        }
+        combiner.getOrElse(null.asInstanceOf[C])
+      }
+      new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup,
         mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
     }
   }
@@ -57,31 +74,11 @@ class ExternalAppendOnlyMap[K, V, C](
   def insert(key: K, value: V): Unit = map.insert(key, value)
 
   override def iterator: Iterator[(K, C)] = map.iterator
-
-  private def mergeValueIntoGroup(group: ArrayBuffer[V], value: V): ArrayBuffer[V] = {
-    group += value
-    group
-  }
-  private def mergeGroups(group1: ArrayBuffer[V], group2: ArrayBuffer[V]): ArrayBuffer[V] = {
-    group1 ++= group2
-    group1
-  }
-  private def combineGroup(group: ArrayBuffer[V]): C = {
-    var combiner : Option[C] = None
-    group.foreach { v =>
-      combiner match {
-        case None => combiner = Some(createCombiner(v))
-        case Some(c) => combiner = Some(mergeValue(c, v))
-      }
-    }
-    combiner.get
-  }
 }
 
 /**
- * An append-only map that spills sorted content to disk when the memory threshold
- * is exceeded. A group with type M is an intermediate combiner, and shares the same
- * type as either C or ArrayBuffer[V].
+ * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
+ * A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V].
  */
 class SpillableAppendOnlyMap[K, V, M, C](
     createGroup: V => M,
@@ -96,7 +93,7 @@ class SpillableAppendOnlyMap[K, V, M, C](
   var oldMaps = new ArrayBuffer[DiskIterator]
 
   def insert(key: K, value: V): Unit = {
-    def update(hadVal: Boolean, oldVal: M): M = {
+    val update: (Boolean, M) => M = (hadVal, oldVal) => {
       if (hadVal) mergeValue(oldVal, value) else createGroup(value)
     }
     currentMap.changeValue(key, update)
@@ -128,11 +125,11 @@ class SpillableAppendOnlyMap[K, V, M, C](
     inputStreams.foreach(readFromIterator)
 
     // Read from the given iterator until a key of different hash is retrieved
-    def readFromIterator(iter: Iterator[(K, M)]): Unit = {
+    def readFromIterator(it: Iterator[(K, M)]): Unit = {
       var minHash : Option[Int] = None
-      while (iter.hasNext) {
-        val (k, m) = iter.next()
-        pq.enqueue(KMITuple(k, m, iter))
+      while (it.hasNext) {
+        val (k, m) = it.next()
+        pq.enqueue(KMITuple(k, m, it))
         minHash match {
           case None => minHash = Some(k.hashCode())
           case Some(expectedHash) if k.hashCode() != expectedHash => return
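
The two cases in the ExternalAppendOnlyMap doc comment above come down to when the
fold into a combiner happens: with a mergeCombiners function, values are folded into
combiners eagerly and partial combiners from separate spills are merged later; without
one, values are only grouped before a spill and folded after being read back. Below is
a minimal, self-contained sketch of the two strategies; the object and method names
are illustrative, not part of this patch.

    import scala.collection.mutable.ArrayBuffer

    object CombineStrategies {
      // Case (1): fold values into a combiner eagerly, value by value. Partial
      // combiners built before separate spills can later be merged with a
      // mergeCombiners function.
      def eagerCombine[V, C](values: Iterator[V],
                             createCombiner: V => C,
                             mergeValue: (C, V) => C): Option[C] =
        values.foldLeft(Option.empty[C]) {
          case (None, v)    => Some(createCombiner(v))
          case (Some(c), v) => Some(mergeValue(c, v))
        }

      // Case (2): only group values before the spill; fold the group into a
      // combiner after it is read back, as combineGroup does in the patch.
      def groupThenCombine[V, C](values: Iterator[V],
                                 createCombiner: V => C,
                                 mergeValue: (C, V) => C): Option[C] = {
        val group = new ArrayBuffer[V]   // stand-in for the spilled group
        values.foreach(group += _)
        eagerCombine(group.iterator, createCombiner, mergeValue)
      }
    }

For example, eagerCombine(Iterator(1, 2, 3), (v: Int) => v, (c: Int, v: Int) => c + v)
yields Some(6); groupThenCombine returns the same result, with the fold deferred until
after the group is materialized.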
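Both insert paths in the patch rely on the (hadValue, oldValue) => newValue closure
contract of changeValue, which lets insert-or-merge happen in a single map lookup.
Here is a toy stand-in for that contract; this is not Spark's actual AppendOnlyMap,
and all names are illustrative.

    import scala.collection.mutable.HashMap

    class ToyAppendOnlyMap[K, M] {
      private val underlying = new HashMap[K, M]

      // The caller supplies (hadValue, oldValue) => newValue; the map stores
      // and returns the closure's result after one lookup.
      def changeValue(key: K, update: (Boolean, M) => M): M = {
        val newValue = underlying.get(key) match {
          case Some(old) => update(true, old)
          case None      => update(false, null.asInstanceOf[M])
        }
        underlying.put(key, newValue)
        newValue
      }
    }

    object ToyInsert {
      // The insert path from the patch, restated against the toy map: merge
      // into the existing group if present, otherwise create a new one.
      def insert[K, V, M](map: ToyAppendOnlyMap[K, M],
                          createGroup: V => M,
                          mergeValue: (M, V) => M)(key: K, value: V): Unit = {
        val update: (Boolean, M) => M = (hadVal, oldVal) =>
          if (hadVal) mergeValue(oldVal, value) else createGroup(value)
        map.changeValue(key, update)
      }
    }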