[SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark
This patch just replaces a native quick sorter with Sorter(TimSort) in Spark. It could get performance gains by ~8% in my quick experiments. Author: Takeshi Yamamuro <linguin.m.s@gmail.com> Closes #3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the following commits: 8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import 3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark
This commit is contained in:
parent
e895e0cbec
commit
2e6b736b0e
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark.graphx
|
||||
|
||||
import org.apache.spark.util.collection.SortDataFormat
|
||||
|
||||
/**
|
||||
* A single directed edge consisting of a source id, target id,
|
||||
* and the data associated with the edge.
|
||||
|
@ -65,4 +67,32 @@ object Edge {
|
|||
else 1
|
||||
}
|
||||
}
|
||||
|
||||
private[graphx] def edgeArraySortDataFormat[ED] = new SortDataFormat[Edge[ED], Array[Edge[ED]]] {
|
||||
override def getKey(data: Array[Edge[ED]], pos: Int): Edge[ED] = {
|
||||
data(pos)
|
||||
}
|
||||
|
||||
override def swap(data: Array[Edge[ED]], pos0: Int, pos1: Int): Unit = {
|
||||
val tmp = data(pos0)
|
||||
data(pos0) = data(pos1)
|
||||
data(pos1) = tmp
|
||||
}
|
||||
|
||||
override def copyElement(
|
||||
src: Array[Edge[ED]], srcPos: Int,
|
||||
dst: Array[Edge[ED]], dstPos: Int) {
|
||||
dst(dstPos) = src(srcPos)
|
||||
}
|
||||
|
||||
override def copyRange(
|
||||
src: Array[Edge[ED]], srcPos: Int,
|
||||
dst: Array[Edge[ED]], dstPos: Int, length: Int) {
|
||||
System.arraycopy(src, srcPos, dst, dstPos, length)
|
||||
}
|
||||
|
||||
override def allocate(length: Int): Array[Edge[ED]] = {
|
||||
new Array[Edge[ED]](length)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,12 +18,10 @@
|
|||
package org.apache.spark.graphx.impl
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
import scala.util.Sorting
|
||||
|
||||
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
|
||||
|
||||
import org.apache.spark.graphx._
|
||||
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
|
||||
import org.apache.spark.util.collection.{SortDataFormat, Sorter, PrimitiveVector}
|
||||
|
||||
/** Constructs an EdgePartition from scratch. */
|
||||
private[graphx]
|
||||
|
@ -38,7 +36,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
|
|||
|
||||
def toEdgePartition: EdgePartition[ED, VD] = {
|
||||
val edgeArray = edges.trim().array
|
||||
Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
|
||||
new Sorter(Edge.edgeArraySortDataFormat[ED])
|
||||
.sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
|
||||
val localSrcIds = new Array[Int](edgeArray.size)
|
||||
val localDstIds = new Array[Int](edgeArray.size)
|
||||
val data = new Array[ED](edgeArray.size)
|
||||
|
@ -97,7 +96,8 @@ class ExistingEdgePartitionBuilder[
|
|||
|
||||
def toEdgePartition: EdgePartition[ED, VD] = {
|
||||
val edgeArray = edges.trim().array
|
||||
Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering)
|
||||
new Sorter(EdgeWithLocalIds.edgeArraySortDataFormat[ED])
|
||||
.sort(edgeArray, 0, edgeArray.length, EdgeWithLocalIds.lexicographicOrdering)
|
||||
val localSrcIds = new Array[Int](edgeArray.size)
|
||||
val localDstIds = new Array[Int](edgeArray.size)
|
||||
val data = new Array[ED](edgeArray.size)
|
||||
|
@ -140,4 +140,33 @@ private[impl] object EdgeWithLocalIds {
|
|||
}
|
||||
}
|
||||
|
||||
private[graphx] def edgeArraySortDataFormat[ED]
|
||||
= new SortDataFormat[EdgeWithLocalIds[ED], Array[EdgeWithLocalIds[ED]]] {
|
||||
override def getKey(
|
||||
data: Array[EdgeWithLocalIds[ED]], pos: Int): EdgeWithLocalIds[ED] = {
|
||||
data(pos)
|
||||
}
|
||||
|
||||
override def swap(data: Array[EdgeWithLocalIds[ED]], pos0: Int, pos1: Int): Unit = {
|
||||
val tmp = data(pos0)
|
||||
data(pos0) = data(pos1)
|
||||
data(pos1) = tmp
|
||||
}
|
||||
|
||||
override def copyElement(
|
||||
src: Array[EdgeWithLocalIds[ED]], srcPos: Int,
|
||||
dst: Array[EdgeWithLocalIds[ED]], dstPos: Int) {
|
||||
dst(dstPos) = src(srcPos)
|
||||
}
|
||||
|
||||
override def copyRange(
|
||||
src: Array[EdgeWithLocalIds[ED]], srcPos: Int,
|
||||
dst: Array[EdgeWithLocalIds[ED]], dstPos: Int, length: Int) {
|
||||
System.arraycopy(src, srcPos, dst, dstPos, length)
|
||||
}
|
||||
|
||||
override def allocate(length: Int): Array[EdgeWithLocalIds[ED]] = {
|
||||
new Array[EdgeWithLocalIds[ED]](length)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue