Use Comparator instead of Ordering
lower object creation costs
This commit is contained in:
parent
8fbff9f5d0
commit
e3cac47e65
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.spark.util.collection
|
package org.apache.spark.util.collection
|
||||||
|
|
||||||
import java.util
|
import java.util
|
||||||
|
import java.util.Comparator
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A simple open hash table optimized for the append-only use case, where keys
|
* A simple open hash table optimized for the append-only use case, where keys
|
||||||
|
@ -240,7 +241,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
|
||||||
/** Return an iterator of the map in sorted order. This provides a way to sort the map without
|
/** Return an iterator of the map in sorted order. This provides a way to sort the map without
|
||||||
* using additional memory, at the expense of destroying the validity of the map.
|
* using additional memory, at the expense of destroying the validity of the map.
|
||||||
*/
|
*/
|
||||||
def destructiveSortedIterator(ordering: Ordering[(K, V)]): Iterator[(K, V)] = {
|
def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
|
||||||
var keyIndex, newIndex = 0
|
var keyIndex, newIndex = 0
|
||||||
// Pack KV pairs into the front of the underlying array
|
// Pack KV pairs into the front of the underlying array
|
||||||
while (keyIndex < capacity) {
|
while (keyIndex < capacity) {
|
||||||
|
@ -252,9 +253,9 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
|
||||||
}
|
}
|
||||||
assert(newIndex == curSize)
|
assert(newIndex == curSize)
|
||||||
// Sort by the given ordering
|
// Sort by the given ordering
|
||||||
val rawOrdering = new Ordering[AnyRef] {
|
val rawOrdering = new Comparator[AnyRef] {
|
||||||
def compare(x: AnyRef, y: AnyRef): Int = {
|
def compare(x: AnyRef, y: AnyRef): Int = {
|
||||||
ordering.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
|
cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
util.Arrays.sort(data, 0, curSize, rawOrdering)
|
util.Arrays.sort(data, 0, curSize, rawOrdering)
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.spark.util.collection
|
package org.apache.spark.util.collection
|
||||||
|
|
||||||
import java.io._
|
import java.io._
|
||||||
|
import java.util.Comparator
|
||||||
|
|
||||||
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
|
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
@ -113,7 +114,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
|
||||||
val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
|
val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
|
||||||
bufferSize * bufferPercent
|
bufferSize * bufferPercent
|
||||||
}
|
}
|
||||||
private val ordering = new KeyGroupOrdering[K, G]
|
private val comparator = new KeyGroupComparator[K, G]
|
||||||
private val ser = serializer.newInstance()
|
private val ser = serializer.newInstance()
|
||||||
|
|
||||||
def insert(key: K, value: V): Unit = {
|
def insert(key: K, value: V): Unit = {
|
||||||
|
@ -129,7 +130,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
|
||||||
private def spill(): Unit = {
|
private def spill(): Unit = {
|
||||||
val file = File.createTempFile("external_append_only_map", "")
|
val file = File.createTempFile("external_append_only_map", "")
|
||||||
val out = ser.serializeStream(new FileOutputStream(file))
|
val out = ser.serializeStream(new FileOutputStream(file))
|
||||||
val it = currentMap.destructiveSortedIterator(ordering)
|
val it = currentMap.destructiveSortedIterator(comparator)
|
||||||
while (it.hasNext) {
|
while (it.hasNext) {
|
||||||
val kv = it.next()
|
val kv = it.next()
|
||||||
out.writeObject(kv)
|
out.writeObject(kv)
|
||||||
|
@ -150,7 +151,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
|
||||||
// An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs
|
// An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs
|
||||||
private class ExternalIterator extends Iterator[(K, C)] {
|
private class ExternalIterator extends Iterator[(K, C)] {
|
||||||
val mergeHeap = new PriorityQueue[KGITuple]
|
val mergeHeap = new PriorityQueue[KGITuple]
|
||||||
val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(ordering))
|
val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(comparator))
|
||||||
|
|
||||||
// Invariant: size of mergeHeap == number of input streams
|
// Invariant: size of mergeHeap == number of input streams
|
||||||
inputStreams.foreach{ it =>
|
inputStreams.foreach{ it =>
|
||||||
|
@ -241,7 +242,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
|
||||||
}
|
}
|
||||||
|
|
||||||
case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
|
case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
|
||||||
extends Ordered[KGITuple] {
|
extends Comparable[KGITuple] {
|
||||||
|
|
||||||
// Invariant: pairs are ordered by key hash
|
// Invariant: pairs are ordered by key hash
|
||||||
def minHash: Int = {
|
def minHash: Int = {
|
||||||
|
@ -252,7 +253,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def compare(other: KGITuple): Int = {
|
override def compareTo(other: KGITuple): Int = {
|
||||||
// mutable.PriorityQueue dequeues the max, not the min
|
// mutable.PriorityQueue dequeues the max, not the min
|
||||||
-minHash.compareTo(other.minHash)
|
-minHash.compareTo(other.minHash)
|
||||||
}
|
}
|
||||||
|
@ -299,7 +300,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
|
||||||
}
|
}
|
||||||
|
|
||||||
private[spark] object SpillableAppendOnlyMap {
|
private[spark] object SpillableAppendOnlyMap {
|
||||||
private class KeyGroupOrdering[K, G] extends Ordering[(K, G)] {
|
private class KeyGroupComparator[K, G] extends Comparator[(K, G)] {
|
||||||
def compare(kg1: (K, G), kg2: (K, G)): Int = {
|
def compare(kg1: (K, G), kg2: (K, G)): Int = {
|
||||||
kg1._1.hashCode().compareTo(kg2._1.hashCode())
|
kg1._1.hashCode().compareTo(kg2._1.hashCode())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue