[SPARK-28156][SQL] Self-join should not miss cached view

## What changes were proposed in this pull request?

The issue is that when self-joining a cached view, only one side of the join uses the cached relation. The cause is that in `ResolveReferences` we deduplicate a view so that it gets new output attributes, and then the `AliasViewChild` rule adds an extra Project under the view, which breaks cache matching.

The fix is that when deduplicating, we only deduplicate a view whose output differs from its child plan's output; otherwise, we deduplicate the view's child plan instead. A reproduction:

```scala
val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
df.write.mode("overwrite").format("orc").saveAsTable("table1")

sql("drop view if exists table1_vw")
sql("create view table1_vw as select * from table1")

val cachedView = sql("select a, b, c, d from table1_vw")

cachedView.createOrReplaceTempView("cachedview")
cachedView.persist()

val queryDf = sql(
  s"""select leftside.a, leftside.b
      |from cachedview leftside
      |join cachedview rightside
      |on leftside.a = rightside.a
    """.stripMargin)
```
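
With this patch, both sides of the self-join should read from the cached relation. A minimal way to verify this programmatically (a sketch that mirrors the test added below, assuming the `queryDf` from the snippet above) is to count the `InMemoryTableScanExec` nodes in the executed plan:

```scala
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

// Both join sides should hit the cached view, so we expect two in-memory scans.
val inMemoryScans = queryDf.queryExecution.executedPlan.collect {
  case scan: InMemoryTableScanExec => scan
}
assert(inMemoryScans.size == 2)
```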

Query plan before this PR (only the left side of the join reads the cached `InMemoryRelation`; the right side scans the ORC files again):
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight
   :- *(2) Filter isnotnull(a#12664)
   :  +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
   :        +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *(1) Project [a#12660]
         +- *(1) Filter isnotnull(a#12660)
            +- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
```

Query plan after this PR (both sides of the join now read the cached `InMemoryRelation`):
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight
   :- *(2) Filter isnotnull(a#12664)
   :  +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
   :        +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *(1) Filter isnotnull(a#12692)
         +- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)]
               +- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
```

## How was this patch tested?

Added test.

Closes #24960 from viirya/SPARK-28156.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
6 changed files with 101 additions and 58 deletions


@@ -197,8 +197,6 @@ class Analyzer(
TypeCoercion.typeCoercionRules(conf) ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
Batch("View", Once,
AliasViewChild(conf)),
Batch("Nondeterministic", Once,
PullOutNondeterministic),
Batch("UDF", Once,


@@ -313,6 +313,46 @@ trait CheckAnalysis extends PredicateHelper {
failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}")
}
// If the view output doesn't have the same number of columns neither with the child
// output, nor with the query column names, throw an AnalysisException.
// If the view's child output can't up cast to the view output,
// throw an AnalysisException, too.
case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) =>
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
if (output.length != queryColumnNames.length) {
// If the view output doesn't have the same number of columns with the query column
// names, throw an AnalysisException.
throw new AnalysisException(
s"The view output ${output.mkString("[", ",", "]")} doesn't have the same" +
"number of columns with the query column names " +
s"${queryColumnNames.mkString("[", ",", "]")}")
}
val resolver = SQLConf.get.resolver
queryColumnNames.map { colName =>
child.output.find { attr =>
resolver(attr.name, colName)
}.getOrElse(throw new AnalysisException(
s"Attribute with name '$colName' is not found in " +
s"'${child.output.map(_.name).mkString("(", ",", ")")}'"))
}
} else {
child.output
}
output.zip(queryOutput).foreach {
case (attr, originAttr) if !attr.dataType.sameType(originAttr.dataType) =>
// The dataType of the output attributes may be not the same with that of the view
// output, so we should cast the attribute to the dataType of the view output
// attribute. Will throw an AnalysisException if the cast is not a up-cast.
if (!Cast.canUpCast(originAttr.dataType, attr.dataType)) {
throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
s"${originAttr.dataType.catalogString} to ${attr.dataType.catalogString} " +
"as it may truncate\n")
}
case _ =>
}
case _ => // Fallbacks to the following checks
}


@@ -17,8 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
@@ -28,7 +27,12 @@ import org.apache.spark.sql.internal.SQLConf
*/
/**
* Make sure that a view's child plan produces the view's output attributes. We try to wrap the
* This rule has two goals:
*
* 1. Removes [[View]] operators from the plan. The operator is respected till the end of analysis
* stage because we want to see which part of an analyzed logical plan is generated from a view.
*
* 2. Make sure that a view's child plan produces the view's output attributes. We try to wrap the
* child by:
* 1. Generate the `queryOutput` by:
* 1.1. If the query column names are defined, map the column names to attributes in the child
@@ -41,27 +45,29 @@ import org.apache.spark.sql.internal.SQLConf
* 2. Map the `queryOutput` to view output by index, if the corresponding attributes don't match,
* try to up cast and alias the attribute in `queryOutput` to the attribute in the view output.
* 3. Add a Project over the child, with the new output generated by the previous steps.
* If the view output doesn't have the same number of columns neither with the child output, nor
* with the query column names, throw an AnalysisException.
*
* Once reaches this rule, it means `CheckAnalysis` did necessary checks on number of columns
* between the view output and the child output or the query column names. `CheckAnalysis` also
* checked the cast from the view's child to the Project is up-cast.
*
* This should be only done after the batch of Resolution, because the view attributes are not
* completely resolved during the batch of Resolution.
*/
case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
object EliminateView extends Rule[LogicalPlan] with CastSupport {
override def conf: SQLConf = SQLConf.get
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// The child has the different output attributes with the View operator. Adds a Project over
// the child of the view.
case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) =>
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
// If the view output doesn't have the same number of columns with the query column names,
// throw an AnalysisException.
if (output.length != queryColumnNames.length) {
throw new AnalysisException(
s"The view output ${output.mkString("[", ",", "]")} doesn't have the same number of " +
s"columns with the query column names ${queryColumnNames.mkString("[", ",", "]")}")
}
// Find the attribute that has the expected attribute name from an attribute list, the names
// are compared using conf.resolver.
// `CheckAnalysis` already guarantees the expected attribute can be found for sure.
desc.viewQueryColumnNames.map { colName =>
findAttributeByName(colName, child.output, resolver)
child.output.find(attr => resolver(attr.name, colName)).get
}
} else {
// For view created before Spark 2.2.0, the view text is already fully qualified, the plan
@@ -71,51 +77,16 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp
// Map the attributes in the query output to the attributes in the view output by index.
val newOutput = output.zip(queryOutput).map {
case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
// The dataType of the output attributes may be not the same with that of the view
// output, so we should cast the attribute to the dataType of the view output attribute.
// Will throw an AnalysisException if the cast is not a up-cast.
if (!Cast.canUpCast(originAttr.dataType, attr.dataType)) {
throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
s"${originAttr.dataType.catalogString} to ${attr.dataType.catalogString} as it " +
s"may truncate\n")
} else {
Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
}
// `CheckAnalysis` already guarantees that the cast is a up-cast for sure.
Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
case (_, originAttr) => originAttr
}
v.copy(child = Project(newOutput, child))
}
Project(newOutput, child)
/**
* Find the attribute that has the expected attribute name from an attribute list, the names
* are compared using conf.resolver.
* If the expected attribute is not found, throw an AnalysisException.
*/
private def findAttributeByName(
name: String,
attrs: Seq[Attribute],
resolver: Resolver): Attribute = {
attrs.find { attr =>
resolver(attr.name, name)
}.getOrElse(throw new AnalysisException(
s"Attribute with name '$name' is not found in " +
s"'${attrs.map(_.name).mkString("(", ",", ")")}'"))
}
}
/**
* Removes [[View]] operators from the plan. The operator is respected till the end of analysis
* stage because we want to see which part of an analyzed logical plan is generated from a view.
*/
object EliminateView extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// The child should have the same output attributes with the View operator, so we simply
// remove the View operator.
case v @ View(_, output, child) =>
assert(v.sameOutput(child),
s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
s"view output ${output.mkString("[", ",", "]")}")
case View(_, _, child) =>
child
}
}


@@ -586,6 +586,9 @@ case class View(
output: Seq[Attribute],
child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation {
@transient
override lazy val references: AttributeSet = AttributeSet.empty
override lazy val resolved: Boolean = child.resolved
override def children: Seq[LogicalPlan] = child :: Nil


@@ -616,7 +616,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
test("SPARK-25691: AliasViewChild with different nullabilities") {
object ViewAnalyzer extends RuleExecutor[LogicalPlan] {
val batches = Batch("View", Once, AliasViewChild(conf), EliminateView) :: Nil
val batches = Batch("View", Once, EliminateView) :: Nil
}
val relation = LocalRelation('a.int.notNull, 'b.string)
val view = View(CatalogTable(


@ -26,6 +26,7 @@ import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -3149,6 +3150,36 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(sql("select to_timestamp('2000-01-01 01:10:00') > '1'"), Row(true))
}
}
test("SPARK-28156: self-join should not miss cached view") {
withTable("table1") {
withView("table1_vw") {
val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
df.write.mode("overwrite").format("orc").saveAsTable("table1")
sql("drop view if exists table1_vw")
sql("create view table1_vw as select * from table1")
val cachedView = sql("select a, b, c, d from table1_vw")
cachedView.createOrReplaceTempView("cachedview")
cachedView.persist()
val queryDf = sql(
s"""select leftside.a, leftside.b
|from cachedview leftside
|join cachedview rightside
|on leftside.a = rightside.a
""".stripMargin)
val inMemoryTableScan = queryDf.queryExecution.executedPlan.collect {
case i: InMemoryTableScanExec => i
}
assert(inMemoryTableScan.size == 2)
checkAnswer(queryDf, Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(3, 4) :: Row(4, 5) :: Nil)
}
}
}
}
case class Foo(bar: Option[String])