[SPARK-35984][SQL][TEST] Config to force applying shuffled hash join
### What changes were proposed in this pull request?
Add a testing-only config `spark.sql.join.forceApplyShuffledHashJoin` that forces a shuffled hash join
to be chosen during join selection.
### Why are the changes needed?
In `SQLQueryTestSuite`, we want to cover all 3 kinds of join (BHJ, SHJ, SMJ) in join.sql. But even
if `spark.sql.join.preferSortMergeJoin` is set to `false`, a shuffled hash join is still not guaranteed:
the build side must also be small enough to build a local hash map and much smaller than the other side.
Thus, we need another config to force the selection.
### Does this PR introduce _any_ user-facing change?
No, the config is for testing only.
### How was this patch tested?
Newly added tests.
Verified that all queries in join.sql use `ShuffledHashJoin` when the config is set to `true`.
Closes #33182 from linhongliu-db/SPARK-35984-hash-join-config.
Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7566db6033)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
6c1c1af6b4
commit
f3ec79990f
|
@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules._
|
|||
import org.apache.spark.sql.catalyst.trees.TreePattern._
|
||||
import org.apache.spark.sql.errors.QueryCompilationErrors
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
/**
|
||||
* Reorder the joins and push all the conditions into join, so that the bottom ones have at least
|
||||
|
@ -274,14 +275,16 @@ trait JoinSelectionHelper {
|
|||
} else {
|
||||
hintToPreferShuffleHashJoinLeft(hint) ||
|
||||
(!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) &&
|
||||
muchSmaller(left, right))
|
||||
muchSmaller(left, right)) ||
|
||||
forceApplyShuffledHashJoin(conf)
|
||||
}
|
||||
val buildRight = if (hintOnly) {
|
||||
hintToShuffleHashJoinRight(hint)
|
||||
} else {
|
||||
hintToPreferShuffleHashJoinRight(hint) ||
|
||||
(!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) &&
|
||||
muchSmaller(right, left))
|
||||
muchSmaller(right, left)) ||
|
||||
forceApplyShuffledHashJoin(conf)
|
||||
}
|
||||
getBuildSide(
|
||||
canBuildShuffledHashJoinLeft(joinType) && buildLeft,
|
||||
|
@ -424,5 +427,14 @@ trait JoinSelectionHelper {
|
|||
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
|
||||
a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether a shuffled hash join should be force applied.
|
||||
* The config key is hard-coded because it's testing only and should not be exposed.
|
||||
*/
|
||||
private def forceApplyShuffledHashJoin(conf: SQLConf): Boolean = {
|
||||
Utils.isTesting &&
|
||||
conf.getConfString("spark.sql.join.forceApplyShuffledHashJoin", "false") == "true"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
|
||||
--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=10485760
|
||||
--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=true
|
||||
--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=false
|
||||
--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.forceApplyShuffledHashJoin=true
|
||||
|
||||
--CONFIG_DIM2 spark.sql.codegen.wholeStage=true
|
||||
--CONFIG_DIM2 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
|
||||
|
|
|
@ -1394,4 +1394,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
|
|||
checkAnswer(fullJoinDF, Row(100))
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-35984: Config to force applying shuffled hash join") {
|
||||
val sql = "SELECT * FROM testData JOIN testData2 ON key = a"
|
||||
assertJoin(sql, classOf[SortMergeJoinExec])
|
||||
withSQLConf("spark.sql.join.forceApplyShuffledHashJoin" -> "true") {
|
||||
assertJoin(sql, classOf[ShuffledHashJoinExec])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue