[SPARK-9702] [SQL] Use Exchange to implement logical Repartition operator
This patch allows `Repartition` to support UnsafeRows. This is accomplished by implementing the logical `Repartition` operator in terms of `Exchange` and a new `RoundRobinPartitioning`.

Author: Josh Rosen <joshrosen@databricks.com>
Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #8083 from JoshRosen/SPARK-9702.
commit 7e2e268289
parent 37526aca24
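For context, the user-facing entry point is unchanged. A hypothetical spark-shell session (Spark 1.5-era API; `sqlContext` is the shell-provided context) shows the call that now plans through `Exchange`:

```scala
// Hypothetical illustration. With this patch, repartition() plans an
// Exchange with RoundRobinPartitioning instead of the old
// execution.Repartition operator, so the shuffle can carry UnsafeRows.
val df = sqlContext.range(0, 1000)
val repartitioned = df.repartition(10)
repartitioned.explain()  // the physical plan should now contain an Exchange
```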
First, the new partitioning is added to the catalyst `Partitioning` hierarchy:

```diff
@@ -194,6 +194,22 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
   override def guarantees(other: Partitioning): Boolean = false
 }
 
+/**
+ * Represents a partitioning where rows are distributed evenly across output partitions
+ * by starting from a random target partition number and distributing rows in a round-robin
+ * fashion. This partitioning is used when implementing the DataFrame.repartition() operator.
+ */
+case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning {
+  override def satisfies(required: Distribution): Boolean = required match {
+    case UnspecifiedDistribution => true
+    case _ => false
+  }
+
+  override def compatibleWith(other: Partitioning): Boolean = false
+
+  override def guarantees(other: Partitioning): Boolean = false
+}
+
 case object SinglePartition extends Partitioning {
   val numPartitions = 1
 
```
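As a sketch of the contract above: `RoundRobinPartitioning` satisfies only `UnspecifiedDistribution`, because row placement ignores row content, so the planner can never reuse it to meet a clustering or ordering requirement. A minimal illustration against the catalyst types (assuming they are on the classpath):

```scala
import org.apache.spark.sql.catalyst.plans.physical._

val rr = RoundRobinPartitioning(numPartitions = 8)
rr.satisfies(UnspecifiedDistribution)     // true: any placement is acceptable
rr.satisfies(AllTuples)                   // false: rows are spread across partitions
rr.guarantees(RoundRobinPartitioning(8))  // false: the random start offset differs per task
```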
In `Exchange`, `java.util.Random` is now needed for the random starting partition:

```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.Random
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
```
```diff
@@ -31,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
-import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
+import org.apache.spark._
 
 /**
  * :: DeveloperApi ::
```
```diff
@@ -130,7 +132,6 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
 
   private val serializer: Serializer = {
-    val rowDataTypes = child.output.map(_.dataType).toArray
     if (tungstenMode) {
       new UnsafeRowSerializer(child.output.size)
     } else {
```
`Exchange.doExecute` maps the new partitioning onto a plain `HashPartitioner`:

```diff
@@ -141,6 +142,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   protected override def doExecute(): RDD[InternalRow] = attachTree(this , "execute") {
     val rdd = child.execute()
     val part: Partitioner = newPartitioning match {
+      case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
       case HashPartitioning(expressions, numPartitions) => new HashPartitioner(numPartitions)
       case RangePartitioning(sortingExpressions, numPartitions) =>
         // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
```
The partition-key extractor is widened from `InternalRow => InternalRow` to `InternalRow => Any`, since the round-robin case emits a plain `Int` counter rather than a row:

```diff
@@ -162,7 +164,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
     }
-    def getPartitionKeyExtractor(): InternalRow => InternalRow = newPartitioning match {
+    def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
+      case RoundRobinPartitioning(numPartitions) =>
+        // Distributes elements evenly across output partitions, starting from a random partition.
+        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
+        (row: InternalRow) => {
+          // The HashPartitioner will handle the `mod` by the number of partitions
+          position += 1
+          position
+        }
      case HashPartitioning(expressions, _) => newMutableProjection(expressions, child.output)()
      case RangePartitioning(_, _) | SinglePartition => identity
      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
```
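To see why returning a plain `Int` key works with `HashPartitioner`, here is a self-contained sketch (not Spark code; names are illustrative) that mimics the extractor and the partitioner's modulo step. An `Int`'s `hashCode` is the `Int` itself, so an increasing counter cycles through partitions 0 to numPartitions - 1 evenly:

```scala
import java.util.Random

// Illustrative stand-in for HashPartitioner's modulo on key.hashCode.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

val numPartitions = 4
val taskPartitionId = 7  // stand-in for TaskContext.get().partitionId()
var position = new Random(taskPartitionId).nextInt(numPartitions)

val targets = (1 to 8).map { _ =>
  position += 1
  nonNegativeMod(position, numPartitions)
}
// targets cycles through every partition in round-robin order,
// e.g. Vector(2, 3, 0, 1, 2, 3, 0, 1)
```

Seeding with the task's partition ID gives each map task a different starting offset, so the first output partition is not systematically overloaded.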
In `SparkStrategies`, the logical `Repartition` now splits into two physical plans:

```diff
@@ -336,7 +336,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         throw new IllegalStateException(
           "logical distinct operator should have been replaced by aggregate in the optimizer")
       case logical.Repartition(numPartitions, shuffle, child) =>
-        execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
+        if (shuffle) {
+          execution.Exchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
+        } else {
+          execution.Coalesce(numPartitions, planLater(child)) :: Nil
+        }
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
```
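The net effect at the API level, shown as a hypothetical illustration (`df` is any DataFrame): both user-facing methods funnel into this one strategy case, with `shuffle` deciding the branch:

```scala
df.repartition(100)  // logical Repartition(shuffle = true)
                     //   -> Exchange(RoundRobinPartitioning(100), child)
df.coalesce(10)      // logical Repartition(shuffle = false)
                     //   -> Coalesce(10, child)
```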
In the basic operators file, the import list is trimmed now that the shuffle-based repartition logic lives in `Exchange`:

```diff
@@ -17,21 +17,20 @@
 
 package org.apache.spark.sql.execution
 
-import java.util.Random
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD, ShuffledRDD}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.ExternalSorter
 import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
 import org.apache.spark.util.random.PoissonSampler
-import org.apache.spark.util.{CompletionIterator, MutablePair}
+import org.apache.spark.util.MutablePair
 import org.apache.spark.{HashPartitioner, SparkEnv}
 
 /**
```
The old `Repartition` physical operator becomes `Coalesce`, which only ever narrows:

```diff
@@ -279,10 +278,12 @@ case class TakeOrderedAndProject(
 /**
  * :: DeveloperApi ::
  * Return a new RDD that has exactly `numPartitions` partitions.
+ * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
+ * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
+ * the 100 new partitions will claim 10 of the current partitions.
  */
 @DeveloperApi
-case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
-  extends UnaryNode {
+case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = {
```
```diff
@@ -291,11 +292,10 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
+    child.execute().map(_.copy()).coalesce(numPartitions, shuffle = false)
   }
 }
 
-
 /**
  * :: DeveloperApi ::
  * Returns a table with the elements from left that are not in right using
```
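The hard-coded `shuffle = false` matches the narrow-dependency behavior documented in the class comment, and mirrors `RDD.coalesce` in Spark core. A small sketch of the distinction (standard RDD API; `sc` assumed to be a SparkContext):

```scala
val rdd = sc.parallelize(1 to 1000000, numSlices = 1000)

// Narrow dependency: no shuffle; each of the 100 new partitions
// claims roughly 10 of the 1000 parent partitions.
val narrowed = rdd.coalesce(100)

// Forcing shuffle = true redistributes rows instead; in SQL that
// path now goes through Exchange rather than this Coalesce operator.
val reshuffled = rdd.coalesce(100, shuffle = true)
```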