diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index ae16242718..77a24733aa 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,7 +32,6 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {

   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
-    println("Combining values by key!!")
     //val combiners = new AppendOnlyMap[K, C]
     val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
     var kv: Product2[K, V] = null
@@ -47,7 +46,6 @@ case class Aggregator[K, V, C] (
   }

   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
-    println("Combining combiners by key!!")
     //val combiners = new AppendOnlyMap[K, C]
     val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
     var kc: (K, C) = null
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 6283686322..4c45a94af9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -101,14 +101,16 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)

   override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
-    println("Computing in CoGroupedRDD!")
     // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
-    val combineFunction: (Seq[ArrayBuffer[Any]], Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
-      (x, y) => { x ++ y }
+    def combine(x: Seq[ArrayBuffer[Any]], y: Seq[ArrayBuffer[Any]]) = {
+      x.zipAll(y, ArrayBuffer[Any](), ArrayBuffer[Any]()).map {
+        case (a, b) => a ++ b
+      }
+    }
     //val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
-    val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combineFunction)
+    val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combine)
     val ser = SparkEnv.get.serializerManager.get(serializerClass)
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
@@ -128,22 +130,18 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     }

     def addToMap(key: K, value: Any, depNum: Int) {
-      val updateFunction: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
-        (hadVal, oldVal) => {
-          var newVal = oldVal
-          if (!hadVal){
-            newVal = Array.fill(numRdds)(new ArrayBuffer[Any])
-          }
-          newVal(depNum) += value
-          newVal
+      def update(hadVal: Boolean, oldVal: Seq[ArrayBuffer[Any]]): Seq[ArrayBuffer[Any]] = {
+        var newVal = oldVal
+        if (!hadVal){
+          newVal = Array.fill(numRdds)(new ArrayBuffer[Any])
         }
-      map.changeValue(key, updateFunction)
+        newVal(depNum) += value
+        newVal
+      }
+      map.changeValue(key, update)
     }

-    println("About to construct CoGroupedRDD iterator!")
-    val theIterator = map.iterator
-    println("Returning CoGroupedRDD iterator!")
-    new InterruptibleIterator(context, theIterator)
+    new InterruptibleIterator(context, map.iterator)
   }

   override def clearDependencies() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 6849703f05..9512b418d7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -85,12 +85,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     }
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     if (self.partitioner == Some(partitioner)) {
-      println("Partitioner is some partitioner! In fact, it is " + self.partitioner.toString())
       self.mapPartitionsWithContext((context, iter) => {
         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
       }, preservesPartitioning = true)
     } else if (mapSideCombine) {
-      println("Otherwise, combining on map side.")
       val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
       val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
         .setSerializer(serializerClass)
@@ -98,10 +96,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
         new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
       }, preservesPartitioning = true)
     } else {
-      println("Else. No combining on map side!")
       // Don't apply map-side combiner.
-      // A sanity check to make sure mergeCombiners is not defined.
-      assert(mergeCombiners == null)
       val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
       values.mapPartitionsWithContext((context, iter) => {
         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
       }, preservesPartitioning = true)
@@ -229,8 +224,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     // into a hash table, leading to more objects in the old gen.
     def createCombiner(v: V) = ArrayBuffer(v)
     def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+    def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
+      createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
     bufs.asInstanceOf[RDD[(K, Seq[V])]]
   }
@@ -650,7 +646,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * MapReduce job.
    */
   def saveAsHadoopDataset(conf: JobConf) {
-    println("SAVE AS HADOOP DATASET")
     val outputFormatClass = conf.getOutputFormat
     val keyClass = conf.getOutputKeyClass
     val valueClass = conf.getOutputValueClass
@@ -670,7 +665,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     writer.preSetup()

     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
-      println("WRITE TO FILE")
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -678,17 +672,13 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       writer.setup(context.stageId, context.partitionId, attemptNumber)
       writer.open()

-      println("START LOOP\n\n\n")
       var count = 0
       while(iter.hasNext) {
-        println("Before next()")
         val record = iter.next()
         count += 1
-        println("Before write. Record = "+record)
         writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
-        println("After write. Record = "+record)
       }
-      println("ALL DONE! Woohoo.")
+
       writer.close()
       writer.commit()
     }
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index 28a3b7ed64..857f8e3439 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -25,36 +25,32 @@ import scala.collection.mutable
  * A simple map that spills sorted content to disk when the memory threshold is exceeded. A combiner
  * function must be specified to merge values back into memory during read.
  */
-class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
+class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
                                  memoryThresholdMB: Int = 1024)
-  extends Iterable[(K,V)] with Serializable {
+  extends Iterable[(K, V)] with Serializable {

-  var currentMap = new AppendOnlyMap[K,V]
+  var currentMap = new AppendOnlyMap[K, V]
   var oldMaps = new ArrayBuffer[DiskKVIterator]

   def changeValue(key: K, updateFunc: (Boolean, V) => V): Unit = {
     currentMap.changeValue(key, updateFunc)
     val mapSize = SizeEstimator.estimate(currentMap)
-    //if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
-    if (mapSize > 1024 * 10) {
+    if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
       spill()
     }
   }

   def spill(): Unit = {
-    println("SPILL")
     val file = File.createTempFile("external_append_only_map", "")  // Add spill location
     val out = new ObjectOutputStream(new FileOutputStream(file))
     val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
-    sortedMap foreach {
-      out.writeObject( _ )
-    }
+    sortedMap.foreach { out.writeObject( _ ) }
     out.close()
-    currentMap = new AppendOnlyMap[K,V]
+    currentMap = new AppendOnlyMap[K, V]
     oldMaps.append(new DiskKVIterator(file))
   }

-  override def iterator: Iterator[(K,V)] = new ExternalIterator()
+  override def iterator: Iterator[(K, V)] = new ExternalIterator()

   /**
    * An iterator that merges KV pairs from memory and disk in sorted order
@@ -67,49 +63,62 @@ class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
     }
     val pq = mutable.PriorityQueue[KVITuple]()
     val inputStreams = Seq(new MemoryKVIterator(currentMap)) ++ oldMaps
-    inputStreams foreach { readFromIterator }
+    inputStreams.foreach { readFromIterator( _ ) }

     override def hasNext: Boolean = !pq.isEmpty

+    // Combine all values from all input streams corresponding to the same key
     override def next(): (K,V) = {
-      println("ExternalIterator.next - How many left? "+pq.length)
       val minKVI = pq.dequeue()
-      var (minKey, minValue, minIter) = (minKVI.key, minKVI.value, minKVI.iter)
-//      println("Min key = "+minKey)
-      readFromIterator(minIter)
-      while (!pq.isEmpty && pq.head.key == minKey) {
-        val newKVI = pq.dequeue()
-        val (newValue, newIter) = (newKVI.value, newKVI.iter)
-//        println("\tfound new value to merge! "+newValue)
-//        println("\tcombinerFunction("+minValue+" <====> "+newValue+")")
-        minValue = combinerFunction(minValue, newValue)
-//        println("\tCombine complete! New value = "+minValue)
-        readFromIterator(newIter)
+      var (minKey, minValue) = (minKVI.key, minKVI.value)
+      val minHash = minKey.hashCode()
+      readFromIterator(minKVI.iter)
+
+      var collidedKVI = ArrayBuffer[KVITuple]()
+      while (!pq.isEmpty && pq.head.key.hashCode() == minHash) {
+        val newKVI: KVITuple = pq.dequeue()
+        if (newKVI.key == minKey){
+          minValue = combineFunction(minValue, newKVI.value)
+          readFromIterator(newKVI.iter)
+        } else {
+          // Collision
+          collidedKVI += newKVI
+        }
       }
-      println("Returning minKey = "+minKey+", minValue = "+minValue)
+      collidedKVI.foreach { pq.enqueue( _ ) }
       (minKey, minValue)
     }

-    def readFromIterator(iter: Iterator[(K,V)]): Unit = {
-      if (iter.hasNext) {
+    // Read from the given iterator until a key of different hash is retrieved,
+    // Add each KV pair read from this iterator to the heap
+    def readFromIterator(iter: Iterator[(K, V)]): Unit = {
+      var minHash : Option[Int] = None
+      while (iter.hasNext) {
         val (k, v) = iter.next()
         pq.enqueue(KVITuple(k, v, iter))
+        minHash match {
+          case None => minHash = Some(k.hashCode())
+          case Some(expectedHash) =>
+            if (k.hashCode() != expectedHash){
+              return
+            }
+        }
       }
     }

-    case class KVITuple(key:K, value:V, iter:Iterator[(K,V)])
+    case class KVITuple(key:K, value:V, iter:Iterator[(K, V)])
   }

-  class MemoryKVIterator(map: AppendOnlyMap[K,V]) extends Iterator[(K,V)] {
+  class MemoryKVIterator(map: AppendOnlyMap[K, V]) extends Iterator[(K, V)] {
     val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
     val it = sortedMap.iterator
     override def hasNext: Boolean = it.hasNext
-    override def next(): (K,V) = it.next()
+    override def next(): (K, V) = it.next()
   }

-  class DiskKVIterator(file: File) extends Iterator[(K,V)] {
+  class DiskKVIterator(file: File) extends Iterator[(K, V)] {
     val in = new ObjectInputStream(new FileInputStream(file))
-    var nextItem:(K,V) = _
+    var nextItem:(K, V) = _
     var eof = false

     override def hasNext: Boolean = {
@@ -117,7 +126,7 @@ class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
         return false
       }
       try {
-        nextItem = in.readObject().asInstanceOf[(K,V)]
+        nextItem = in.readObject().asInstanceOf[(K, V)]
       } catch {
         case e: EOFException =>
           eof = true
@@ -126,7 +135,7 @@ class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
       true
     }

-    override def next(): (K,V) = {
+    override def next(): (K, V) = {
       if (eof) {
         throw new NoSuchElementException
       }
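For reference, a minimal usage sketch of the ExternalAppendOnlyMap API this patch introduces (a combine function plus an optional memoryThresholdMB in the constructor, changeValue for updates, and iterator for merged reads). This is not part of the patch: the object name ExternalMapSketch and the word-count example are illustrative only, and the sketch assumes the patched org.apache.spark.util.ExternalAppendOnlyMap and its dependencies (AppendOnlyMap, SizeEstimator) are on the classpath.

// Usage sketch (not part of the patch).
import org.apache.spark.util.ExternalAppendOnlyMap

object ExternalMapSketch {
  def main(args: Array[String]) {
    // The combine function merges two partial counts for the same key when
    // spilled and in-memory entries are read back together.
    val counts = new ExternalAppendOnlyMap[String, Int](_ + _, memoryThresholdMB = 64)

    for (word <- Seq("a", "b", "a", "c", "b", "a")) {
      // changeValue mirrors AppendOnlyMap: hadVal says whether the key was already present.
      counts.changeValue(word, (hadVal, oldVal) => if (hadVal) oldVal + 1 else 1)
    }

    // The iterator merges the current in-memory map with any spilled maps,
    // combining values that share a key.
    for ((word, count) <- counts.iterator) {
      println(word + " -> " + count)
    }
  }
}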