[SPARK-32855][SQL][FOLLOWUP] Fix code format in SQLConf and comment in PartitionPruning

### What changes were proposed in this pull request?

Fix code format in `SQLConf` and comment in `PartitionPruning`.

### Why are the changes needed?

Make code more readable.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

Closes #31969 from wangyum/SPARK-32855-2.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Yuming Wang 2021-03-28 09:48:54 -07:00 committed by Dongjoon Hyun
parent c8b7a09d39
commit 540f1fb1d9
3 changed files with 26 additions and 24 deletions

View file

@@ -287,16 +287,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
val DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO = buildConf(
"spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio")
.internal()
.doc("When statistics are not available or configured not to be used, this config will be " +
"used as the fallback filter ratio for computing the data size of the partitioned table " +
"after dynamic partition pruning, in order to evaluate if it is worth adding an extra " +
"subquery as the pruning filter if broadcast reuse is not applicable.")
.version("3.0.0")
.doubleConf
.createWithDefault(0.5)
val DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO =
buildConf("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio")
.internal()
.doc("When statistics are not available or configured not to be used, this config will be " +
"used as the fallback filter ratio for computing the data size of the partitioned table " +
"after dynamic partition pruning, in order to evaluate if it is worth adding an extra " +
"subquery as the pruning filter if broadcast reuse is not applicable.")
.version("3.0.0")
.doubleConf
.createWithDefault(0.5)
val DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY =
buildConf("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly")
@@ -307,16 +307,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
val DYNAMIC_PARTITON_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO =
val DYNAMIC_PARTITION_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO =
buildConf("spark.sql.optimizer.dynamicPartitionPruning.pruningSideExtraFilterRatio")
.internal()
.doc("When filtering side doesn't support broadcast by join type, and doing DPP means " +
"running an extra query that may have significant overhead. This config will be used " +
"as the extra filter ratio for computing the data size of the pruning side after DPP, " +
"in order to evaluate if it is worth adding an extra subquery as the pruning filter.")
.version("3.2.0")
.doubleConf
.createWithDefault(0.04)
.internal()
.doc("When filtering side doesn't support broadcast by join type, and doing DPP means " +
"running an extra query that may have significant overhead. This config will be used " +
"as the extra filter ratio for computing the data size of the pruning side after DPP, " +
"in order to evaluate if it is worth adding an extra subquery as the pruning filter.")
.version("3.2.0")
.doubleConf
.checkValue(ratio => ratio > 0.0 && ratio <= 1.0, "The ratio value must be in (0.0, 1.0].")
.createWithDefault(0.04)
val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
.doc("When set to true Spark SQL will automatically select a compression codec for each " +
@@ -3261,7 +3262,7 @@ class SQLConf extends Serializable with Logging {
getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY)
def dynamicPartitionPruningPruningSideExtraFilterRatio: Double =
getConf(DYNAMIC_PARTITON_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO)
getConf(DYNAMIC_PARTITION_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO)
def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)

View file

@@ -108,11 +108,12 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
}
/**
* Given an estimated filtering ratio we assume the partition pruning has benefit if
* Given an estimated filtering ratio(and extra filter ratio if filtering side can't
* build broadcast by join type) we assume the partition pruning has benefit if
* the size in bytes of the partitioned plan after filtering is greater than the size
* in bytes of the plan on the other side of the join. We estimate the filtering ratio
* using column statistics if they are available, otherwise we use the config value of
* `spark.sql.optimizer.joinFilterRatio`.
* `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
*/
private def pruningHasBenefit(
partExpr: Expression,

View file

@@ -414,7 +414,7 @@ abstract class DynamicPartitionPruningSuiteBase
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITON_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") {
SQLConf.DYNAMIC_PARTITION_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") {
Given("dynamic partition pruning disabled")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false") {
val df = sql(
@@ -1436,7 +1436,7 @@ abstract class DynamicPartitionPruningSuiteBase
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
SQLConf.DYNAMIC_PARTITON_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") {
SQLConf.DYNAMIC_PARTITION_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") {
val sqlStr =
"""