Sort AppendOnlyMap in-place

This commit is contained in:
Andrew Or 2013-12-26 21:22:38 -08:00
parent a515706d9c
commit ec8c5dc644
2 changed files with 51 additions and 14 deletions

View file

@@ -17,6 +17,8 @@
package org.apache.spark.util.collection
import java.util
/**
* A simple open hash table optimized for the append-only use case, where keys
* are never removed, but the value for each key may be changed.
@@ -234,4 +236,35 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
val highBit = Integer.highestOneBit(n)
if (highBit == n) n else highBit << 1
}
/**
 * Return an iterator over the map's entries, sorted by the given ordering.
 *
 * This is destructive: the entries are compacted in place into the front of
 * the underlying `data` array as (key, value) tuples, so the hash-table
 * invariants of the map no longer hold afterwards.
 */
def destructiveSortedIterator(ord: Ordering[(K, V)]): Iterator[(K, V)] = {
  // Compact all occupied (key, value) slot pairs into data(0 until curSize).
  // Safe in place: the write position never catches up to the 2x read position,
  // and at slot 0 both reads happen before the write.
  var readPos = 0
  var writePos = 0
  while (readPos < capacity) {
    val k = data(2 * readPos)
    if (k != null) {
      data(writePos) = (k, data(2 * readPos + 1))
      writePos += 1
    }
    readPos += 1
  }
  // Every live entry must have been compacted exactly once.
  assert(writePos == curSize)
  // Adapt the caller's tuple ordering to the raw AnyRef array for Arrays.sort.
  val comparator = new Ordering[AnyRef] {
    def compare(x: AnyRef, y: AnyRef): Int =
      ord.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
  }
  util.Arrays.sort(data, 0, curSize, comparator)
  // Walk the sorted prefix of the array, handing back each tuple.
  new Iterator[(K, V)] {
    private var pos = 0
    def hasNext = pos < curSize
    def next(): (K, V) = {
      val entry = data(pos).asInstanceOf[(K, V)]
      pos += 1
      entry
    }
  }
}
}

View file

@@ -88,6 +88,7 @@ class SpillableAppendOnlyMap[K, V, M, C](
val bufferPercent = System.getProperty("spark.shuffle.buffer.percent", "0.8").toFloat
bufferSize * bufferPercent
}
val KMOrdering: Ordering[(K, M)] = Ordering.by(km => km._1.hashCode())
def insert(key: K, value: V): Unit = {
val update: (Boolean, M) => M = (hadVal, oldVal) => {
@@ -100,10 +101,14 @@ class SpillableAppendOnlyMap[K, V, M, C](
}
def spill(): Unit = {
println("******************* SPILL *********************")
val file = File.createTempFile("external_append_only_map", "")
val out = new ObjectOutputStream(new FileOutputStream(file))
val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
sortedMap.foreach(out.writeObject)
val it = currentMap.destructiveSortedIterator(KMOrdering)
while (it.hasNext) {
val kv = it.next()
out.writeObject(kv)
}
out.close()
currentMap = new SizeTrackingAppendOnlyMap[K, M]
oldMaps.append(new DiskIterator(file))
@@ -115,8 +120,8 @@ class SpillableAppendOnlyMap[K, V, M, C](
class ExternalIterator extends Iterator[(K, C)] {
// Order by key hash value
val pq = PriorityQueue[KMITuple]()(Ordering.by(_.key.hashCode()))
val inputStreams = Seq(new MemoryIterator(currentMap)) ++ oldMaps
val pq = new PriorityQueue[KMITuple]
val inputStreams = Seq(currentMap.destructiveSortedIterator(KMOrdering)) ++ oldMaps
inputStreams.foreach(readFromIterator)
// Read from the given iterator until a key of different hash is retrieved
@@ -127,7 +132,10 @@ class SpillableAppendOnlyMap[K, V, M, C](
pq.enqueue(KMITuple(k, m, it))
minHash match {
case None => minHash = Some(k.hashCode())
case Some(expectedHash) if k.hashCode() != expectedHash => return
case Some(expectedHash) =>
if (k.hashCode() != expectedHash) {
return
}
}
}
}
@@ -156,15 +164,11 @@ class SpillableAppendOnlyMap[K, V, M, C](
(minKey, createCombiner(minGroup))
}
case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)])
}
// Iterate through (K, M) pairs in sorted order from the in-memory map
class MemoryIterator(map: AppendOnlyMap[K, M]) extends Iterator[(K, M)] {
val sortedMap = currentMap.iterator.toList.sortBy(km => km._1.hashCode())
val it = sortedMap.iterator
override def hasNext: Boolean = it.hasNext
override def next(): (K, M) = it.next()
/**
 * A (key, group, source-iterator) triple used by the merge priority queue.
 *
 * Ordered by DESCENDING key hash so that Scala's max-oriented PriorityQueue
 * dequeues the smallest hash first. The reversal is done by swapping the
 * operands rather than negating the result: negating `compareTo` is unsafe
 * because a result of Int.MinValue overflows under negation and keeps the
 * wrong sign.
 */
case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)]) extends Ordered[KMITuple] {
  def compare(other: KMITuple): Int = {
    other.key.hashCode().compareTo(key.hashCode())
  }
}
}
// Iterate through (K, M) pairs in sorted order from an on-disk map