[SPARK-36652][SQL] AQE dynamic join selection should not apply to non-equi join
### What changes were proposed in this pull request? Currently `DynamicJoinSelection` has two features: 1.demote broadcast hash join, and 2.promote shuffled hash join. Both are achieved by adding join hint in query plan, and only works for equi join. However [the rule is matching with `Join` operator now](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala#L71), so it would add hint for non-equi join by mistake (See added test query in `JoinHintSuite.scala` for an example). This PR is to fix `DynamicJoinSelection` to only apply to equi-join, and improve `checkHintNonEquiJoin` to check we should not add `PREFER_SHUFFLE_HASH` for non-equi join. ### Why are the changes needed? Improve the logic of codebase to be better. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit test in `JoinHintSuite.scala`. Closes #33899 from c21/aqe-test. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
parent
4d9e577694
commit
9054a6ac00
|
@ -379,6 +379,10 @@ trait JoinSelectionHelper {
|
|||
hint.rightHint.exists(_.strategy.contains(PREFER_SHUFFLE_HASH))
|
||||
}
|
||||
|
||||
def hintToPreferShuffleHashJoin(hint: JoinHint): Boolean = {
|
||||
hintToPreferShuffleHashJoinLeft(hint) || hintToPreferShuffleHashJoinRight(hint)
|
||||
}
|
||||
|
||||
def hintToShuffleHashJoin(hint: JoinHint): Boolean = {
|
||||
hintToShuffleHashJoinLeft(hint) || hintToShuffleHashJoinRight(hint)
|
||||
}
|
||||
|
|
|
@ -169,7 +169,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
|
|||
}
|
||||
|
||||
private def checkHintNonEquiJoin(hint: JoinHint): Unit = {
|
||||
if (hintToShuffleHashJoin(hint) || hintToSortMergeJoin(hint)) {
|
||||
if (hintToShuffleHashJoin(hint) || hintToPreferShuffleHashJoin(hint) ||
|
||||
hintToSortMergeJoin(hint)) {
|
||||
assert(hint.leftHint.orElse(hint.rightHint).isDefined)
|
||||
hintErrorHandler.joinHintNotSupported(hint.leftHint.orElse(hint.rightHint).get,
|
||||
"no equi-join keys")
|
||||
|
|
|
@ -18,7 +18,8 @@
|
|||
package org.apache.spark.sql.execution.adaptive
|
||||
|
||||
import org.apache.spark.MapOutputStatistics
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Join, JoinStrategyHint, LogicalPlan, NO_BROADCAST_HASH, PREFER_SHUFFLE_HASH, SHUFFLE_HASH}
|
||||
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, JoinStrategyHint, LogicalPlan, NO_BROADCAST_HASH, PREFER_SHUFFLE_HASH, SHUFFLE_HASH}
|
||||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
|
@ -68,7 +69,7 @@ object DynamicJoinSelection extends Rule[LogicalPlan] {
|
|||
}
|
||||
|
||||
def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
|
||||
case j @ Join(left, right, _, _, hint) =>
|
||||
case j @ ExtractEquiJoinKeys(_, _, _, _, left, right, hint) =>
|
||||
var newHint = hint
|
||||
if (!hint.leftHint.exists(_.strategy.isDefined)) {
|
||||
selectJoinStrategy(left).foreach { strategy =>
|
||||
|
|
|
@ -692,4 +692,18 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
|
|||
assert(logs.size == 2)
|
||||
logs.forall(_.contains("no equi-join keys"))
|
||||
}
|
||||
|
||||
test("SPARK-36652: AQE dynamic join selection should not apply to non-equi join") {
|
||||
val hintAppender = new LogAppender(s"join hint check for equi-join")
|
||||
withLogAppender(hintAppender, level = Some(Level.WARN)) {
|
||||
withSQLConf(
|
||||
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
|
||||
SQLConf.ADAPTIVE_MAX_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD.key -> "64MB") {
|
||||
df1.join(df2.repartition($"b1"), $"a1" =!= $"b1").collect()
|
||||
}
|
||||
val logs = hintAppender.loggingEvents.map(_.getRenderedMessage)
|
||||
.filter(_.contains("is not supported in the query: no equi-join keys"))
|
||||
assert(logs.isEmpty)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue