From ec8c5dc644ce97c8cf6e13ba2b216ddbe16e9e0a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 26 Dec 2013 21:22:38 -0800 Subject: [PATCH] Sort AppendOnlyMap in-place --- .../spark/util/collection/AppendOnlyMap.scala | 33 +++++++++++++++++++ .../collection/ExternalAppendOnlyMap.scala | 32 ++++++++++-------- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index cb0ca8f8c1..38f3c556ae 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -17,6 +17,8 @@ package org.apache.spark.util.collection +import java.util + /** * A simple open hash table optimized for the append-only use case, where keys * are never removed, but the value for each key may be changed. @@ -234,4 +236,35 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi val highBit = Integer.highestOneBit(n) if (highBit == n) n else highBit << 1 } + + // Return an iterator of the map in sorted order. + // Note that the validity of the map is no longer preserved. + def destructiveSortedIterator(ord: Ordering[(K, V)]): Iterator[(K, V)] = { + var keyIndex, newIndex = 0 + while (keyIndex < capacity) { + if (data(2 * keyIndex) != null) { + data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1)) + newIndex += 1 + } + keyIndex += 1 + } + // sort + assert(newIndex == curSize) + val rawOrdering = new Ordering[AnyRef] { + def compare(x: AnyRef, y: AnyRef): Int ={ + ord.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) + } + } + util.Arrays.sort(data, 0, curSize, rawOrdering) + + new Iterator[(K, V)] { + var i = 0 + def hasNext = i < curSize + def next(): (K, V) = { + val item = data(i).asInstanceOf[(K, V)] + i += 1 + item + } + } + } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 4bda763ffe..ed8b1d36a9 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -88,6 +88,7 @@ class SpillableAppendOnlyMap[K, V, M, C]( val bufferPercent = System.getProperty("spark.shuffle.buffer.percent", "0.8").toFloat bufferSize * bufferPercent } + val KMOrdering: Ordering[(K, M)] = Ordering.by(km => km._1.hashCode()) def insert(key: K, value: V): Unit = { val update: (Boolean, M) => M = (hadVal, oldVal) => { @@ -100,10 +101,14 @@ class SpillableAppendOnlyMap[K, V, M, C]( } def spill(): Unit = { + println("******************* SPILL *********************") val file = File.createTempFile("external_append_only_map", "") val out = new ObjectOutputStream(new FileOutputStream(file)) - val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode()) - sortedMap.foreach(out.writeObject) + val it = currentMap.destructiveSortedIterator(KMOrdering) + while (it.hasNext) { + val kv = it.next() + out.writeObject(kv) + } out.close() currentMap = new SizeTrackingAppendOnlyMap[K, M] oldMaps.append(new DiskIterator(file)) @@ -115,8 +120,8 @@ class SpillableAppendOnlyMap[K, V, M, C]( class ExternalIterator extends Iterator[(K, C)] { // Order by key hash value - val pq = PriorityQueue[KMITuple]()(Ordering.by(_.key.hashCode())) - val inputStreams = Seq(new MemoryIterator(currentMap)) ++ oldMaps + val pq = new PriorityQueue[KMITuple] + val inputStreams = Seq(currentMap.destructiveSortedIterator(KMOrdering)) ++ oldMaps inputStreams.foreach(readFromIterator) // Read from the given iterator until a key of different hash is retrieved @@ -127,7 +132,10 @@ class SpillableAppendOnlyMap[K, V, M, C]( pq.enqueue(KMITuple(k, m, it)) minHash match { case None => minHash = Some(k.hashCode()) - case Some(expectedHash) if k.hashCode() != expectedHash => return + case Some(expectedHash) => + if (k.hashCode() != expectedHash) { + return + } } } } @@ -156,15 +164,11 @@ class SpillableAppendOnlyMap[K, V, M, C]( (minKey, createCombiner(minGroup)) } - case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)]) - } - - // Iterate through (K, M) pairs in sorted order from the in-memory map - class MemoryIterator(map: AppendOnlyMap[K, M]) extends Iterator[(K, M)] { - val sortedMap = currentMap.iterator.toList.sortBy(km => km._1.hashCode()) - val it = sortedMap.iterator - override def hasNext: Boolean = it.hasNext - override def next(): (K, M) = it.next() + case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)]) extends Ordered[KMITuple] { + def compare(other: KMITuple): Int = { + -key.hashCode().compareTo(other.key.hashCode()) + } + } } // Iterate through (K, M) pairs in sorted order from an on-disk map