[SPARK-21229][SQL] remove QueryPlan.preCanonicalized

## What changes were proposed in this pull request?

`QueryPlan.preCanonicalized` is only overridden in a few places, and it introduces an extra concept to `QueryPlan` that may confuse people.

This PR removes it and overrides `canonicalized` in those places instead.
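
A quick sketch may help readers see the difference between the two shapes. This is a simplified stand-in, not Spark's actual `QueryPlan` API; `Plan` and `normalizeIds` are invented names:

```scala
// Simplified sketch only: `Plan` and `normalizeIds` are invented
// stand-ins, not Spark's actual QueryPlan API.
abstract class Plan {
  // Placeholder for the shared exprId-normalization logic.
  protected def normalizeIds(p: Plan): Plan = p

  // Old shape: a pre-processing hook that a few subclasses overrode.
  protected def preCanonicalized: Plan = this
  lazy val oldStyleCanonicalized: Plan = normalizeIds(preCanonicalized)

  // New shape: subclasses needing custom logic override this directly.
  lazy val canonicalized: Plan = normalizeIds(this)
}
```

With the hook gone there is one concept (`canonicalized`) instead of two, at the cost of each overriding node normalizing its own expression ids, as the diffs below show.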

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18440 from cloud-fan/minor.
Committed by Wenchen Fan on 2017-06-29 11:21:50 +08:00
parent fc92d25f2a
commit 25c2edf6f9
4 changed files with 27 additions and 22 deletions

@@ -27,7 +27,8 @@ import com.google.common.base.Objects
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -425,15 +426,17 @@ case class CatalogRelation(
     Objects.hashCode(tableMeta.identifier, output)
   }
 
-  override def preCanonicalized: LogicalPlan = copy(tableMeta = CatalogTable(
-    identifier = tableMeta.identifier,
-    tableType = tableMeta.tableType,
-    storage = CatalogStorageFormat.empty,
-    schema = tableMeta.schema,
-    partitionColumnNames = tableMeta.partitionColumnNames,
-    bucketSpec = tableMeta.bucketSpec,
-    createTime = -1
-  ))
+  override lazy val canonicalized: LogicalPlan = copy(
+    tableMeta = tableMeta.copy(
+      storage = CatalogStorageFormat.empty,
+      createTime = -1
+    ),
+    dataCols = dataCols.zipWithIndex.map {
+      case (attr, index) => attr.withExprId(ExprId(index))
+    },
+    partitionCols = partitionCols.zipWithIndex.map {
+      case (attr, index) => attr.withExprId(ExprId(index + dataCols.length))
+    })
 
   override def computeStats: Statistics = {
     // For data source tables, we will create a `LogicalRelation` and won't call this method, for
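
The notable part of the `CatalogRelation` change is the positional renumbering: data columns take exprIds `0` through `dataCols.length - 1` and partition columns continue from `dataCols.length`, so two relations over the same table canonicalize identically regardless of which exprIds the analyzer assigned. A toy illustration of that idea, where `AttrLike` is an invented stand-in for Catalyst's `AttributeReference`:

```scala
// Invented toy type; not Catalyst's AttributeReference.
case class AttrLike(name: String, exprId: Long)

// Renumber attributes by position, as the canonicalized override above
// does with ExprId(index).
def renumber(attrs: Seq[AttrLike]): Seq[AttrLike] =
  attrs.zipWithIndex.map { case (attr, i) => attr.copy(exprId = i.toLong) }

// The analyzer hands each relation instance arbitrary exprIds...
val scan1 = Seq(AttrLike("a", 17L), AttrLike("b", 42L))
val scan2 = Seq(AttrLike("a", 99L), AttrLike("b", 3L))

// ...but positional renumbering makes the canonical forms equal.
assert(renumber(scan1) == renumber(scan2)) // both become a#0, b#1
```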


@@ -188,12 +188,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType]
    * Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same
    * result.
    *
-   * Some nodes should overwrite this to provide proper canonicalize logic.
+   * Some nodes should overwrite this to provide proper canonicalize logic, but they should remove
+   * expressions cosmetic variations themselves.
    */
   lazy val canonicalized: PlanType = {
     val canonicalizedChildren = children.map(_.canonicalized)
     var id = -1
-    preCanonicalized.mapExpressions {
+    mapExpressions {
       case a: Alias =>
         id += 1
         // As the root of the expression, Alias will always take an arbitrary exprId, we need to
@@ -206,18 +207,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType]
         // Top level `AttributeReference` may also be used for output like `Alias`, we should
         // normalize the epxrId too.
         id += 1
-        ar.withExprId(ExprId(id))
+        ar.withExprId(ExprId(id)).canonicalized
       case other => QueryPlan.normalizeExprId(other, allAttributes)
     }.withNewChildren(canonicalizedChildren)
   }
 
-  /**
-   * Do some simple transformation on this plan before canonicalizing. Implementations can override
-   * this method to provide customized canonicalize logic without rewriting the whole logic.
-   */
-  protected def preCanonicalized: PlanType = this
-
   /**
    * Returns true when the given query plan will return the same results as this query plan.
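
The default `canonicalized` normalizes top-level output expressions with a simple counter: each `Alias` (and each top-level `AttributeReference` not bound to a child) receives the next sequential `ExprId`, so plans differing only in those arbitrary ids collapse to the same canonical form. A self-contained sketch of the counter idea, using an invented `Attr` type rather than Catalyst's classes:

```scala
// Invented toy type; not a Catalyst expression class.
case class Attr(name: String, exprId: Long)

// Mirror the `var id = -1` counter in QueryPlan.canonicalized above:
// assign sequential ids in output order.
def normalizeOutput(output: Seq[Attr]): Seq[Attr] = {
  var id = -1L
  output.map { attr =>
    id += 1
    attr.copy(exprId = id)
  }
}

// Outputs that differ only in arbitrary exprIds become equal.
assert(normalizeOutput(Seq(Attr("x", 7L), Attr("y", 42L))) ==
  normalizeOutput(Seq(Attr("x", 1L), Attr("y", 5L))))
```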


@@ -138,8 +138,12 @@ case class RowDataSourceScanExec(
   }
 
   // Only care about `relation` and `metadata` when canonicalizing.
-  override def preCanonicalized: SparkPlan =
-    copy(rdd = null, outputPartitioning = null, metastoreTableIdentifier = None)
+  override lazy val canonicalized: SparkPlan =
+    copy(
+      output.map(QueryPlan.normalizeExprId(_, output)),
+      rdd = null,
+      outputPartitioning = null,
+      metastoreTableIdentifier = None)
 }
 
 /**
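
`RowDataSourceScanExec` follows the same recipe in one step: normalize the output exprIds and clear state that has no bearing on which rows are produced (the materialized RDD, the partitioning, the metastore identifier). The pattern in miniature, with an invented `ScanLike` type standing in for the real operator:

```scala
// Invented example type; not Spark's RowDataSourceScanExec.
case class ScanLike(columns: Seq[String], rdd: AnyRef, table: Option[String]) {
  // Clear runtime-only fields so equivalent scans compare equal.
  lazy val canonicalized: ScanLike = copy(rdd = null, table = None)
}

val s1 = ScanLike(Seq("a", "b"), new Object, Some("db.t"))
val s2 = ScanLike(Seq("a", "b"), new Object, Some("t_alias"))
assert(s1.canonicalized == s2.canonicalized)
```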


@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.util.Utils
@@ -43,7 +44,9 @@ case class LogicalRelation(
   }
 
   // Only care about relation when canonicalizing.
-  override def preCanonicalized: LogicalPlan = copy(catalogTable = None)
+  override lazy val canonicalized: LogicalPlan = copy(
+    output = output.map(QueryPlan.normalizeExprId(_, output)),
+    catalogTable = None)
 
   @transient override def computeStats: Statistics = {
     catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
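
All four call sites feed the same consumer: `QueryPlan.sameResult` compares canonical forms, which is how Spark decides, for instance, that a cached plan can answer a new query. A compact sketch of that contract, with a simplified `Rel` type in place of a real leaf node:

```scala
// Simplified model; Rel stands in for a real leaf plan node.
final case class Rel(table: String, exprIds: Seq[Long]) {
  // Canonical form: table identity kept, exprIds renumbered by position.
  lazy val canonicalized: Rel = copy(exprIds = exprIds.indices.map(_.toLong))
  def sameResult(other: Rel): Boolean = canonicalized == other.canonicalized
}

// Equal up to cosmetic exprId differences, hence the same result.
assert(Rel("t", Seq(10L, 11L)).sameResult(Rel("t", Seq(7L, 99L))))
```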