From bbb33af2e4c90d679542298f56073a71de001fb7 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 15 Sep 2021 14:00:45 -0700 Subject: [PATCH] [SPARK-36735][SQL] Adjust overhead of cached relation for DPP ### What changes were proposed in this pull request? This patch proposes to adjust the current overhead of cached relation for DPP. ### Why are the changes needed? Currently we calculate if there is benefit of pruning with DPP by simply summing up the size of all scan relations as the overhead. However, for cached relations, the overhead should be different than a non-cached relation. This proposes to use adjusted overhead for cached relation with DPP. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Added unit test. Closes #33975 from viirya/reduce-cache-overhead. Authored-by: Liang-Chi Hsieh Signed-off-by: Liang-Chi Hsieh --- .../dynamicpruning/PartitionPruning.scala | 27 ++++++- .../sql/DynamicPartitionPruningSuite.scala | 76 ++++++++++++++----- 2 files changed, 82 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala index 3014e7a4d5..a40e041e11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering +import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation @@ -164,8 +165,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join } val estimatePruningSideSize = filterRatio * partPlan.stats.sizeInBytes.toFloat - // the pruning overhead is the total size in bytes of all scan relations - val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum.toFloat + val overhead = calculatePlanOverhead(otherPlan) if (canBuildBroadcast) { estimatePruningSideSize > overhead } else { @@ -177,6 +177,29 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join } } + /** + * Calculates a heuristic overhead of a logical plan. Normally it returns the total + * size in bytes of all scan relations. We don't count in-memory relation which uses + * only memory. + */ + private def calculatePlanOverhead(plan: LogicalPlan): Float = { + val (cached, notCached) = plan.collectLeaves().partition(p => p match { + case _: InMemoryRelation => true + case _ => false + }) + val scanOverhead = notCached.map(_.stats.sizeInBytes).sum.toFloat + val cachedOverhead = cached.map { + case m: InMemoryRelation if m.cacheBuilder.storageLevel.useDisk && + !m.cacheBuilder.storageLevel.useMemory => + m.stats.sizeInBytes.toFloat + case m: InMemoryRelation if m.cacheBuilder.storageLevel.useDisk => + m.stats.sizeInBytes.toFloat * 0.2 + case m: InMemoryRelation if m.cacheBuilder.storageLevel.useMemory => + 0.0 + }.sum.toFloat + scanOverhead + cachedOverhead + } + /** * Returns whether an expression is likely to be selective */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index f058bb5a26..9610f106f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -468,30 +468,68 @@ abstract class DynamicPartitionPruningSuiteBase Given("no stats and selective predicate with the size of dim too large") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") { - sql( - """ - |SELECT f.date_id, f.product_id, f.units_sold, f.store_id - |FROM fact_sk f WHERE store_id < 5 + withTable("fact_aux") { + sql( + """ + |SELECT f.date_id, f.product_id, f.units_sold, f.store_id + |FROM fact_sk f WHERE store_id < 5 """.stripMargin) - .write - .partitionBy("store_id") - .saveAsTable("fact_aux") + .write + .partitionBy("store_id") + .saveAsTable("fact_aux") - val df = sql( - """ - |SELECT f.date_id, f.product_id, f.units_sold, f.store_id - |FROM fact_aux f JOIN dim_store s - |ON f.store_id = s.store_id WHERE s.country = 'US' + val df = sql( + """ + |SELECT f.date_id, f.product_id, f.units_sold, f.store_id + |FROM fact_aux f JOIN dim_store s + |ON f.store_id = s.store_id WHERE s.country = 'US' """.stripMargin) - checkPartitionPruningPredicate(df, false, false) + checkPartitionPruningPredicate(df, false, false) - checkAnswer(df, - Row(1070, 2, 10, 4) :: - Row(1080, 3, 20, 4) :: - Row(1090, 3, 10, 4) :: - Row(1100, 3, 10, 4) :: Nil - ) + checkAnswer(df, + Row(1070, 2, 10, 4) :: + Row(1080, 3, 20, 4) :: + Row(1090, 3, 10, 4) :: + Row(1100, 3, 10, 4) :: Nil + ) + } + } + + Given("no stats and selective predicate with the size of dim too large but cached") + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") { + withTable("fact_aux") { + withTempView("cached_dim_store") { + sql( + """ + |SELECT f.date_id, f.product_id, f.units_sold, f.store_id + |FROM fact_sk f WHERE store_id < 5 + """.stripMargin) + .write + .partitionBy("store_id") + .saveAsTable("fact_aux") + + spark.table("dim_store").cache() + .createOrReplaceTempView("cached_dim_store") + + val df = sql( + """ + |SELECT f.date_id, f.product_id, f.units_sold, f.store_id + |FROM fact_aux f JOIN cached_dim_store s + |ON f.store_id = s.store_id WHERE s.country = 'US' + """.stripMargin) + + checkPartitionPruningPredicate(df, true, false) + + checkAnswer(df, + Row(1070, 2, 10, 4) :: + Row(1080, 3, 20, 4) :: + Row(1090, 3, 10, 4) :: + Row(1100, 3, 10, 4) :: Nil + ) + } + } } Given("no stats and selective predicate with the size of dim small")