[SPARK-36353][SQL] RemoveNoopOperators should keep output schema

### What changes were proposed in this pull request?
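`RemoveNoopOperators` should keep the output schema: when it eliminates a no-op `Project`, the output column names of that `Project` are restored on the child (`Project` or `Aggregate`); if the names still differ, the `Project` is kept.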

### Why are the changes needed?
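Extends the rule's functionality: removing a no-op `Project` could otherwise change the output column names of the plan (for example `c` instead of `C`).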

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a unit test in `RemoveNoopOperatorsSuite`.

Closes #33587 from AngersZhuuuu/SPARK-36355.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Angerszhuuuu 2021-08-05 20:43:35 +08:00 committed by Wenchen Fan
parent 7d13ac177b
commit 02810eecbf
2 changed files with 37 additions and 1 deletions

View file

@ -516,10 +516,34 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
* Remove no-op operators from the query plan that do not make any modifications.
*/
object RemoveNoopOperators extends Rule[LogicalPlan] {
/**
 * Renames the expressions of `projectList` positionally using `originalNames`.
 *
 * Each expression is paired with the name at the same index. An [[Attribute]] or an
 * [[Alias]] is rebuilt through `withName`; any other [[NamedExpression]] is returned
 * as-is. Because the pairing uses `zip`, the result has as many elements as the shorter
 * of the two inputs.
 *
 * @param projectList   expressions whose output names should be replaced
 * @param originalNames names to apply, in the same order as `projectList`
 * @return the renamed expressions, in the original order
 */
def restoreOriginalOutputNames(
    projectList: Seq[NamedExpression],
    originalNames: Seq[String]): Seq[NamedExpression] = {
  projectList.zip(originalNames).map { case (expr, newName) =>
    expr match {
      case attr: Attribute => attr.withName(newName)
      case alias: Alias => alias.withName(newName)
      // Other named expressions are left untouched.
      case _ => expr
    }
  }
}
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
_.containsAnyPattern(PROJECT, WINDOW), ruleId) {
// Eliminate no-op Projects
case p @ Project(_, child) if child.sameOutput(p) => child
case p @ Project(projectList, child) if child.sameOutput(p) =>
val newChild = child match {
case p: Project =>
p.copy(projectList = restoreOriginalOutputNames(p.projectList, projectList.map(_.name)))
case agg: Aggregate =>
agg.copy(aggregateExpressions =
restoreOriginalOutputNames(agg.aggregateExpressions, projectList.map(_.name)))
case _ =>
child
}
if (newChild.output.zip(projectList).forall { case (a1, a2) => a1.name == a2.name }) {
newChild
} else {
p
}
// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child

View file

@ -54,4 +54,16 @@ class RemoveNoopOperatorsSuite extends PlanTest {
comparePlans(optimized, testRelation)
}
test("SPARK-36353: RemoveNoopOperators should keep output schema") {
  // Inner plan exposes a single column aliased with a lower-case name.
  val inner = testRelation.select(('a + 'b).as("c")).analyze
  // Outer Project re-exposes the same attribute under the name "C".
  val renamedOutput = inner.output.head.withName("C")
  val input = Project(Seq(renamedOutput), inner)

  val actual = Optimize.execute(input.analyze)

  // Expected plan: a single Project whose alias carries the outer name "C".
  val expected = testRelation.select(('a + 'b).as("C")).analyze
  comparePlans(actual, expected)
}
}