[SPARK-34853][SQL] Remove duplicated definition of output partitioning/ordering for limit operator
### What changes were proposed in this pull request?

Both local limit and global limit define the output partitioning and output ordering in the same way, and this is duplicated (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L159-L175). We can move the output partitioning and ordering into their parent trait, `BaseLimitExec`. This is doable as `BaseLimitExec` has no other child classes. This is a minor code refactoring.

### Why are the changes needed?

Cleans up the code a little bit and improves readability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pure refactoring; relies on existing unit tests.

Closes #31950 from c21/limit-cleanup.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
This commit is contained in:

parent 8ed5808f64
commit 35c70e417d
```diff
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -113,6 +113,10 @@ object BaseLimitExec {
 trait BaseLimitExec extends LimitExec with CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
@@ -156,12 +160,7 @@ trait BaseLimitExec extends LimitExec with CodegenSupport {
 /**
  * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-}
+case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec
 
 /**
  * Take the first `limit` elements of the child's single output partition.
@@ -169,10 +168,6 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
 case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
 
   override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
 
 /**
```
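For illustration, here is a minimal, self-contained sketch of the refactoring pattern the diff applies, using simplified stand-in types (`Plan`, `Scan`, `BaseLimit`, `LocalLimit`, and `GlobalLimit` are hypothetical names, not the actual Spark classes): the delegating overrides are defined once in the parent trait, and both concrete limit operators inherit them instead of repeating them.

```scala
// A minimal sketch of "move shared overrides into the parent trait".
// Hypothetical stand-in types; not the actual Spark physical-plan classes.
object LimitRefactorSketch {
  // Tiny stand-in for a physical plan node's interface.
  trait Plan {
    def outputPartitioning: String
    def outputOrdering: Seq[String]
  }

  // A hypothetical leaf node that knows its own partitioning/ordering.
  case class Scan(partitioning: String, ordering: Seq[String]) extends Plan {
    def outputPartitioning: String = partitioning
    def outputOrdering: Seq[String] = ordering
  }

  // After the refactoring: the parent trait defines the two delegating
  // overrides once, since every limit operator passes its child's
  // partitioning and ordering through unchanged.
  trait BaseLimit extends Plan {
    def limit: Int
    def child: Plan
    def outputPartitioning: String = child.outputPartitioning
    def outputOrdering: Seq[String] = child.outputOrdering
  }

  // The concrete operators now inherit the definitions rather than
  // duplicating them, so LocalLimit needs no body at all.
  case class LocalLimit(limit: Int, child: Plan) extends BaseLimit
  case class GlobalLimit(limit: Int, child: Plan) extends BaseLimit

  def main(args: Array[String]): Unit = {
    val plan = GlobalLimit(10, LocalLimit(10, Scan("hash(id)", Seq("id ASC"))))
    println(plan.outputPartitioning) // hash(id): delegated through the trait
    println(plan.outputOrdering)     // List(id ASC)
  }
}
```

This works cleanly here because, as the commit message notes, `BaseLimitExec` has no subclasses other than the two limit operators, so hoisting the overrides changes no other operator's behavior.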