[SPARK-13383][SQL] Keep broadcast hint after column pruning

JIRA: https://issues.apache.org/jira/browse/SPARK-13383

## What changes were proposed in this pull request?

When the Optimizer performs column pruning, it puts an additional Project on top of a logical plan. However, when a logical plan is already wrapped in a BroadcastHint, the added Project hides the BroadcastHint from the later stages of execution.

We should take care of BroadcastHint when we do column pruning.

## How was this patch tested?

A unit test is added.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #11260 from viirya/keep-broadcasthint.
This commit is contained in:
Liang-Chi Hsieh 2016-02-24 10:22:40 -08:00 committed by Michael Armbrust
parent 8930181833
commit f373986997
3 changed files with 42 additions and 9 deletions

View file

@ -332,6 +332,10 @@ case class Join(
*/
case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
// The hint adds no columns of its own: it exposes exactly the child's output attributes.
override def output: Seq[Attribute] = child.output
// We manually set the statistics of BroadcastHint to the smallest value (sizeInBytes = 1) to
// make sure the plan wrapped by BroadcastHint will be considered for broadcasting later.
override def statistics: Statistics = Statistics(sizeInBytes = 1)
}
case class InsertIntoTable(

View file

@ -23,18 +23,18 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
class JoinOrderSuite extends PlanTest {
class JoinOptimizationSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Filter Pushdown", Once,
Batch("Filter Pushdown", FixedPoint(100),
CombineFilters,
PushPredicateThroughProject,
BooleanSimplification,
@ -92,4 +92,31 @@ class JoinOrderSuite extends PlanTest {
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}
// Checks that the optimized plan keeps the BroadcastHint around the right side of the join:
// the plan must match `expected`, and exactly one join's right child must report
// sizeInBytes == 1.
test("broadcasthint sets relation statistics to smallest value") {
val input = LocalRelation('key.int, 'value.string)
// Join x with a broadcast-hinted y, then select only the key column from each side.
val query =
Project(Seq($"x.key", $"y.key"),
Join(
SubqueryAlias("x", input),
BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
val optimized = Optimize.execute(query)
// Expected plan: a Project selecting only `key` under each side of the join, with the one
// for y placed inside the BroadcastHint rather than on top of it.
val expected =
Project(Seq($"x.key", $"y.key"),
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input)),
BroadcastHint(
Project(Seq($"y.key"), SubqueryAlias("y", input))),
Inner, None)).analyze
comparePlans(optimized, expected)
// Collect the right-hand children of joins whose statistics report sizeInBytes == 1.
val broadcastChildren = optimized.collect {
case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r
}
// Exactly one join in the optimized plan should have such a right child.
assert(broadcastChildren.size == 1)
}
}

View file

@ -81,11 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Matches a plan whose output should be small enough to be used in broadcast join.
*/
object CanBroadcast {
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
case BroadcastHint(p) => Some(p)
case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
case _ => None
def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
Some(plan)
} else {
None
}
}
}