From f3ec79990f66c9e3d1908f228a41c244da2d84f1 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Wed, 7 Jul 2021 00:58:14 +0800 Subject: [PATCH] [SPARK-35984][SQL][TEST] Config to force applying shuffled hash join ### What changes were proposed in this pull request? Add a config `spark.sql.join.forceApplyShuffledHashJoin` to force applying shuffled hash join during the join selection. ### Why are the changes needed? In the `SQLQueryTestSuite`, we want to cover 3 kinds of join (BHJ, SHJ, SMJ) in join.sql. But even if the `spark.sql.join.preferSortMergeJoin` is set to `false`, shuffled hash join is still not guaranteed. Thus, we need another config to force the selection. ### Does this PR introduce _any_ user-facing change? No, this config is for testing only. ### How was this patch tested? Newly added tests. Verified that all queries in join.sql will use `ShuffledHashJoin` when the config is set to `true`. Closes #33182 from linhongliu-db/SPARK-35984-hash-join-config. Authored-by: Linhong Liu Signed-off-by: Wenchen Fan (cherry picked from commit 7566db603305ef04db89768bbb50d8b978a5c93c) Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/optimizer/joins.scala | 16 ++++++++++++++-- .../sql-tests/inputs/postgreSQL/join.sql | 2 +- .../scala/org/apache/spark/sql/JoinSuite.scala | 8 ++++++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 31a48c8cb5..d6e2a59de0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils /** * Reorder the joins 
and push all the conditions into join, so that the bottom ones have at least @@ -274,14 +275,16 @@ trait JoinSelectionHelper { } else { hintToPreferShuffleHashJoinLeft(hint) || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) && - muchSmaller(left, right)) + muchSmaller(left, right)) || + forceApplyShuffledHashJoin(conf) } val buildRight = if (hintOnly) { hintToShuffleHashJoinRight(hint) } else { hintToPreferShuffleHashJoinRight(hint) || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) && - muchSmaller(right, left)) + muchSmaller(right, left)) || + forceApplyShuffledHashJoin(conf) } getBuildSide( canBuildShuffledHashJoinLeft(joinType) && buildLeft, @@ -424,5 +427,14 @@ trait JoinSelectionHelper { private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = { a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes } + + /** + * Returns whether a shuffled hash join should be force applied. + * The config key is hard-coded because it's testing only and should not be exposed. 
+ */ + private def forceApplyShuffledHashJoin(conf: SQLConf): Boolean = { + Utils.isTesting && + conf.getConfString("spark.sql.join.forceApplyShuffledHashJoin", "false") == "true" + } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql index 183e79ee98..dc0b56112d 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql @@ -13,7 +13,7 @@ --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=10485760 --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=true ---CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=false +--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.forceApplyShuffledHashJoin=true --CONFIG_DIM2 spark.sql.codegen.wholeStage=true --CONFIG_DIM2 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index acbf30089a..abfc19ac6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1394,4 +1394,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan checkAnswer(fullJoinDF, Row(100)) } } + + test("SPARK-35984: Config to force applying shuffled hash join") { + val sql = "SELECT * FROM testData JOIN testData2 ON key = a" + assertJoin(sql, classOf[SortMergeJoinExec]) + withSQLConf("spark.sql.join.forceApplyShuffledHashJoin" -> "true") { + assertJoin(sql, classOf[ShuffledHashJoinExec]) + } + } }