[SPARK-27909][SQL] Do not run analysis inside CTE substitution

## What changes were proposed in this pull request?

This updates CTE substitution to avoid needing to run all resolution rules on each substituted expression. Running resolution rules was previously used to avoid infinite recursion. In the updated rule, CTE plans are substituted as sub-queries from right to left. Using this scope-based order, it is not necessary to replace multiple CTEs at the same time using `resolveOperatorsDown`. Instead, `resolveOperatorsUp` is used to replace each CTE individually.

By resolving using `resolveOperatorsUp`, this no longer needs to run all analyzer rules on each substituted expression. Previously, this was done to apply `ResolveRelations`, which would throw an `AnalysisException` for all unresolved relations so that unresolved relations that may cause recursive substitutions were not left in the plan. Because this is no longer needed, `ResolveRelations` no longer needs to throw `AnalysisException` and resolution can be done in multiple rules.

## How was this patch tested?

Existing tests in `SQLQueryTestSuite`, `cte.sql`.

Closes #24763 from rdblue/SPARK-27909-fix-cte-substitution.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This commit is contained in:
Ryan Blue 2019-06-04 14:46:13 -07:00 committed by gatorsmile
parent f9ca8ab196
commit de73a54269

View file

@ -215,23 +215,26 @@ class Analyzer(
object CTESubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case With(child, relations) =>
substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
case (resolved, (name, relation)) =>
resolved :+ name -> executeSameContext(substituteCTE(relation, resolved))
})
// substitute CTE expressions right-to-left to resolve references to previous CTEs:
// with a as (select * from t), b as (select * from a) select * from b
relations.foldRight(child) {
case ((cteName, ctePlan), currentPlan) =>
substituteCTE(currentPlan, cteName, ctePlan)
}
case other => other
}
def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = {
plan resolveOperatorsDown {
def substituteCTE(plan: LogicalPlan, cteName: String, ctePlan: LogicalPlan): LogicalPlan = {
plan resolveOperatorsUp {
case UnresolvedRelation(TableIdentifier(table, None)) if resolver(cteName, table) =>
ctePlan
case u: UnresolvedRelation =>
cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
.map(_._2).getOrElse(u)
u
case other =>
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
other transformExpressions {
case e: SubqueryExpression =>
e.withNewPlan(substituteCTE(e.plan, cteRelations))
e.withNewPlan(substituteCTE(e.plan, cteName, ctePlan))
}
}
}