From ae83c211257c508989c703d54f2aeec8b2b5f14d Mon Sep 17 00:00:00 2001
From: Zhenhua Wang
Date: Tue, 3 Jan 2017 12:19:52 +0800
Subject: [PATCH] [SPARK-18998][SQL] Add a cbo conf to switch between default
 statistics and estimated statistics

## What changes were proposed in this pull request?

We add a cbo configuration to switch between default statistics and estimated statistics. We also define a new statistics method `planStats` in LogicalPlan, with the conf as its parameter, in order to pass the cbo switch and other estimation-related configurations in the future.

`planStats` is used at the call sites (i.e. in Optimizer and Strategies) to make transformation decisions based on stats.

## How was this patch tested?

Add a test case using a dummy LogicalPlan.

Author: Zhenhua Wang

Closes #16401 from wzhfy/cboSwitch.
---
 .../spark/sql/catalyst/CatalystConf.scala     |  6 ++
 .../sql/catalyst/optimizer/Optimizer.scala    |  6 +-
 .../catalyst/plans/logical/LogicalPlan.scala  | 24 +++++++
 .../optimizer/LimitPushdownSuite.scala        |  3 +-
 .../spark/sql/execution/SparkStrategies.scala | 12 ++--
 .../apache/spark/sql/internal/SQLConf.scala   |  9 +++
 .../StatsEstimationSuite.scala                | 67 +++++++++++++++++++
 7 files changed, 117 insertions(+), 10 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 75ae588c18..b805cfe88b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -49,6 +49,11 @@ trait CatalystConf {
   def resolver: Resolver = {
     if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
   }
+
+  /**
+   * Enables CBO for estimation of plan statistics when set to true.
+   */
+  def cboEnabled: Boolean
 }
 
 
@@ -62,5 +67,6 @@ case class SimpleCatalystConf(
     maxCaseBranchesForCodegen: Int = 20,
     runSQLonFile: Boolean = true,
     crossJoinEnabled: Boolean = false,
+    cboEnabled: Boolean = false,
     warehousePath: String = "/user/hive/warehouse")
   extends CatalystConf

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dfd66aac2d..d1f90e6a1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -82,7 +82,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       EliminateOuterJoin,
       PushPredicateThroughJoin,
       PushDownPredicate,
-      LimitPushDown,
+      LimitPushDown(conf),
       ColumnPruning,
       InferFiltersFromConstraints,
       // Operator combine
@@ -209,7 +209,7 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
 /**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
-object LimitPushDown extends Rule[LogicalPlan] {
+case class LimitPushDown(conf: CatalystConf) extends Rule[LogicalPlan] {
 
   private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = {
     plan match {
@@ -253,7 +253,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
       case FullOuter =>
         (left.maxRows, right.maxRows) match {
           case (None, None) =>
-            if (left.statistics.sizeInBytes >= right.statistics.sizeInBytes) {
+            if (left.planStats(conf).sizeInBytes >= right.planStats(conf).sizeInBytes) {
               join.copy(left = maybePushLimit(exp, left))
             } else {
               join.copy(right = maybePushLimit(exp, right))
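`LimitPushDown` becomes a case class carrying the `CatalystConf`, so the FULL OUTER case above can compare `planStats(conf).sizeInBytes` instead of the raw default statistics. A minimal sketch of constructing the rule with an explicit conf (test-style setup, not part of this patch; the optimizer itself passes its own conf in the batch list above):

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.optimizer.LimitPushDown

// cboEnabled defaults to false, so this conf keeps the default statistics.
val defaultConf = SimpleCatalystConf(caseSensitiveAnalysis = true)

// Opting in makes planStats return CBO-estimated statistics where available.
val cboConf = SimpleCatalystConf(caseSensitiveAnalysis = true, cboEnabled = true)

val rule = LimitPushDown(cboConf)
```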
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index b0a4145f37..4f634cb29d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -94,6 +95,29 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
   }
 
+  /**
+   * Returns the default statistics or the cbo-estimated statistics, depending on the conf.
+   */
+  final def planStats(conf: CatalystConf): Statistics = {
+    if (conf.cboEnabled) {
+      if (estimatedStats.isEmpty) {
+        estimatedStats = Some(cboStatistics(conf))
+      }
+      estimatedStats.get
+    } else {
+      statistics
+    }
+  }
+
+  /**
+   * Returns statistics estimated by cbo. If the plan doesn't override this, it returns the
+   * default statistics.
+   */
+  protected def cboStatistics(conf: CatalystConf): Statistics = statistics
+
+  /** A cache for the estimated statistics, such that it will only be computed once. */
+  private var estimatedStats: Option[Statistics] = None
+
   /**
    * Returns the maximum number of rows that this plan may compute.
    *
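`planStats` memoizes the CBO estimate so it is computed at most once per plan node, and falls back to the default `statistics` when `cboEnabled` is false. A self-contained sketch of the same caching pattern, with illustrative `Stats`/`Conf`/`Plan` names rather than the real Spark classes:

```scala
// Compute the CBO estimate at most once per plan node; fall back to the
// default statistics when the switch is off.
case class Stats(sizeInBytes: BigInt)
case class Conf(cboEnabled: Boolean)

abstract class Plan {
  /** Default, size-only statistics; always available. */
  def statistics: Stats

  /** CBO estimate; plans that cannot do better fall back to the default. */
  protected def cboStatistics(conf: Conf): Stats = statistics

  /** Cached so a potentially expensive estimation runs only once. */
  private var estimatedStats: Option[Stats] = None

  final def planStats(conf: Conf): Stats = {
    if (conf.cboEnabled) {
      if (estimatedStats.isEmpty) {
        estimatedStats = Some(cboStatistics(conf))
      }
      estimatedStats.get
    } else {
      statistics
    }
  }
}
```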
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index dcbc79365c..9ec99835c6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -32,7 +33,7 @@ class LimitPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
         EliminateSubqueryAliases) ::
       Batch("Limit pushdown", FixedPoint(100),
-        LimitPushDown,
+        LimitPushDown(SimpleCatalystConf(caseSensitiveAnalysis = true)),
         CombineLimits,
         ConstantFolding,
         BooleanSimplification) :: Nil

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ba82ec156e..81cd5ef340 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -114,9 +114,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Matches a plan whose output should be small enough to be used in broadcast join.
    */
   private def canBroadcast(plan: LogicalPlan): Boolean = {
-    plan.statistics.isBroadcastable ||
-      (plan.statistics.sizeInBytes >= 0 &&
-        plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
+    plan.planStats(conf).isBroadcastable ||
+      (plan.planStats(conf).sizeInBytes >= 0 &&
+        plan.planStats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
   }
 
   /**
@@ -126,7 +126,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * dynamic.
    */
   private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
-    plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
+    plan.planStats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
   }
 
   /**
@@ -137,7 +137,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * use the size of bytes here as estimation.
    */
   private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
-    a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
+    a.planStats(conf).sizeInBytes * 3 <= b.planStats(conf).sizeInBytes
   }
 
   private def canBuildRight(joinType: JoinType): Boolean = joinType match {
@@ -206,7 +206,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       case logical.Join(left, right, joinType, condition) =>
         val buildSide =
-          if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+          if (right.planStats(conf).sizeInBytes <= left.planStats(conf).sizeInBytes) {
             BuildRight
           } else {
             BuildLeft
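With these changes, every size-based join decision in `SparkStrategies` goes through `planStats(conf)`, so enabling CBO affects broadcast selection, local-hash-map builds, and build-side choice uniformly. A standalone sketch of the broadcast predicate's arithmetic (the names mirror the patch, but this function is illustrative; the real method reads the values from `plan.planStats(conf)` and `conf.autoBroadcastJoinThreshold`):

```scala
// A plan can be broadcast if it is explicitly marked broadcastable, or if its
// estimated size is known (non-negative) and under the threshold.
def canBroadcast(
    sizeInBytes: BigInt,
    isBroadcastable: Boolean,
    autoBroadcastJoinThreshold: Long): Boolean = {
  isBroadcastable ||
    (sizeInBytes >= 0 && sizeInBytes <= autoBroadcastJoinThreshold)
}

// With the default 10 MB threshold, an 8 MB estimate qualifies:
canBroadcast(BigInt(8L * 1024 * 1024), isBroadcastable = false,
  autoBroadcastJoinThreshold = 10L * 1024 * 1024)  // true
```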
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 304dcb691b..322cc7c928 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -642,6 +642,12 @@ object SQLConf {
       .doubleConf
       .createWithDefault(0.05)
 
+  val CBO_ENABLED =
+    SQLConfigBuilder("spark.sql.cbo.enabled")
+      .doc("Enables CBO for estimation of plan statistics when set to true.")
+      .booleanConf
+      .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -841,6 +847,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
+
+  override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
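The new flag defaults to false, so existing plans keep their default statistics unless a user opts in. It can be flipped per session, for example:

```scala
// Opt in to CBO-estimated statistics for the current session.
spark.conf.set("spark.sql.cbo.enabled", "true")

// Equivalent SQL form:
spark.sql("SET spark.sql.cbo.enabled=true")
```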
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
new file mode 100644
index 0000000000..78f2ce1d57
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.statsEstimation
+
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.IntegerType
+
+
+class StatsEstimationSuite extends SharedSQLContext {
+  test("statistics for a plan based on the cbo switch") {
+    val expectedDefaultStats =
+      Statistics(
+        sizeInBytes = 40,
+        rowCount = Some(10),
+        attributeStats = AttributeMap(Seq(
+          AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))),
+        isBroadcastable = false)
+    val expectedCboStats =
+      Statistics(
+        sizeInBytes = 4,
+        rowCount = Some(1),
+        attributeStats = AttributeMap(Seq(
+          AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))),
+        isBroadcastable = false)
+
+    val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      // Use the statistics estimated by cbo
+      assert(plan.planStats(spark.sessionState.conf) == expectedCboStats)
+    }
+    withSQLConf("spark.sql.cbo.enabled" -> "false") {
+      // Use the default statistics
+      assert(plan.planStats(spark.sessionState.conf) == expectedDefaultStats)
+    }
+  }
+}
+
+/**
+ * This class is used for unit-testing the cbo switch. It mimics a logical plan that has both
+ * default statistics and cbo-estimated statistics.
+ */
+private case class DummyLogicalPlan(
+    defaultStats: Statistics,
+    cboStats: Statistics) extends LogicalPlan {
+  override lazy val statistics = defaultStats
+  override def cboStatistics(conf: CatalystConf): Statistics = cboStats
+  override def output: Seq[Attribute] = Nil
+  override def children: Seq[LogicalPlan] = Nil
+}
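An illustrative, REPL-style walkthrough of the switch using the suite's values (`DummyLogicalPlan` is private to the test file, so this is only a sketch of what the assertions above verify):

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf

val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)

plan.planStats(SimpleCatalystConf(caseSensitiveAnalysis = true, cboEnabled = true))
// => expectedCboStats     (sizeInBytes = 4,  rowCount = Some(1))

plan.planStats(SimpleCatalystConf(caseSensitiveAnalysis = true))
// => expectedDefaultStats (sizeInBytes = 40, rowCount = Some(10))
```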