From 913ab4b9fd2ac417822590b227ce4f7a9dd2ac04 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 3 Jul 2019 21:21:31 +0800
Subject: [PATCH] [SPARK-28156][SQL] Self-join should not miss cached view

## What changes were proposed in this pull request?

The issue is that when self-joining a cached view, only one side of the join uses the cached relation. The cause is that `ResolveReferences` deduplicates a view by giving it new output attributes, and `AliasViewChild` then adds an extra Project under the view, which breaks cache matching. The fix is to deduplicate only a view whose output differs from its child plan's output; otherwise, we deduplicate on the view's child plan.

```scala
val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
df.write.mode("overwrite").format("orc").saveAsTable("table1")
sql("drop view if exists table1_vw")
sql("create view table1_vw as select * from table1")
val cachedView = sql("select a, b, c, d from table1_vw")
cachedView.createOrReplaceTempView("cachedview")
cachedView.persist()
val queryDf = sql(
  s"""select leftside.a, leftside.b
     |from cachedview leftside
     |join cachedview rightside
     |on leftside.a = rightside.a
   """.stripMargin)
```

Query plan before this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight
   :- *(2) Filter isnotnull(a#12664)
   :  +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
   :        +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *(1) Project [a#12660]
         +- *(1) Filter isnotnull(a#12660)
            +- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct
```

Query plan after this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight
   :- *(2) Filter isnotnull(a#12664)
   :  +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
   :        +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *(1) Filter isnotnull(a#12692)
         +- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)]
               +- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
```
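For reference, a quick way to confirm the behavior from a Spark shell is to count the in-memory scans in the executed plan of `queryDf` from the reproducer above; this sketch mirrors the regression test added below, and the `inMemoryScans` name is only local to the example. Before this PR only one scan shows up; with the fix both join sides read from the cached view:

```scala
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

// Collect the in-memory scans from the physical plan of the self-join.
// When the cached view is picked up on both sides, two scans are expected.
val inMemoryScans = queryDf.queryExecution.executedPlan.collect {
  case scan: InMemoryTableScanExec => scan
}
assert(inMemoryScans.size == 2)
```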
## How was this patch tested?

Added test.

Closes #24960 from viirya/SPARK-28156.

Authored-by: Liang-Chi Hsieh
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/analysis/Analyzer.scala      |  2 -
 .../sql/catalyst/analysis/CheckAnalysis.scala | 40 +++++++++
 .../spark/sql/catalyst/analysis/view.scala    | 81 ++++++-------------
 .../plans/logical/basicLogicalOperators.scala |  3 +
 .../sql/catalyst/analysis/AnalysisSuite.scala |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 31 +++++++
 6 files changed, 101 insertions(+), 58 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 60517f11a2..7b920e183d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -197,8 +197,6 @@ class Analyzer(
         TypeCoercion.typeCoercionRules(conf) ++
         extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
-    Batch("View", Once,
-      AliasViewChild(conf)),
     Batch("Nondeterministic", Once,
       PullOutNondeterministic),
     Batch("UDF", Once,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 6b9b4f4e1b..286794c0ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -313,6 +313,46 @@ trait CheckAnalysis extends PredicateHelper {
             failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}")
           }

+        // If the view output doesn't have the same number of columns neither with the child
+        // output, nor with the query column names, throw an AnalysisException.
+        // If the view's child output can't up cast to the view output,
+        // throw an AnalysisException, too.
+        case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) =>
+          val queryColumnNames = desc.viewQueryColumnNames
+          val queryOutput = if (queryColumnNames.nonEmpty) {
+            if (output.length != queryColumnNames.length) {
+              // If the view output doesn't have the same number of columns with the query column
+              // names, throw an AnalysisException.
+              throw new AnalysisException(
+                s"The view output ${output.mkString("[", ",", "]")} doesn't have the same" +
+                  "number of columns with the query column names " +
+                  s"${queryColumnNames.mkString("[", ",", "]")}")
+            }
+            val resolver = SQLConf.get.resolver
+            queryColumnNames.map { colName =>
+              child.output.find { attr =>
+                resolver(attr.name, colName)
+              }.getOrElse(throw new AnalysisException(
+                s"Attribute with name '$colName' is not found in " +
+                  s"'${child.output.map(_.name).mkString("(", ",", ")")}'"))
+            }
+          } else {
+            child.output
+          }
+
+          output.zip(queryOutput).foreach {
+            case (attr, originAttr) if !attr.dataType.sameType(originAttr.dataType) =>
+              // The dataType of the output attributes may be not the same with that of the view
+              // output, so we should cast the attribute to the dataType of the view output
+              // attribute. Will throw an AnalysisException if the cast is not a up-cast.
+              if (!Cast.canUpCast(originAttr.dataType, attr.dataType)) {
+                throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
+                  s"${originAttr.dataType.catalogString} to ${attr.dataType.catalogString} " +
+                  "as it may truncate\n")
+              }
+            case _ =>
+          }
+
         case _ => // Fallbacks to the following checks
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index 24276e11d8..76bf3740ed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -17,8 +17,7 @@ package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
+import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -28,7 +27,12 @@ import org.apache.spark.sql.internal.SQLConf
  */

 /**
- * Make sure that a view's child plan produces the view's output attributes. We try to wrap the
+ * This rule has two goals:
+ *
+ * 1. Removes [[View]] operators from the plan. The operator is respected till the end of analysis
+ *    stage because we want to see which part of an analyzed logical plan is generated from a view.
+ *
+ * 2. Make sure that a view's child plan produces the view's output attributes. We try to wrap the
  * child by:
  * 1. Generate the `queryOutput` by:
  * 1.1. If the query column names are defined, map the column names to attributes in the child
@@ -41,27 +45,29 @@ import org.apache.spark.sql.internal.SQLConf
  * 2. Map the `queryOutput` to view output by index, if the corresponding attributes don't match,
  *    try to up cast and alias the attribute in `queryOutput` to the attribute in the view output.
  * 3. Add a Project over the child, with the new output generated by the previous steps.
- * If the view output doesn't have the same number of columns neither with the child output, nor
- * with the query column names, throw an AnalysisException.
+ *
+ * Once reaches this rule, it means `CheckAnalysis` did necessary checks on number of columns
+ * between the view output and the child output or the query column names. `CheckAnalysis` also
+ * checked the cast from the view's child to the Project is up-cast.
  *
  * This should be only done after the batch of Resolution, because the view attributes are not
  * completely resolved during the batch of Resolution.
  */
-case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+object EliminateView extends Rule[LogicalPlan] with CastSupport {
+  override def conf: SQLConf = SQLConf.get
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    // The child has the different output attributes with the View operator. Adds a Project over
+    // the child of the view.
     case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) =>
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames
       val queryOutput = if (queryColumnNames.nonEmpty) {
-        // If the view output doesn't have the same number of columns with the query column names,
-        // throw an AnalysisException.
-        if (output.length != queryColumnNames.length) {
-          throw new AnalysisException(
-            s"The view output ${output.mkString("[", ",", "]")} doesn't have the same number of " +
-              s"columns with the query column names ${queryColumnNames.mkString("[", ",", "]")}")
-        }
+        // Find the attribute that has the expected attribute name from an attribute list, the names
+        // are compared using conf.resolver.
+        // `CheckAnalysis` already guarantees the expected attribute can be found for sure.
         desc.viewQueryColumnNames.map { colName =>
-          findAttributeByName(colName, child.output, resolver)
+          child.output.find(attr => resolver(attr.name, colName)).get
         }
       } else {
         // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
@@ -71,51 +77,16 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp
       // Map the attributes in the query output to the attributes in the view output by index.
       val newOutput = output.zip(queryOutput).map {
         case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
-          // The dataType of the output attributes may be not the same with that of the view
-          // output, so we should cast the attribute to the dataType of the view output attribute.
-          // Will throw an AnalysisException if the cast is not a up-cast.
-          if (!Cast.canUpCast(originAttr.dataType, attr.dataType)) {
-            throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
-              s"${originAttr.dataType.catalogString} to ${attr.dataType.catalogString} as it " +
-              s"may truncate\n")
-          } else {
-            Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
-              qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
-          }
+          // `CheckAnalysis` already guarantees that the cast is a up-cast for sure.
+          Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
+            qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
         case (_, originAttr) => originAttr
       }
-      v.copy(child = Project(newOutput, child))
-  }
+      Project(newOutput, child)

-  /**
-   * Find the attribute that has the expected attribute name from an attribute list, the names
-   * are compared using conf.resolver.
-   * If the expected attribute is not found, throw an AnalysisException.
-   */
-  private def findAttributeByName(
-      name: String,
-      attrs: Seq[Attribute],
-      resolver: Resolver): Attribute = {
-    attrs.find { attr =>
-      resolver(attr.name, name)
-    }.getOrElse(throw new AnalysisException(
-      s"Attribute with name '$name' is not found in " +
-        s"'${attrs.map(_.name).mkString("(", ",", ")")}'"))
-  }
-}
-
-/**
- * Removes [[View]] operators from the plan. The operator is respected till the end of analysis
- * stage because we want to see which part of an analyzed logical plan is generated from a view.
- */
-object EliminateView extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // The child should have the same output attributes with the View operator, so we simply
     // remove the View operator.
-    case v @ View(_, output, child) =>
-      assert(v.sameOutput(child),
-        s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
-          s"view output ${output.mkString("[", ",", "]")}")
+    case View(_, _, child) => child
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 93a558de58..273a4389ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -586,6 +586,9 @@ case class View(
     output: Seq[Attribute],
     child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation {

+  @transient
+  override lazy val references: AttributeSet = AttributeSet.empty
+
   override lazy val resolved: Boolean = child.resolved

   override def children: Seq[LogicalPlan] = child :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index b69f25b8a5..34ea6d5404 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -616,7 +616,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {

   test("SPARK-25691: AliasViewChild with different nullabilities") {
     object ViewAnalyzer extends RuleExecutor[LogicalPlan] {
-      val batches = Batch("View", Once, AliasViewChild(conf), EliminateView) :: Nil
+      val batches = Batch("View", Once, EliminateView) :: Nil
     }
     val relation = LocalRelation('a.int.notNull, 'b.string)
     val view = View(CatalogTable(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ba1ac654c1..2cc1be9fdd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -3149,6 +3150,36 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(sql("select to_timestamp('2000-01-01 01:10:00') > '1'"), Row(true))
     }
   }
+
+  test("SPARK-28156: self-join should not miss cached view") {
+    withTable("table1") {
+      withView("table1_vw") {
+        val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
+        df.write.mode("overwrite").format("orc").saveAsTable("table1")
+        sql("drop view if exists table1_vw")
+        sql("create view table1_vw as select * from table1")
+
+        val cachedView = sql("select a, b, c, d from table1_vw")
+
+        cachedView.createOrReplaceTempView("cachedview")
+        cachedView.persist()
+
+        val queryDf = sql(
+          s"""select leftside.a, leftside.b
+            |from cachedview leftside
+            |join cachedview rightside
+            |on leftside.a = rightside.a
+          """.stripMargin)
+
+        val inMemoryTableScan = queryDf.queryExecution.executedPlan.collect {
+          case i: InMemoryTableScanExec => i
+        }
+        assert(inMemoryTableScan.size == 2)
+        checkAnswer(queryDf, Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(3, 4) :: Row(4, 5) :: Nil)
+      }
+    }
+
+  }
 }

 case class Foo(bar: Option[String])