[SPARK-18998][SQL] Add a cbo conf to switch between default statistics and estimated statistics

## What changes were proposed in this pull request?

We add a CBO configuration, `spark.sql.cbo.enabled`, to switch between the default stats and the estimated stats.
We also define a new statistics method `planStats` in LogicalPlan, which takes the conf as a parameter so that the CBO switch and other estimation-related configurations can be passed down in the future. `planStats` is used on the caller side (i.e. in Optimizer and Strategies) to make transformation decisions based on stats.
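For illustration, a minimal sketch of the caller-side pattern (the rule below is hypothetical and not part of this patch; `planStats`, `CatalystConf`, and the `Join` node are from the codebase):

```scala
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical size-based rule, shown only to illustrate the caller-side
// pattern: decisions that used to read plan.statistics now go through
// planStats(conf), so they see CBO estimates only when cboEnabled is true.
case class ExampleSizeBasedRule(conf: CatalystConf) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case j @ Join(left, right, _, _)
        if left.planStats(conf).sizeInBytes <= right.planStats(conf).sizeInBytes =>
      j // e.g. pick a build side based on the (possibly estimated) sizes
  }
}
```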

## How was this patch tested?

Add a test case using a dummy LogicalPlan.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16401 from wzhfy/cboSwitch.
Zhenhua Wang authored on 2017-01-03 12:19:52 +08:00; committed by Wenchen Fan
commit ae83c21125 (parent a6cd9dbc60)
7 changed files with 117 additions and 10 deletions

CatalystConf.scala

@@ -49,6 +49,11 @@ trait CatalystConf {
   def resolver: Resolver = {
     if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
   }
+
+  /**
+   * Enables CBO for estimation of plan statistics when set true.
+   */
+  def cboEnabled: Boolean
 }
@@ -62,5 +67,6 @@ case class SimpleCatalystConf(
     maxCaseBranchesForCodegen: Int = 20,
     runSQLonFile: Boolean = true,
     crossJoinEnabled: Boolean = false,
+    cboEnabled: Boolean = false,
     warehousePath: String = "/user/hive/warehouse")
   extends CatalystConf

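As a hedged sketch of how a test might exercise the new field (constructor arguments as declared above; `cboConf` is just a local name):

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf

// Illustrative: a test conf with the new CBO switch turned on; every other
// field keeps the default declared in SimpleCatalystConf above.
val cboConf = SimpleCatalystConf(caseSensitiveAnalysis = true, cboEnabled = true)
assert(cboConf.cboEnabled)
```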
Optimizer.scala

@@ -82,7 +82,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       EliminateOuterJoin,
       PushPredicateThroughJoin,
       PushDownPredicate,
-      LimitPushDown,
+      LimitPushDown(conf),
       ColumnPruning,
       InferFiltersFromConstraints,
       // Operator combine
@@ -209,7 +209,7 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
 /**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
-object LimitPushDown extends Rule[LogicalPlan] {
+case class LimitPushDown(conf: CatalystConf) extends Rule[LogicalPlan] {

   private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = {
     plan match {
@@ -253,7 +253,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
       case FullOuter =>
         (left.maxRows, right.maxRows) match {
           case (None, None) =>
-            if (left.statistics.sizeInBytes >= right.statistics.sizeInBytes) {
+            if (left.planStats(conf).sizeInBytes >= right.planStats(conf).sizeInBytes) {
               join.copy(left = maybePushLimit(exp, left))
             } else {
               join.copy(right = maybePushLimit(exp, right))

LogicalPlan.scala

@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -94,6 +95,29 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
   }

+  /**
+   * Returns the default statistics or statistics estimated by cbo based on configuration.
+   */
+  final def planStats(conf: CatalystConf): Statistics = {
+    if (conf.cboEnabled) {
+      if (estimatedStats.isEmpty) {
+        estimatedStats = Some(cboStatistics(conf))
+      }
+      estimatedStats.get
+    } else {
+      statistics
+    }
+  }
+
+  /**
+   * Returns statistics estimated by cbo. If the plan doesn't override this, it returns the
+   * default statistics.
+   */
+  protected def cboStatistics(conf: CatalystConf): Statistics = statistics
+
+  /** A cache for the estimated statistics, such that it will only be computed once. */
+  private var estimatedStats: Option[Statistics] = None

   /**
    * Returns the maximum number of rows that this plan may compute.
    *

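For illustration, a minimal sketch of a node opting into the new hook (the relation below is hypothetical; the new StatsEstimationSuite at the end of this diff exercises the same pattern). Since `planStats` caches its result in `estimatedStats`, `cboStatistics` runs at most once per node:

```scala
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}

// Hypothetical leaf node: overrides the protected hook to supply a finer
// CBO estimate; planStats falls back to `statistics` when CBO is disabled.
case class ExampleRelation(output: Seq[Attribute]) extends LogicalPlan {
  override def children: Seq[LogicalPlan] = Nil
  override lazy val statistics: Statistics = Statistics(sizeInBytes = 1000)
  override protected def cboStatistics(conf: CatalystConf): Statistics =
    Statistics(sizeInBytes = 100) // pretend a column-stats-based estimate
}
```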
LimitPushdownSuite.scala

@@ -17,6 +17,7 @@
 package org.apache.spark.sql.catalyst.optimizer

+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -32,7 +33,7 @@ class LimitPushdownSuite extends PlanTest {
     Batch("Subqueries", Once,
       EliminateSubqueryAliases) ::
     Batch("Limit pushdown", FixedPoint(100),
-      LimitPushDown,
+      LimitPushDown(SimpleCatalystConf(caseSensitiveAnalysis = true)),
       CombineLimits,
       ConstantFolding,
       BooleanSimplification) :: Nil

SparkStrategies.scala

@@ -114,9 +114,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Matches a plan whose output should be small enough to be used in broadcast join.
    */
   private def canBroadcast(plan: LogicalPlan): Boolean = {
-    plan.statistics.isBroadcastable ||
-      (plan.statistics.sizeInBytes >= 0 &&
-        plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
+    plan.planStats(conf).isBroadcastable ||
+      (plan.planStats(conf).sizeInBytes >= 0 &&
+        plan.planStats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
   }

   /**
@@ -126,7 +126,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * dynamic.
    */
   private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
-    plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
+    plan.planStats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
   }

   /**
@@ -137,7 +137,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * use the size of bytes here as estimation.
    */
   private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
-    a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
+    a.planStats(conf).sizeInBytes * 3 <= b.planStats(conf).sizeInBytes
   }

   private def canBuildRight(joinType: JoinType): Boolean = joinType match {
@@ -206,7 +206,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Join(left, right, joinType, condition) =>
         val buildSide =
-          if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+          if (right.planStats(conf).sizeInBytes <= left.planStats(conf).sizeInBytes) {
             BuildRight
           } else {
             BuildLeft

SQLConf.scala

@@ -642,6 +642,12 @@ object SQLConf {
       .doubleConf
       .createWithDefault(0.05)

+  val CBO_ENABLED =
+    SQLConfigBuilder("spark.sql.cbo.enabled")
+      .doc("Enables CBO for estimation of plan statistics when set true.")
+      .booleanConf
+      .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -841,6 +847,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)

   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
+
+  override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */

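As a usage sketch (assuming a live `SparkSession` named `spark`), the flag can be toggled at runtime, which is what the new test below does through `withSQLConf`:

```scala
// Illustrative: flip the new switch on a running session. With it on,
// planStats(conf) returns CBO estimates; with it off, default statistics.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.enabled", "false")
```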
StatsEstimationSuite.scala (new file)

@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.statsEstimation
+
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.IntegerType
+
+class StatsEstimationSuite extends SharedSQLContext {
+  test("statistics for a plan based on the cbo switch") {
+    val expectedDefaultStats =
+      Statistics(
+        sizeInBytes = 40,
+        rowCount = Some(10),
+        attributeStats = AttributeMap(Seq(
+          AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))),
+        isBroadcastable = false)
+    val expectedCboStats =
+      Statistics(
+        sizeInBytes = 4,
+        rowCount = Some(1),
+        attributeStats = AttributeMap(Seq(
+          AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))),
+        isBroadcastable = false)
+
+    val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      // Use the statistics estimated by cbo
+      assert(plan.planStats(spark.sessionState.conf) == expectedCboStats)
+    }
+
+    withSQLConf("spark.sql.cbo.enabled" -> "false") {
+      // Use the default statistics
+      assert(plan.planStats(spark.sessionState.conf) == expectedDefaultStats)
+    }
+  }
+}
+
+/**
+ * This class is used for unit-testing the cbo switch, it mimics a logical plan which has both
+ * default statistics and cbo estimated statistics.
+ */
+private case class DummyLogicalPlan(
+    defaultStats: Statistics,
+    cboStats: Statistics) extends LogicalPlan {
+  override lazy val statistics = defaultStats
+  override def cboStatistics(conf: CatalystConf): Statistics = cboStats
+  override def output: Seq[Attribute] = Nil
+  override def children: Seq[LogicalPlan] = Nil
+}