[SPARK-18973][SQL] Remove SortPartitions and RedistributeData
## What changes were proposed in this pull request? SortPartitions and RedistributeData logical operators are not actually used and can be removed. Note that we do have a Sort operator (with global flag false) that subsumed SortPartitions. ## How was this patch tested? Also updated test cases to reflect the removal. Author: Reynold Xin <rxin@databricks.com> Closes #16381 from rxin/SPARK-18973.
This commit is contained in:
parent
76622c661f
commit
2615100055
|
@ -1116,7 +1116,7 @@ class Analyzer(
|
|||
case p: Sort =>
|
||||
failOnOuterReference(p)
|
||||
p
|
||||
case p: RedistributeData =>
|
||||
case p: RepartitionByExpression =>
|
||||
failOnOuterReference(p)
|
||||
p
|
||||
|
||||
|
|
|
@ -166,7 +166,7 @@ object UnsupportedOperationChecker {
|
|||
case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
|
||||
throwError("Limits are not supported on streaming DataFrames/Datasets")
|
||||
|
||||
case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
|
||||
case Sort(_, _, _) if !containsCompleteData(subPlan) =>
|
||||
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" +
|
||||
"aggregated DataFrame/Dataset in Complete output mode")
|
||||
|
||||
|
|
|
@ -796,7 +796,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
|
|||
case _: Distinct => true
|
||||
case _: Generate => true
|
||||
case _: Pivot => true
|
||||
case _: RedistributeData => true
|
||||
case _: RepartitionByExpression => true
|
||||
case _: Repartition => true
|
||||
case _: ScriptTransformation => true
|
||||
case _: Sort => true
|
||||
|
|
|
@ -489,7 +489,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
|
|||
case _: AppendColumns => true
|
||||
case _: AppendColumnsWithObject => true
|
||||
case _: BroadcastHint => true
|
||||
case _: RedistributeData => true
|
||||
case _: RepartitionByExpression => true
|
||||
case _: Repartition => true
|
||||
case _: Sort => true
|
||||
case _: TypedFilter => true
|
||||
|
|
|
@ -764,6 +764,28 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
|
|||
override def output: Seq[Attribute] = child.output
|
||||
}
|
||||
|
||||
/**
|
||||
* This operator repartitions data using [[Expression]]s into `numPartitions`, and receives
|
||||
* information about the number of partitions during execution. Used when a specific ordering or
|
||||
* distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
|
||||
* `coalesce` and `repartition`.
|
||||
* If `numPartitions` is not specified, the number of partitions will be the number set by
|
||||
* `spark.sql.shuffle.partitions`.
|
||||
*/
|
||||
case class RepartitionByExpression(
|
||||
partitionExpressions: Seq[Expression],
|
||||
child: LogicalPlan,
|
||||
numPartitions: Option[Int] = None) extends UnaryNode {
|
||||
|
||||
// Runs when the node is constructed: an explicit partition count must be strictly positive.
numPartitions match {
|
||||
case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.")
|
||||
case None => // Ok: nothing to validate when no count is given
|
||||
}
|
||||
|
||||
// Both the row-count bound and the output attributes are taken unchanged from the child.
override def maxRows: Option[Long] = child.maxRows
|
||||
override def output: Seq[Attribute] = child.output
|
||||
}
|
||||
|
||||
/**
|
||||
* A relation with one row. This is used in "SELECT ..." without a from clause.
|
||||
*/
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.catalyst.plans.logical
|
||||
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder}
|
||||
|
||||
/**
|
||||
* Performs a physical redistribution of the data. Used when the consumer of the query
|
||||
* result has expectations about the distribution and ordering of partitioned input data.
|
||||
*/
|
||||
abstract class RedistributeData extends UnaryNode {
|
||||
override def output: Seq[Attribute] = child.output
|
||||
}
|
||||
|
||||
case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
|
||||
extends RedistributeData
|
||||
|
||||
/**
|
||||
* This operator repartitions data using [[Expression]]s into `numPartitions`, and receives
|
||||
* information about the number of partitions during execution. Used when a specific ordering or
|
||||
* distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
|
||||
* `coalesce` and `repartition`.
|
||||
* If `numPartitions` is not specified, the number of partitions will be the number set by
|
||||
* `spark.sql.shuffle.partitions`.
|
||||
*/
|
||||
case class RepartitionByExpression(
|
||||
partitionExpressions: Seq[Expression],
|
||||
child: LogicalPlan,
|
||||
numPartitions: Option[Int] = None) extends RedistributeData {
|
||||
numPartitions match {
|
||||
case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.")
|
||||
case None => // Ok
|
||||
}
|
||||
}
|
|
@ -213,7 +213,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
|
|||
|
||||
|
||||
// Other unary operations
|
||||
testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
|
||||
testUnaryOperatorInStreamingPlan(
|
||||
"sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
|
||||
testUnaryOperatorInStreamingPlan(
|
||||
|
|
|
@ -376,10 +376,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
|
|||
} else {
|
||||
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
|
||||
}
|
||||
case logical.SortPartitions(sortExprs, child) =>
|
||||
// This sort only sorts tuples within a partition. Its requiredDistribution will be
|
||||
// an UnspecifiedDistribution.
|
||||
execution.SortExec(sortExprs, global = false, child = planLater(child)) :: Nil
|
||||
case logical.Sort(sortExprs, global, child) =>
|
||||
execution.SortExec(sortExprs, global, planLater(child)) :: Nil
|
||||
case logical.Project(projectList, child) =>
|
||||
|
|
Loading…
Reference in a new issue