[SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort.
The pull request includes two changes: 1. Removes SortOrder introduced by SPARK-2125. The key ordering already includes the SortOrder information since an Ordering can be reverse. This is similar to Java's Comparator interface. Rarely does an API accept both a Comparator as well as a SortOrder. 2. Replaces the sortWith call in HashShuffleReader with an in-place quick sort. Author: Reynold Xin <rxin@apache.org> Closes #1631 from rxin/sortOrder and squashes the following commits: c9d37e1 [Reynold Xin] [SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort.
This commit is contained in:
parent
92ef02626e
commit
96ba04bbf9
|
@@ -19,7 +19,6 @@ package org.apache.spark
|
|||
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.rdd.SortOrder.SortOrder
|
||||
import org.apache.spark.serializer.Serializer
|
||||
import org.apache.spark.shuffle.ShuffleHandle
|
||||
|
||||
|
@@ -63,8 +62,7 @@ class ShuffleDependency[K, V, C](
|
|||
val serializer: Option[Serializer] = None,
|
||||
val keyOrdering: Option[Ordering[K]] = None,
|
||||
val aggregator: Option[Aggregator[K, V, C]] = None,
|
||||
val mapSideCombine: Boolean = false,
|
||||
val sortOrder: Option[SortOrder] = None)
|
||||
val mapSideCombine: Boolean = false)
|
||||
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
|
||||
|
||||
val shuffleId: Int = rdd.context.newShuffleId()
|
||||
|
|
|
@@ -58,12 +58,6 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
|
|||
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
|
||||
val part = new RangePartitioner(numPartitions, self, ascending)
|
||||
new ShuffledRDD[K, V, V, P](self, part)
|
||||
.setKeyOrdering(ordering)
|
||||
.setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING)
|
||||
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
|
||||
}
|
||||
}
|
||||
|
||||
private[spark] object SortOrder extends Enumeration {
|
||||
type SortOrder = Value
|
||||
val ASCENDING, DESCENDING = Value
|
||||
}
|
||||
|
|
|
@@ -21,7 +21,6 @@ import scala.reflect.ClassTag
|
|||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.rdd.SortOrder.SortOrder
|
||||
import org.apache.spark.serializer.Serializer
|
||||
|
||||
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
|
||||
|
@@ -52,8 +51,6 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
|
|||
|
||||
private var mapSideCombine: Boolean = false
|
||||
|
||||
private var sortOrder: Option[SortOrder] = None
|
||||
|
||||
/** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
|
||||
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
|
||||
this.serializer = Option(serializer)
|
||||
|
@@ -78,15 +75,8 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
|
|||
this
|
||||
}
|
||||
|
||||
/** Set sort order for RDD's sorting. */
|
||||
def setSortOrder(sortOrder: SortOrder): ShuffledRDD[K, V, C, P] = {
|
||||
this.sortOrder = Option(sortOrder)
|
||||
this
|
||||
}
|
||||
|
||||
override def getDependencies: Seq[Dependency[_]] = {
|
||||
List(new ShuffleDependency(prev, part, serializer,
|
||||
keyOrdering, aggregator, mapSideCombine, sortOrder))
|
||||
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
|
||||
}
|
||||
|
||||
override val partitioner = Some(part)
|
||||
|
|
|
@@ -18,7 +18,6 @@
|
|||
package org.apache.spark.shuffle.hash
|
||||
|
||||
import org.apache.spark.{InterruptibleIterator, TaskContext}
|
||||
import org.apache.spark.rdd.SortOrder
|
||||
import org.apache.spark.serializer.Serializer
|
||||
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
|
||||
|
||||
|
@@ -51,16 +50,22 @@ class HashShuffleReader[K, C](
|
|||
iter
|
||||
}
|
||||
|
||||
val sortedIter = for (sortOrder <- dep.sortOrder; ordering <- dep.keyOrdering) yield {
|
||||
val buf = aggregatedIter.toArray
|
||||
if (sortOrder == SortOrder.ASCENDING) {
|
||||
buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
|
||||
} else {
|
||||
buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
|
||||
}
|
||||
// Sort the output if there is a sort ordering defined.
|
||||
dep.keyOrdering match {
|
||||
case Some(keyOrd: Ordering[K]) =>
|
||||
// Define a Comparator for the whole record based on the key Ordering.
|
||||
val cmp = new Ordering[Product2[K, C]] {
|
||||
override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int = {
|
||||
keyOrd.compare(o1._1, o2._1)
|
||||
}
|
||||
}
|
||||
val sortBuffer: Array[Product2[K, C]] = aggregatedIter.toArray
|
||||
// TODO: do external sort.
|
||||
scala.util.Sorting.quickSort(sortBuffer)(cmp)
|
||||
sortBuffer.iterator
|
||||
case None =>
|
||||
aggregatedIter
|
||||
}
|
||||
|
||||
sortedIter.getOrElse(aggregatedIter)
|
||||
}
|
||||
|
||||
/** Close this reader */
|
||||
|
|
Loading…
Reference in a new issue