[SPARK-16448] RemoveAliasOnlyProject should not remove alias with metadata

## What changes were proposed in this pull request?

`Alias` with metadata is not a no-op and we should not strip it in `RemoveAliasOnlyProject` rule.
This PR also did some improvement for this rule:

1. extend the semantic of `alias-only`. Now we allow the project list to be partially aliased.
2. add unit test for this rule.

## How was this patch tested?

new `RemoveAliasOnlyProjectSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14106 from cloud-fan/bug.
This commit is contained in:
Wenchen Fan 2016-07-14 15:48:22 +08:00 committed by Cheng Lian
parent 39c836e976
commit db7317ac3c
2 changed files with 108 additions and 18 deletions

View file

@ -165,36 +165,49 @@ object PushProjectThroughSample extends Rule[LogicalPlan] {
* but can also benefit other operators.
*/
object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
// Check if projectList in the Project node has the same attribute names and ordering
// as its child node.
/**
* Returns true if the project list is semantically same as child output, after strip alias on
* attribute.
*/
private def isAliasOnly(
projectList: Seq[NamedExpression],
childOutput: Seq[Attribute]): Boolean = {
if (!projectList.forall(_.isInstanceOf[Alias]) || projectList.length != childOutput.length) {
if (projectList.length != childOutput.length) {
false
} else {
projectList.map(_.asInstanceOf[Alias]).zip(childOutput).forall { case (a, o) =>
a.child match {
case attr: Attribute if a.name == attr.name && attr.semanticEquals(o) => true
case _ => false
}
stripAliasOnAttribute(projectList).zip(childOutput).forall {
case (a: Attribute, o) if a semanticEquals o => true
case _ => false
}
}
}
private def stripAliasOnAttribute(projectList: Seq[NamedExpression]) = {
projectList.map {
// Alias with metadata can not be stripped, or the metadata will be lost.
// If the alias name is different from attribute name, we can't strip it either, or we may
// accidentally change the output schema name of the root plan.
case a @ Alias(attr: Attribute, name) if a.metadata == Metadata.empty && name == attr.name =>
attr
case other => other
}
}
def apply(plan: LogicalPlan): LogicalPlan = {
val aliasOnlyProject = plan.find {
case Project(pList, child) if isAliasOnly(pList, child.output) => true
case _ => false
val aliasOnlyProject = plan.collectFirst {
case p @ Project(pList, child) if isAliasOnly(pList, child.output) => p
}
aliasOnlyProject.map { case p: Project =>
val aliases = p.projectList.map(_.asInstanceOf[Alias])
val attrMap = AttributeMap(aliases.map(a => (a.toAttribute, a.child)))
plan.transformAllExpressions {
case a: Attribute if attrMap.contains(a) => attrMap(a)
}.transform {
case op: Project if op.eq(p) => op.child
aliasOnlyProject.map { case proj =>
val attributesToReplace = proj.output.zip(proj.child.output).filterNot {
case (a1, a2) => a1 semanticEquals a2
}
val attrMap = AttributeMap(attributesToReplace)
plan transform {
case plan: Project if plan eq proj => plan.child
case plan => plan transformExpressions {
case a: Attribute if attrMap.contains(a) => attrMap(a)
}
}
}.getOrElse(plan)
}

View file

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.MetadataBuilder
class RemoveAliasOnlyProjectSuite extends PlanTest with PredicateHelper {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("RemoveAliasOnlyProject", FixedPoint(50), RemoveAliasOnlyProject) :: Nil
}
test("all expressions in project list are aliased child output") {
val relation = LocalRelation('a.int, 'b.int)
val query = relation.select('a as 'a, 'b as 'b).analyze
val optimized = Optimize.execute(query)
comparePlans(optimized, relation)
}
test("all expressions in project list are aliased child output but with different order") {
val relation = LocalRelation('a.int, 'b.int)
val query = relation.select('b as 'b, 'a as 'a).analyze
val optimized = Optimize.execute(query)
comparePlans(optimized, query)
}
test("some expressions in project list are aliased child output") {
val relation = LocalRelation('a.int, 'b.int)
val query = relation.select('a as 'a, 'b).analyze
val optimized = Optimize.execute(query)
comparePlans(optimized, relation)
}
test("some expressions in project list are aliased child output but with different order") {
val relation = LocalRelation('a.int, 'b.int)
val query = relation.select('b as 'b, 'a).analyze
val optimized = Optimize.execute(query)
comparePlans(optimized, query)
}
test("some expressions in project list are not Alias or Attribute") {
val relation = LocalRelation('a.int, 'b.int)
val query = relation.select('a as 'a, 'b + 1).analyze
val optimized = Optimize.execute(query)
comparePlans(optimized, query)
}
test("some expressions in project list are aliased child output but with metadata") {
val relation = LocalRelation('a.int, 'b.int)
val metadata = new MetadataBuilder().putString("x", "y").build()
val aliasWithMeta = Alias('a, "a")(explicitMetadata = Some(metadata))
val query = relation.select(aliasWithMeta, 'b).analyze
val optimized = Optimize.execute(query)
comparePlans(optimized, query)
}
}