diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index 7810119847..a32416afae 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.collection import java.util +import java.util.Comparator /** * A simple open hash table optimized for the append-only use case, where keys @@ -240,7 +241,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi /** Return an iterator of the map in sorted order. This provides a way to sort the map without * using additional memory, at the expense of destroying the validity of the map. */ - def destructiveSortedIterator(ordering: Ordering[(K, V)]): Iterator[(K, V)] = { + def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = { var keyIndex, newIndex = 0 // Pack KV pairs into the front of the underlying array while (keyIndex < capacity) { @@ -252,9 +253,9 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi } assert(newIndex == curSize) // Sort by the given ordering - val rawOrdering = new Ordering[AnyRef] { + val rawOrdering = new Comparator[AnyRef] { def compare(x: AnyRef, y: AnyRef): Int = { - ordering.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) + cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) } } util.Arrays.sort(data, 0, curSize, rawOrdering) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 1de545c05b..bbf96f71ce 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.collection import java.io._ +import java.util.Comparator import scala.collection.mutable.{ArrayBuffer, PriorityQueue} import scala.reflect.ClassTag @@ -113,7 +114,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat bufferSize * bufferPercent } - private val ordering = new KeyGroupOrdering[K, G] + private val comparator = new KeyGroupComparator[K, G] private val ser = serializer.newInstance() def insert(key: K, value: V): Unit = { @@ -129,7 +130,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( private def spill(): Unit = { val file = File.createTempFile("external_append_only_map", "") val out = ser.serializeStream(new FileOutputStream(file)) - val it = currentMap.destructiveSortedIterator(ordering) + val it = currentMap.destructiveSortedIterator(comparator) while (it.hasNext) { val kv = it.next() out.writeObject(kv) @@ -150,7 +151,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( // An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs private class ExternalIterator extends Iterator[(K, C)] { val mergeHeap = new PriorityQueue[KGITuple] - val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(ordering)) + val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(comparator)) // Invariant: size of mergeHeap == number of input streams inputStreams.foreach{ it => @@ -241,7 +242,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( } case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)]) - extends Ordered[KGITuple] { + extends Comparable[KGITuple] { // Invariant: pairs are ordered by key hash def minHash: Int = { @@ -252,7 +253,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( } } - def compare(other: KGITuple): Int = { + override def compareTo(other: KGITuple): Int = { // mutable.PriorityQueue dequeues the max, not the min -minHash.compareTo(other.minHash) } @@ -299,7 +300,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( } private[spark] object SpillableAppendOnlyMap { - private class KeyGroupOrdering[K, G] extends Ordering[(K, G)] { + private class KeyGroupComparator[K, G] extends Comparator[(K, G)] { def compare(kg1: (K, G), kg2: (K, G)): Int = { kg1._1.hashCode().compareTo(kg2._1.hashCode()) }