[SPARK-19933][SQL] Do not change output of a subquery
## What changes were proposed in this pull request? The `RemoveRedundantAlias` rule can change the output attributes (the expression id's to be precise) of a query by eliminating the redundant alias producing them. This is no problem for a regular query, but can cause problems for correlated subqueries: The attributes produced by the subquery are used in the parent plan; changing them will break the parent plan. This PR fixes this by wrapping a subquery in a `Subquery` top level node when it gets optimized. The `RemoveRedundantAlias` rule now recognizes `Subquery` and makes sure that the output attributes of the `Subquery` node are retained. ## How was this patch tested? Added a test case to `RemoveRedundantAliasAndProjectSuite` and added a regression test to `SubquerySuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17278 from hvanhovell/SPARK-19933.
This commit is contained in:
parent
6325a2f82a
commit
e04c05cf41
|
@ -142,7 +142,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
|
|||
object OptimizeSubqueries extends Rule[LogicalPlan] {
|
||||
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
|
||||
case s: SubqueryExpression =>
|
||||
s.withNewPlan(Optimizer.this.execute(s.plan))
|
||||
val Subquery(newPlan) = Optimizer.this.execute(Subquery(s.plan))
|
||||
s.withNewPlan(newPlan)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -187,7 +188,10 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
|
|||
// If the alias name is different from attribute name, we can't strip it either, or we
|
||||
// may accidentally change the output schema name of the root plan.
|
||||
case a @ Alias(attr: Attribute, name)
|
||||
if a.metadata == Metadata.empty && name == attr.name && !blacklist.contains(attr) =>
|
||||
if a.metadata == Metadata.empty &&
|
||||
name == attr.name &&
|
||||
!blacklist.contains(attr) &&
|
||||
!blacklist.contains(a) =>
|
||||
attr
|
||||
case a => a
|
||||
}
|
||||
|
@ -195,10 +199,15 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
|
|||
/**
|
||||
* Remove redundant alias expression from a LogicalPlan and its subtree. A blacklist is used to
|
||||
* prevent the removal of seemingly redundant aliases used to deduplicate the input for a (self)
|
||||
* join.
|
||||
* join or to prevent the removal of top-level subquery attributes.
|
||||
*/
|
||||
private def removeRedundantAliases(plan: LogicalPlan, blacklist: AttributeSet): LogicalPlan = {
|
||||
plan match {
|
||||
// We want to keep the same output attributes for subqueries. This means we cannot remove
|
||||
// the aliases that produce these attributes
|
||||
case Subquery(child) =>
|
||||
Subquery(removeRedundantAliases(child, blacklist ++ child.outputSet))
|
||||
|
||||
// A join has to be treated differently, because the left and the right side of the join are
|
||||
// not allowed to use the same attributes. We use a blacklist to prevent us from creating a
|
||||
// situation in which this happens; the rule will only remove an alias if its child
|
||||
|
|
|
@ -38,6 +38,14 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
|
|||
override def output: Seq[Attribute] = child.output
|
||||
}
|
||||
|
||||
/**
|
||||
* This node is inserted at the top of a subquery when it is optimized. This makes sure we can
|
||||
* recognize a subquery as such, and it allows us to write subquery aware transformations.
|
||||
*/
|
||||
case class Subquery(child: LogicalPlan) extends UnaryNode {
|
||||
override def output: Seq[Attribute] = child.output
|
||||
}
|
||||
|
||||
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
|
||||
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
|
||||
override def maxRows: Option[Long] = child.maxRows
|
||||
|
|
|
@ -116,4 +116,12 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper
|
|||
val expected = relation.window(Seq('b), Seq('a), Seq()).analyze
|
||||
comparePlans(optimized, expected)
|
||||
}
|
||||
|
||||
test("do not remove output attributes from a subquery") {
|
||||
val relation = LocalRelation('a.int, 'b.int)
|
||||
val query = Subquery(relation.select('a as "a", 'b as "b").where('b < 10).select('a).analyze)
|
||||
val optimized = Optimize.execute(query)
|
||||
val expected = Subquery(relation.select('a as "a", 'b).where('b < 10).select('a).analyze)
|
||||
comparePlans(optimized, expected)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -830,4 +830,18 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
|
|||
Row(1) :: Row(0) :: Nil)
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-19933 Do not eliminate top-level aliases in sub-queries") {
|
||||
withTempView("t1", "t2") {
|
||||
spark.range(4).createOrReplaceTempView("t1")
|
||||
checkAnswer(
|
||||
sql("select * from t1 where id in (select id as id from t1)"),
|
||||
Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil)
|
||||
|
||||
spark.range(2).createOrReplaceTempView("t2")
|
||||
checkAnswer(
|
||||
sql("select * from t1 where id in (select id as id from t2)"),
|
||||
Row(0) :: Row(1) :: Nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue