diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index e4c445ab7b..90b25a927e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -379,6 +379,10 @@ trait JoinSelectionHelper { hint.rightHint.exists(_.strategy.contains(PREFER_SHUFFLE_HASH)) } + def hintToPreferShuffleHashJoin(hint: JoinHint): Boolean = { + hintToPreferShuffleHashJoinLeft(hint) || hintToPreferShuffleHashJoinRight(hint) + } + def hintToShuffleHashJoin(hint: JoinHint): Boolean = { hintToShuffleHashJoinLeft(hint) || hintToShuffleHashJoinRight(hint) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index d6a3c590b5..88b0279899 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -169,7 +169,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } private def checkHintNonEquiJoin(hint: JoinHint): Unit = { - if (hintToShuffleHashJoin(hint) || hintToSortMergeJoin(hint)) { + if (hintToShuffleHashJoin(hint) || hintToPreferShuffleHashJoin(hint) || + hintToSortMergeJoin(hint)) { assert(hint.leftHint.orElse(hint.rightHint).isDefined) hintErrorHandler.joinHintNotSupported(hint.leftHint.orElse(hint.rightHint).get, "no equi-join keys") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala index fe2430e94f..e99fea9455 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.MapOutputStatistics -import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Join, JoinStrategyHint, LogicalPlan, NO_BROADCAST_HASH, PREFER_SHUFFLE_HASH, SHUFFLE_HASH} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, JoinStrategyHint, LogicalPlan, NO_BROADCAST_HASH, PREFER_SHUFFLE_HASH, SHUFFLE_HASH} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf @@ -68,7 +69,7 @@ object DynamicJoinSelection extends Rule[LogicalPlan] { } def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { - case j @ Join(left, right, _, _, hint) => + case j @ ExtractEquiJoinKeys(_, _, _, _, left, right, hint) => var newHint = hint if (!hint.leftHint.exists(_.strategy.isDefined)) { selectJoinStrategy(left).foreach { strategy => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 2f7a00365f..91cad8585f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala @@ -692,4 +692,18 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP assert(logs.size == 2) logs.forall(_.contains("no equi-join keys")) } + + test("SPARK-36652: AQE dynamic join selection should not apply to non-equi join") { + val hintAppender = new LogAppender(s"join hint check for equi-join") + withLogAppender(hintAppender, level = Some(Level.WARN)) { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_MAX_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD.key -> "64MB") { + df1.join(df2.repartition($"b1"), $"a1" =!= $"b1").collect() + } + val logs = hintAppender.loggingEvents.map(_.getRenderedMessage) + .filter(_.contains("is not supported in the query: no equi-join keys")) + assert(logs.isEmpty) + } + } }