From 9513d82edd5f52e791ae811e331b5d1a77895ec5 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 7 Mar 2019 08:47:52 +0800
Subject: [PATCH] [SPARK-27057][SQL] Common trait for limit exec operators

## What changes were proposed in this pull request?

I would like to refactor `limit.scala` slightly and introduce a common trait `LimitExec` for `CollectLimitExec` and `BaseLimitExec` (`LocalLimitExec` and `GlobalLimitExec`). This makes it possible to distinguish those operators from others and to get the `limit` value without casting to a concrete class.

## How was this patch tested?

By existing test suites.

Closes #23976 from MaxGekk/limit-exec.

Authored-by: Maxim Gekk
Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/execution/limit.scala | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 56973af8fd..2ff08883d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -27,13 +27,21 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 
+/**
+ * The operator takes a limited number of elements from its child operator.
+ */
+trait LimitExec extends UnaryExecNode {
+  /** Number of elements that should be taken from the child operator. */
+  def limit: Int
+}
+
 /**
  * Take the first `limit` elements and collect them to a single partition.
  *
  * This operator will be used when a logical `Limit` operation is the final operator in an
  * logical plan, which happens when the user is collecting results back to the driver.
  */
-case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
@@ -70,8 +78,7 @@ object BaseLimitExec {
  * Helper trait which defines methods that are shared by both
  * [[LocalLimitExec]] and [[GlobalLimitExec]].
  */
-trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
-  val limit: Int
+trait BaseLimitExec extends LimitExec with CodegenSupport {
   override def output: Seq[Attribute] = child.output
 
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>