[SPARK-34514][SQL] Push down limit for LEFT SEMI and LEFT ANTI join

### What changes were proposed in this pull request?

During code review of https://github.com/apache/spark/pull/31567#discussion_r577379572, I found that we can push down the limit to the left side of a LEFT SEMI or LEFT ANTI join if the join condition is empty.

Why it's safe to push down the limit:

The semantics of a LEFT SEMI join without a condition:
1. If the right side is non-empty, output all rows from the left side.
2. If the right side is empty, output nothing.

The semantics of a LEFT ANTI join without a condition:
1. If the right side is non-empty, output nothing.
2. If the right side is empty, output all rows from the left side.

Since the output is either all rows from the left side or nothing ("all or nothing"), taking only the first N left-side rows before the join cannot change the first N output rows, so it's safe to push down the limit to the left side.
NOTE: a LEFT SEMI / LEFT ANTI join with a non-empty condition is not safe for limit pushdown, because the output can be an arbitrary subset of the left-side rows.

Reference: the physical operator implementation for LEFT SEMI / LEFT ANTI join without a condition: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala#L200-L204 .
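
The all-or-nothing behavior is easy to model in a few lines of plain Scala. The sketch below uses made-up helper names and ordinary `Seq`s rather than Spark's operator, but it makes the safety argument concrete: limiting the left input first cannot change the first N output rows.

```scala
// Sketch of condition-free LEFT SEMI / LEFT ANTI semantics.
// Hypothetical helpers for illustration only, not Spark code.
def leftSemi[L](left: Seq[L], right: Seq[Any]): Seq[L] =
  if (right.nonEmpty) left else Seq.empty

def leftAnti[L](left: Seq[L], right: Seq[Any]): Seq[L] =
  if (right.isEmpty) left else Seq.empty

// Pushing the limit below the "join" preserves the limited result:
val left = Seq(1, 2, 3, 4)
assert(leftSemi(left, Seq("r")).take(2) == leftSemi(left.take(2), Seq("r")).take(2))
assert(leftAnti(left, Seq.empty).take(2) == leftAnti(left.take(2), Seq.empty).take(2))
```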

### Why are the changes needed?

Better performance. Pushing the limit below the join saves CPU and IO, since fewer left-side rows need to be scanned and joined.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests in `LimitPushdownSuite.scala` and `SQLQuerySuite.scala`.

Closes #31630 from c21/limit-pushdown.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

#### `sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala`

```diff
@@ -502,7 +502,7 @@ object RemoveNoopOperators extends Rule[LogicalPlan] {
 }
 
 /**
- * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
+ * Pushes down [[LocalLimit]] beneath UNION ALL and joins.
  */
 object LimitPushDown extends Rule[LogicalPlan] {
@@ -539,12 +539,16 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // pushdown Limit.
     case LocalLimit(exp, u: Union) =>
       LocalLimit(exp, u.copy(children = u.children.map(maybePushLocalLimit(exp, _))))
-    // Add extra limits below JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
-    // the left and right sides, respectively. For INNER and CROSS JOIN we push limits to
-    // both the left and right sides if join condition is empty. It's not safe to push limits
-    // below FULL OUTER JOIN in the general case without a more invasive rewrite.
-    // We also need to ensure that this limit pushdown rule will not eventually introduce limits
-    // on both sides if it is applied multiple times. Therefore:
+    // Add extra limits below JOIN:
+    // 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides,
+    //    respectively.
+    // 2. For INNER and CROSS JOIN, we push limits to both the left and right sides if join
+    //    condition is empty.
+    // 3. For LEFT SEMI and LEFT ANTI JOIN, we push limits to the left side if join condition
+    //    is empty.
+    // It's not safe to push limits below FULL OUTER JOIN in the general case without a more
+    // invasive rewrite. We also need to ensure that this limit pushdown rule will not eventually
+    // introduce limits on both sides if it is applied multiple times. Therefore:
     // - If one side is already limited, stack another limit on top if the new limit is smaller.
     //   The redundant limit will be collapsed by the CombineLimits rule.
     case LocalLimit(exp, join @ Join(left, right, joinType, conditionOpt, _)) =>
@@ -555,6 +559,8 @@ object LimitPushDown extends Rule[LogicalPlan] {
           join.copy(
             left = maybePushLocalLimit(exp, left),
             right = maybePushLocalLimit(exp, right))
+        case LeftSemi | LeftAnti if conditionOpt.isEmpty =>
+          join.copy(left = maybePushLocalLimit(exp, left))
         case _ => join
       }
       LocalLimit(exp, newJoin)
```
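
To see the new case fire end to end, one can compare optimized plans from a spark-shell session. This is an illustrative sketch: `t1` and `t2` are assumed temp views, and the exact plan text varies across Spark versions.

```scala
// Illustrative spark-shell check; t1/t2 are assumed temp views.
spark.range(100).createOrReplaceTempView("t1")
spark.range(10).createOrReplaceTempView("t2")

// Condition-free LEFT SEMI: expect a LocalLimit(5) on the left (t1) side
// beneath the LeftSemi join in the optimized plan, plus the global limit on top.
spark.sql("SELECT * FROM t1 LEFT SEMI JOIN t2 LIMIT 5").explain(true)

// With a join condition, no limit is pushed below the join.
spark.sql("SELECT * FROM t1 LEFT SEMI JOIN t2 ON t1.id = t2.id LIMIT 5").explain(true)
```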

#### `sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala`

```diff
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Add
-import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, PlanTest, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -212,4 +212,22 @@ class LimitPushdownSuite extends PlanTest {
       comparePlans(optimized, correctAnswer)
     }
   }
+
+  test("SPARK-34514: Push down limit through LEFT SEMI and LEFT ANTI join") {
+    // Push down when condition is empty
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.join(y, joinType).limit(1)
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer = Limit(1, LocalLimit(1, x).join(y, joinType)).analyze
+      comparePlans(optimized, correctAnswer)
+    }
+
+    // No push down when condition is not empty
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.join(y, joinType, Some("x.a".attr === "y.b".attr)).limit(1)
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer = Limit(1, x.join(y, joinType, Some("x.a".attr === "y.b".attr))).analyze
+      comparePlans(optimized, correctAnswer)
+    }
+  }
 }
```

#### `sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala`

```diff
@@ -4034,6 +4034,36 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper {
       checkAnswer(df, Row(0, 0) :: Row(0, 1) :: Row(0, 2) :: Nil)
     }
   }
+
+  test("SPARK-34514: Push down limit through LEFT SEMI and LEFT ANTI join") {
+    withTable("left_table", "nonempty_right_table", "empty_right_table") {
+      spark.range(5).toDF().repartition(1).write.saveAsTable("left_table")
+      spark.range(3).write.saveAsTable("nonempty_right_table")
+      spark.range(0).write.saveAsTable("empty_right_table")
+
+      Seq("LEFT SEMI", "LEFT ANTI").foreach { joinType =>
+        val joinWithNonEmptyRightDf = spark.sql(
+          s"SELECT * FROM left_table $joinType JOIN nonempty_right_table LIMIT 3")
+        val joinWithEmptyRightDf = spark.sql(
+          s"SELECT * FROM left_table $joinType JOIN empty_right_table LIMIT 3")
+        Seq(joinWithNonEmptyRightDf, joinWithEmptyRightDf).foreach { df =>
+          val pushedLocalLimits = df.queryExecution.optimizedPlan.collect {
+            case l @ LocalLimit(_, _: LogicalRelation) => l
+          }
+          assert(pushedLocalLimits.length === 1)
+        }
+
+        val expectedAnswer = Seq(Row(0), Row(1), Row(2))
+        if (joinType == "LEFT SEMI") {
+          checkAnswer(joinWithNonEmptyRightDf, expectedAnswer)
+          checkAnswer(joinWithEmptyRightDf, Seq.empty)
+        } else {
+          checkAnswer(joinWithNonEmptyRightDf, Seq.empty)
+          checkAnswer(joinWithEmptyRightDf, expectedAnswer)
+        }
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
```