[SPARK-36735][SQL] Adjust overhead of cached relation for DPP

### What changes were proposed in this pull request?

This patch proposes to adjust the overhead calculation of cached relations for dynamic partition pruning (DPP).

### Why are the changes needed?

Currently we decide whether pruning with DPP is beneficial by simply summing up the sizes of all scan relations and treating that total as the overhead. However, for a cached relation the overhead should be lower than for a non-cached relation, because reading cached data back, especially from memory, is much cheaper than scanning the source again. This patch uses an adjusted overhead for cached relations when evaluating DPP.
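
As a rough illustration of the heuristic (the sizes and variable names below are made up for this example and are not part of the patch): DPP is applied when the estimated size of the data pruned away exceeds the overhead of the plan that builds the pruning filter, so once a memory-only cached relation stops counting toward that overhead, pruning can become worthwhile where it previously was not.

```scala
// Hypothetical sizes, for illustration only.
val filterRatio = 0.5                         // estimated fraction of the fact table pruned away
val factScanSizeInBytes = 10L * 1024 * 1024   // 10 MB partitioned (pruning) side
val dimScanSizeInBytes = 8L * 1024 * 1024     // 8 MB dimension (filtering) side

val estimatePruningSideSize = filterRatio * factScanSizeInBytes.toFloat  // 5 MB

// Before this patch: the dimension scan always counts at full size,
// so 5 MB > 8 MB is false and DPP is skipped.
val oldOverhead = dimScanSizeInBytes.toFloat

// After this patch: if the dimension relation is cached with a memory-only
// storage level it contributes nothing to the overhead, so 5 MB > 0 holds
// and DPP is applied.
val newOverhead = 0.0f

assert(estimatePruningSideSize <= oldOverhead)
assert(estimatePruningSideSize > newOverhead)
```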

### Does this PR introduce _any_ user-facing change?

Yes. For queries that join a partitioned table with a cached relation, the estimated DPP overhead is now lower, so DPP may be applied where it previously was not.

### How was this patch tested?

Added unit test.

Closes #33975 from viirya/reduce-cache-overhead.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
Liang-Chi Hsieh 2021-09-15 14:00:45 -07:00
parent 16f1f71ba5
commit bbb33af2e4
2 changed files with 82 additions and 21 deletions


@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
@@ -164,8 +165,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
       }

       val estimatePruningSideSize = filterRatio * partPlan.stats.sizeInBytes.toFloat
-      // the pruning overhead is the total size in bytes of all scan relations
-      val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum.toFloat
+      val overhead = calculatePlanOverhead(otherPlan)
       if (canBuildBroadcast) {
         estimatePruningSideSize > overhead
       } else {
@@ -177,6 +177,29 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
     }
   }

+  /**
+   * Calculates a heuristic overhead of a logical plan. Normally it returns the total
+   * size in bytes of all scan relations. We don't count in-memory relation which uses
+   * only memory.
+   */
+  private def calculatePlanOverhead(plan: LogicalPlan): Float = {
+    val (cached, notCached) = plan.collectLeaves().partition(p => p match {
+      case _: InMemoryRelation => true
+      case _ => false
+    })
+    val scanOverhead = notCached.map(_.stats.sizeInBytes).sum.toFloat
+    val cachedOverhead = cached.map {
+      case m: InMemoryRelation if m.cacheBuilder.storageLevel.useDisk &&
+          !m.cacheBuilder.storageLevel.useMemory =>
+        m.stats.sizeInBytes.toFloat
+      case m: InMemoryRelation if m.cacheBuilder.storageLevel.useDisk =>
+        m.stats.sizeInBytes.toFloat * 0.2
+      case m: InMemoryRelation if m.cacheBuilder.storageLevel.useMemory =>
+        0.0
+    }.sum.toFloat
+    scanOverhead + cachedOverhead
+  }
+
   /**
    * Returns whether an expression is likely to be selective
    */
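
For reference, here is a small sketch (not part of the patch) of how the three cases above map onto common storage levels. `cachedOverheadWeight` is a hypothetical helper and the 100 MB size is an arbitrary example; the real `calculatePlanOverhead` pattern-matches on `InMemoryRelation` leaves and uses their statistics.

```scala
import org.apache.spark.storage.StorageLevel

// Illustrative helper mirroring the weights used by calculatePlanOverhead.
def cachedOverheadWeight(level: StorageLevel): Double =
  if (level.useDisk && !level.useMemory) 1.0  // e.g. DISK_ONLY: charged at full size
  else if (level.useDisk) 0.2                 // e.g. MEMORY_AND_DISK: charged at 20% of size
  else if (level.useMemory) 0.0               // e.g. MEMORY_ONLY: adds no overhead
  else 1.0                                    // not cached in memory or on disk

val sizeInBytes = 100L * 1024 * 1024  // example: a 100 MB relation
Seq(StorageLevel.DISK_ONLY, StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_ONLY)
  .foreach { level =>
    println(s"$level -> ${(cachedOverheadWeight(level) * sizeInBytes).toLong} bytes of overhead")
  }
```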


@@ -468,30 +468,68 @@ abstract class DynamicPartitionPruningSuiteBase
     Given("no stats and selective predicate with the size of dim too large")
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
       SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
-      sql(
-        """
-          |SELECT f.date_id, f.product_id, f.units_sold, f.store_id
-          |FROM fact_sk f WHERE store_id < 5
+      withTable("fact_aux") {
+        sql(
+          """
+            |SELECT f.date_id, f.product_id, f.units_sold, f.store_id
+            |FROM fact_sk f WHERE store_id < 5
           """.stripMargin)
-        .write
-        .partitionBy("store_id")
-        .saveAsTable("fact_aux")
+          .write
+          .partitionBy("store_id")
+          .saveAsTable("fact_aux")

-      val df = sql(
-        """
-          |SELECT f.date_id, f.product_id, f.units_sold, f.store_id
-          |FROM fact_aux f JOIN dim_store s
-          |ON f.store_id = s.store_id WHERE s.country = 'US'
+        val df = sql(
+          """
+            |SELECT f.date_id, f.product_id, f.units_sold, f.store_id
+            |FROM fact_aux f JOIN dim_store s
+            |ON f.store_id = s.store_id WHERE s.country = 'US'
           """.stripMargin)

-      checkPartitionPruningPredicate(df, false, false)
+        checkPartitionPruningPredicate(df, false, false)

-      checkAnswer(df,
-        Row(1070, 2, 10, 4) ::
-        Row(1080, 3, 20, 4) ::
-        Row(1090, 3, 10, 4) ::
-        Row(1100, 3, 10, 4) :: Nil
-      )
+        checkAnswer(df,
+          Row(1070, 2, 10, 4) ::
+          Row(1080, 3, 20, 4) ::
+          Row(1090, 3, 10, 4) ::
+          Row(1100, 3, 10, 4) :: Nil
+        )
+      }
     }

+    Given("no stats and selective predicate with the size of dim too large but cached")
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
+      withTable("fact_aux") {
+        withTempView("cached_dim_store") {
+          sql(
+            """
+              |SELECT f.date_id, f.product_id, f.units_sold, f.store_id
+              |FROM fact_sk f WHERE store_id < 5
+            """.stripMargin)
+            .write
+            .partitionBy("store_id")
+            .saveAsTable("fact_aux")
+
+          spark.table("dim_store").cache()
+            .createOrReplaceTempView("cached_dim_store")
+
+          val df = sql(
+            """
+              |SELECT f.date_id, f.product_id, f.units_sold, f.store_id
+              |FROM fact_aux f JOIN cached_dim_store s
+              |ON f.store_id = s.store_id WHERE s.country = 'US'
+            """.stripMargin)
+
+          checkPartitionPruningPredicate(df, true, false)
+
+          checkAnswer(df,
+            Row(1070, 2, 10, 4) ::
+            Row(1080, 3, 20, 4) ::
+            Row(1090, 3, 10, 4) ::
+            Row(1100, 3, 10, 4) :: Nil
+          )
+        }
+      }
+    }
+
Given("no stats and selective predicate with the size of dim small")