Revert "Revert "[SPARK-13383][SQL] Keep broadcast hint after column pruning""
This reverts commit 382b27babf
.
This commit is contained in:
parent
d563c8fa01
commit
65805ab6ea
|
@ -332,6 +332,10 @@ case class Join(
|
|||
*/
|
||||
case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
|
||||
override def output: Seq[Attribute] = child.output
|
||||
|
||||
// We manually set statistics of BroadcastHint to smallest value to make sure
|
||||
// the plan wrapped by BroadcastHint will be considered to broadcast later.
|
||||
override def statistics: Statistics = Statistics(sizeInBytes = 1)
|
||||
}
|
||||
|
||||
case class InsertIntoTable(
|
||||
|
|
|
@ -23,18 +23,18 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
|
|||
import org.apache.spark.sql.catalyst.dsl.plans._
|
||||
import org.apache.spark.sql.catalyst.expressions.Expression
|
||||
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
|
||||
import org.apache.spark.sql.catalyst.plans.PlanTest
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
|
||||
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
import org.apache.spark.sql.catalyst.rules.RuleExecutor
|
||||
|
||||
|
||||
class JoinOrderSuite extends PlanTest {
|
||||
class JoinOptimizationSuite extends PlanTest {
|
||||
|
||||
object Optimize extends RuleExecutor[LogicalPlan] {
|
||||
val batches =
|
||||
Batch("Subqueries", Once,
|
||||
EliminateSubqueryAliases) ::
|
||||
Batch("Filter Pushdown", Once,
|
||||
Batch("Filter Pushdown", FixedPoint(100),
|
||||
CombineFilters,
|
||||
PushPredicateThroughProject,
|
||||
BooleanSimplification,
|
||||
|
@ -92,4 +92,31 @@ class JoinOrderSuite extends PlanTest {
|
|||
|
||||
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
|
||||
}
|
||||
|
||||
test("broadcasthint sets relation statistics to smallest value") {
|
||||
val input = LocalRelation('key.int, 'value.string)
|
||||
|
||||
val query =
|
||||
Project(Seq($"x.key", $"y.key"),
|
||||
Join(
|
||||
SubqueryAlias("x", input),
|
||||
BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
|
||||
|
||||
val optimized = Optimize.execute(query)
|
||||
|
||||
val expected =
|
||||
Project(Seq($"x.key", $"y.key"),
|
||||
Join(
|
||||
Project(Seq($"x.key"), SubqueryAlias("x", input)),
|
||||
BroadcastHint(
|
||||
Project(Seq($"y.key"), SubqueryAlias("y", input))),
|
||||
Inner, None)).analyze
|
||||
|
||||
comparePlans(optimized, expected)
|
||||
|
||||
val broadcastChildren = optimized.collect {
|
||||
case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r
|
||||
}
|
||||
assert(broadcastChildren.size == 1)
|
||||
}
|
||||
}
|
|
@ -81,11 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
|
|||
* Matches a plan whose output should be small enough to be used in broadcast join.
|
||||
*/
|
||||
object CanBroadcast {
|
||||
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
|
||||
case BroadcastHint(p) => Some(p)
|
||||
case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
|
||||
p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
|
||||
case _ => None
|
||||
def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
|
||||
if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
|
||||
plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
|
||||
Some(plan)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue