From 96ba04bbf917bcb971dd0d8cd1e1766dbe9366e8 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 29 Jul 2014 01:12:44 -0700 Subject: [PATCH] [SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort. The pull request includes two changes: 1. Removes SortOrder introduced by SPARK-2125. The key ordering already includes the SortOrder information since an Ordering can be reverse. This is similar to Java's Comparator interface. Rarely does an API accept both a Comparator as well as a SortOrder. 2. Replaces the sortWith call in HashShuffleReader with an in-place quick sort. Author: Reynold Xin Closes #1631 from rxin/sortOrder and squashes the following commits: c9d37e1 [Reynold Xin] [SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort. --- .../scala/org/apache/spark/Dependency.scala | 4 +-- .../spark/rdd/OrderedRDDFunctions.scala | 8 +----- .../org/apache/spark/rdd/ShuffledRDD.scala | 12 +-------- .../shuffle/hash/HashShuffleReader.scala | 25 +++++++++++-------- 4 files changed, 18 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index f010c03223..09a6057123 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -19,7 +19,6 @@ package org.apache.spark import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.SortOrder.SortOrder import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleHandle @@ -63,8 +62,7 @@ class ShuffleDependency[K, V, C]( val serializer: Option[Serializer] = None, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, - val mapSideCombine: Boolean = false, - val sortOrder: Option[SortOrder] = None) + val mapSideCombine: Boolean = false) extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) { val shuffleId: Int = rdd.context.newShuffleId() diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index afd7075f68..d85f962783 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -58,12 +58,6 @@ class OrderedRDDFunctions[K : Ordering : ClassTag, def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V, P](self, part) - .setKeyOrdering(ordering) - .setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING) + .setKeyOrdering(if (ascending) ordering else ordering.reverse) } } - -private[spark] object SortOrder extends Enumeration { - type SortOrder = Value - val ASCENDING, DESCENDING = Value -} diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index da4a8c3dc2..bf02f68d0d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -21,7 +21,6 @@ import scala.reflect.ClassTag import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.SortOrder.SortOrder import org.apache.spark.serializer.Serializer private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { @@ -52,8 +51,6 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag]( private var mapSideCombine: Boolean = false - private var sortOrder: Option[SortOrder] = None - /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */ def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = { this.serializer = Option(serializer) @@ -78,15 +75,8 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag]( this } - /** Set sort order for RDD's sorting. */ - def setSortOrder(sortOrder: SortOrder): ShuffledRDD[K, V, C, P] = { - this.sortOrder = Option(sortOrder) - this - } - override def getDependencies: Seq[Dependency[_]] = { - List(new ShuffleDependency(prev, part, serializer, - keyOrdering, aggregator, mapSideCombine, sortOrder)) + List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine)) } override val partitioner = Some(part) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 76cdb8f4f8..c8059496a1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -18,7 +18,6 @@ package org.apache.spark.shuffle.hash import org.apache.spark.{InterruptibleIterator, TaskContext} -import org.apache.spark.rdd.SortOrder import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader} @@ -51,16 +50,22 @@ class HashShuffleReader[K, C]( iter } - val sortedIter = for (sortOrder <- dep.sortOrder; ordering <- dep.keyOrdering) yield { - val buf = aggregatedIter.toArray - if (sortOrder == SortOrder.ASCENDING) { - buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator - } else { - buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator - } + // Sort the output if there is a sort ordering defined. + dep.keyOrdering match { + case Some(keyOrd: Ordering[K]) => + // Define a Comparator for the whole record based on the key Ordering. + val cmp = new Ordering[Product2[K, C]] { + override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int = { + keyOrd.compare(o1._1, o2._1) + } + } + val sortBuffer: Array[Product2[K, C]] = aggregatedIter.toArray + // TODO: do external sort. + scala.util.Sorting.quickSort(sortBuffer)(cmp) + sortBuffer.iterator + case None => + aggregatedIter } - - sortedIter.getOrElse(aggregatedIter) } /** Close this reader */