[SPARK-28220][SQL] Improve PropagateEmptyRelation to support join with false condition
### What changes were proposed in this pull request? Improve `PropagateEmptyRelation` to support join with false condition. For example: ```sql SELECT * FROM t1 LEFT JOIN t2 ON false ``` Before this PR: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- BroadcastNestedLoopJoin BuildRight, LeftOuter, false :- FileScan parquet default.t1[a#4L] +- BroadcastExchange IdentityBroadcastMode, [id=#40] +- FileScan parquet default.t2[b#5L] ``` After this PR: ``` == Physical Plan == *(1) Project [a#4L, null AS b#5L] +- *(1) ColumnarToRow +- FileScan parquet default.t1[a#4L] ``` ### Why are the changes needed? Avoid `BroadcastNestedLoopJoin` to improve query performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #31857 from wangyum/SPARK-28220. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Yuming Wang <yumwang@ebay.com>
This commit is contained in:
parent
ed641fbad6
commit
908318f30d
|
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
|
|||
|
||||
import org.apache.spark.sql.catalyst.analysis.CastSupport
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
|
||||
import org.apache.spark.sql.catalyst.plans._
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
import org.apache.spark.sql.catalyst.rules._
|
||||
|
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.rules._
|
|||
* - Join with one or two empty children (including Intersect/Except).
|
||||
* 2. Unary-node Logical Plans
|
||||
* - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
|
||||
* - Join with false condition.
|
||||
* - Aggregate with all empty children and at least one grouping expression.
|
||||
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
|
||||
*/
|
||||
|
@@ -71,24 +73,32 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
|
|||
// Joins on empty LocalRelations generated from streaming sources are not eliminated
|
||||
// as stateful streaming joins need to perform other state management operations other than
|
||||
// just processing the input data.
|
||||
case p @ Join(_, _, joinType, _, _)
|
||||
case p @ Join(_, _, joinType, conditionOpt, _)
|
||||
if !p.children.exists(_.isStreaming) =>
|
||||
val isLeftEmpty = isEmptyLocalRelation(p.left)
|
||||
val isRightEmpty = isEmptyLocalRelation(p.right)
|
||||
if (isLeftEmpty || isRightEmpty) {
|
||||
val isFalseCondition = conditionOpt match {
|
||||
case Some(FalseLiteral) => true
|
||||
case _ => false
|
||||
}
|
||||
if (isLeftEmpty || isRightEmpty || isFalseCondition) {
|
||||
joinType match {
|
||||
case _: InnerLike => empty(p)
|
||||
// Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
|
||||
// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
|
||||
case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
|
||||
case LeftSemi if isRightEmpty => empty(p)
|
||||
case LeftAnti if isRightEmpty => p.left
|
||||
case LeftSemi if isRightEmpty || isFalseCondition => empty(p)
|
||||
case LeftAnti if isRightEmpty || isFalseCondition => p.left
|
||||
case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
|
||||
case LeftOuter | FullOuter if isRightEmpty =>
|
||||
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
|
||||
case RightOuter if isRightEmpty => empty(p)
|
||||
case RightOuter | FullOuter if isLeftEmpty =>
|
||||
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
|
||||
case LeftOuter if isFalseCondition =>
|
||||
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
|
||||
case RightOuter if isFalseCondition =>
|
||||
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
|
||||
case _ => p
|
||||
}
|
||||
} else {
|
||||
|
|
|
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
|
|||
import org.apache.spark.sql.catalyst.dsl.expressions._
|
||||
import org.apache.spark.sql.catalyst.dsl.plans._
|
||||
import org.apache.spark.sql.catalyst.expressions.Literal
|
||||
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
|
||||
import org.apache.spark.sql.catalyst.plans._
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
|
||||
import org.apache.spark.sql.catalyst.rules.RuleExecutor
|
||||
|
@@ -155,6 +156,28 @@ class PropagateEmptyRelationSuite extends PlanTest {
|
|||
}
|
||||
}
|
||||
|
||||
test("SPARK-28220: Propagate empty relation through Join if condition is FalseLiteral") {
|
||||
val testcases = Seq(
|
||||
(Inner, Some(LocalRelation('a.int, 'b.int))),
|
||||
(Cross, Some(LocalRelation('a.int, 'b.int))),
|
||||
(LeftOuter,
|
||||
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
|
||||
(RightOuter,
|
||||
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
|
||||
(FullOuter, None),
|
||||
(LeftAnti, Some(testRelation1)),
|
||||
(LeftSemi, Some(LocalRelation('a.int)))
|
||||
)
|
||||
|
||||
testcases.foreach { case (jt, answer) =>
|
||||
val query = testRelation1.join(testRelation2, joinType = jt, condition = Some(FalseLiteral))
|
||||
val optimized = Optimize.execute(query.analyze)
|
||||
val correctAnswer =
|
||||
answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
|
||||
comparePlans(optimized, correctAnswer)
|
||||
}
|
||||
}
|
||||
|
||||
test("propagate empty relation through UnaryNode") {
|
||||
val query = testRelation1
|
||||
.where(false)
|
||||
|
|
Loading…
Reference in a new issue