[SPARK-35351][SQL][FOLLOWUP] Avoid using loaded
variable for LEFT ANTI SMJ code-gen
### What changes were proposed in this pull request? This is a followup from https://github.com/apache/spark/pull/32547#discussion_r639916474, where for LEFT ANTI join, we do not need to depend on `loaded` variable, as in `codegenAnti` we only load `streamedAfter` no more than once (i.e. assign column values from streamed row which are not used in join condition). ### Why are the changes needed? Avoid unnecessary processing in code-gen (though it's just `boolean $loaded = false;`, and `if (!$loaded) { $loaded = true; }`). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unite tests in `ExistenceJoinSuite`. Closes #32681 from c21/join-followup. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
79a2a46cdb
commit
5cc17ba0c7
|
@ -681,26 +681,29 @@ case class SortMergeJoinExec(
|
|||
val cond = BindReferences.bindReference(
|
||||
condition.get, streamedPlan.output ++ bufferedPlan.output).genCode(ctx)
|
||||
// Evaluate the columns those used by condition before loop
|
||||
val before =
|
||||
s"""
|
||||
|boolean $loaded = false;
|
||||
|$streamedBefore
|
||||
""".stripMargin
|
||||
|
||||
val loadStreamed =
|
||||
s"""
|
||||
|if (!$loaded) {
|
||||
| $loaded = true;
|
||||
| $streamedAfter
|
||||
|}
|
||||
val before = joinType match {
|
||||
case LeftAnti =>
|
||||
// No need to initialize `loaded` variable for Left Anti join.
|
||||
streamedBefore.trim
|
||||
case _ =>
|
||||
s"""
|
||||
|boolean $loaded = false;
|
||||
|$streamedBefore
|
||||
""".stripMargin
|
||||
}
|
||||
|
||||
val loadStreamedAfterCondition = joinType match {
|
||||
case LeftAnti =>
|
||||
// No need to evaluate columns not used by condition from streamed side, as for Left Anti
|
||||
// join, streamed row with match is not outputted.
|
||||
""
|
||||
case _ => loadStreamed
|
||||
case _ =>
|
||||
s"""
|
||||
|if (!$loaded) {
|
||||
| $loaded = true;
|
||||
| $streamedAfter
|
||||
|}
|
||||
""".stripMargin
|
||||
}
|
||||
|
||||
val loadBufferedAfterCondition = joinType match {
|
||||
|
@ -722,7 +725,7 @@ case class SortMergeJoinExec(
|
|||
|$loadStreamedAfterCondition
|
||||
|$loadBufferedAfterCondition
|
||||
""".stripMargin
|
||||
(before, checking.trim, loadStreamed)
|
||||
(before, checking.trim, streamedAfter.trim)
|
||||
} else {
|
||||
(evaluateVariables(streamedVars), "", "")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue