[SPARK-27057][SQL] Common trait for limit exec operators

## What changes were proposed in this pull request?

I would like to refactor `limit.scala` slightly and introduce common trait `LimitExec` for `CollectLimitExec` and `BaseLimitExec` (`LocalLimitExec` and `GlobalLimitExec`). This will allow to distinguish those operators from others, and to get the `limit` value without casting to concrete class.

## How was this patch tested?

by existing test suites.

Closes #23976 from MaxGekk/limit-exec.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Maxim Gekk 2019-03-07 08:47:52 +08:00 committed by Wenchen Fan
parent 62fd133f74
commit 9513d82edd

View file

@ -27,13 +27,21 @@ import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
/**
* The operator takes limited number of elements from its child operator.
*/
trait LimitExec extends UnaryExecNode {
/** Number of element should be taken from child operator */
def limit: Int
}
/**
* Take the first `limit` elements and collect them to a single partition.
*
* This operator will be used when a logical `Limit` operation is the final operator in an
* logical plan, which happens when the user is collecting results back to the driver.
*/
case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
@ -70,8 +78,7 @@ object BaseLimitExec {
* Helper trait which defines methods that are shared by both
* [[LocalLimitExec]] and [[GlobalLimitExec]].
*/
trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
val limit: Int
trait BaseLimitExec extends LimitExec with CodegenSupport {
override def output: Seq[Attribute] = child.output
protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>