[SPARK-34884][SQL] Improve DPP evaluation to make filtering side must can broadcast by size or broadcast by hint

### What changes were proposed in this pull request?

Improve dynamic partition pruning evaluation to make filtering side must can broadcast by size or broadcast by hint.

### Why are the changes needed?

1. Fast fail if filtering side can not broadcast by size or broadcast by hint.
2. We can safely disable `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit test.

Closes #31984 from wangyum/SPARK-34884.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Yuming Wang 2021-03-30 12:34:46 +00:00 committed by Wenchen Fan
parent a98dc60408
commit de66fa63f9
4 changed files with 28 additions and 7 deletions

View file

@ -160,8 +160,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
// We can't reuse the broadcast because the join type doesn't support broadcast,
// and doing DPP means running an extra query that may have significant overhead.
// We need to make sure the pruning side is very big so that DPP is still worthy.
canBroadcastBySize(otherPlan, conf) &&
estimatePruningSideSize * conf.dynamicPartitionPruningPruningSideExtraFilterRatio > overhead
estimatePruningSideSize * conf.dynamicPartitionPruningPruningSideExtraFilterRatio > overhead
}
}
@ -247,13 +246,15 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
// otherwise the pruning will not trigger
var partScan = getPartitionTableScan(l, left)
if (partScan.isDefined && canPruneLeft(joinType) &&
hasPartitionPruningFilter(right)) {
hasPartitionPruningFilter(right) &&
(canBroadcastBySize(right, conf) || hintToBroadcastRight(hint))) {
newLeft = insertPredicate(l, newLeft, r, right, rightKeys, partScan.get,
canBuildBroadcastRight(joinType))
} else {
partScan = getPartitionTableScan(r, right)
if (partScan.isDefined && canPruneRight(joinType) &&
hasPartitionPruningFilter(left) ) {
hasPartitionPruningFilter(left) &&
(canBroadcastBySize(left, conf) || hintToBroadcastLeft(hint))) {
newRight = insertPredicate(r, newRight, l, left, leftKeys, partScan.get,
canBuildBroadcastLeft(joinType))
}

View file

@ -85,7 +85,7 @@ TakeOrderedAndProject (80)
Output [8]: [cs_ship_date_sk#1, cs_bill_cdemo_sk#2, cs_bill_hdemo_sk#3, cs_item_sk#4, cs_promo_sk#5, cs_order_number#6, cs_quantity#7, cs_sold_date_sk#8]
Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(cs_sold_date_sk#8), dynamicpruningexpression(true)]
PartitionFilters: [isnotnull(cs_sold_date_sk#8)]
PushedFilters: [IsNotNull(cs_quantity), IsNotNull(cs_item_sk), IsNotNull(cs_bill_cdemo_sk), IsNotNull(cs_bill_hdemo_sk), IsNotNull(cs_ship_date_sk)]
ReadSchema: struct<cs_ship_date_sk:int,cs_bill_cdemo_sk:int,cs_bill_hdemo_sk:int,cs_item_sk:int,cs_promo_sk:int,cs_order_number:int,cs_quantity:int>

View file

@ -85,7 +85,7 @@ TakeOrderedAndProject (80)
Output [8]: [cs_ship_date_sk#1, cs_bill_cdemo_sk#2, cs_bill_hdemo_sk#3, cs_item_sk#4, cs_promo_sk#5, cs_order_number#6, cs_quantity#7, cs_sold_date_sk#8]
Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(cs_sold_date_sk#8), dynamicpruningexpression(true)]
PartitionFilters: [isnotnull(cs_sold_date_sk#8)]
PushedFilters: [IsNotNull(cs_quantity), IsNotNull(cs_item_sk), IsNotNull(cs_bill_cdemo_sk), IsNotNull(cs_bill_hdemo_sk), IsNotNull(cs_ship_date_sk)]
ReadSchema: struct<cs_ship_date_sk:int,cs_bill_cdemo_sk:int,cs_bill_hdemo_sk:int,cs_item_sk:int,cs_promo_sk:int,cs_order_number:int,cs_quantity:int>

View file

@ -868,7 +868,7 @@ abstract class DynamicPartitionPruningSuiteBase
|ON f.store_id = s.store_id WHERE s.country = 'DE'
""".stripMargin)
checkPartitionPruningPredicate(df, true, false)
checkPartitionPruningPredicate(df, false, false)
checkAnswer(df,
Row(1030, 2, 10, 3) ::
@ -1463,6 +1463,26 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
}
test("SPARK-34884: DPP evaluation consider broadcast hint") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
"""
|SELECT /*+ BROADCAST(s) */ f.date_id, f.product_id, f.units_sold FROM fact_stats f
|JOIN dim_stats s
|ON f.store_id = s.store_id WHERE s.country = 'DE'
""".stripMargin)
checkPartitionPruningPredicate(df, false, true)
checkAnswer(df,
Row(1030, 2, 10) ::
Row(1040, 2, 50) ::
Row(1050, 2, 50) ::
Row(1060, 2, 50) :: Nil
)
}
}
}
class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase