Revert "[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks"
This reverts commit e430614eae.
This commit is contained in:
parent
9634e17d01
commit
7791d0c3a9
|
@ -18,8 +18,6 @@
|
|||
package org.apache.spark.sql.catalyst.planning
|
||||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, PredicateHelper}
|
||||
import org.apache.spark.sql.catalyst.plans
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.apache.spark.sql.catalyst.trees.TreeNode
|
||||
|
||||
|
@ -28,28 +26,8 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
|
|||
* be used for execution. If this strategy does not apply to the given logical operation then an
|
||||
* empty list should be returned.
|
||||
*/
|
||||
abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]]
|
||||
extends PredicateHelper with Logging {
|
||||
|
||||
abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
|
||||
def apply(plan: LogicalPlan): Seq[PhysicalPlan]
|
||||
|
||||
// Re-orders the individual conjunctive predicates in an expression so that the relatively
// cheaper nullability checks (IsNotNull) come first and can short circuit the evaluation of the
// others. The re-ordering is stable: IsNotNull predicates keep their relative order, and so do
// all remaining predicates.
protected def reorderPredicates(expr: Expression): Expression = {
  // A stable partition is used instead of sortWith((x, _) => x.isInstanceOf[IsNotNull]): that
  // comparator is not a strict ordering (two IsNotNull predicates each compare as "less than"
  // the other), so sorting with it can reverse the relative order of the IsNotNull predicates.
  val (nullChecks, others) =
    splitConjunctivePredicates(expr).partition(_.isInstanceOf[IsNotNull])
  // NOTE(review): reduce assumes the split yields at least one predicate, as the previous
  // implementation did - confirm against splitConjunctivePredicates.
  (nullChecks ++ others).reduce(And)
}
|
||||
|
||||
// Option-lifting overload of reorderPredicates(expr: Expression), used to reorder the optional
// conditions of joins. An absent condition stays absent; a present one is reordered and
// re-wrapped with Option(...), so a null result would become None.
protected def reorderPredicates(exprOpt: Option[Expression]): Option[Expression] =
  exprOpt.flatMap(cond => Option(reorderPredicates(cond)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -66,13 +66,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
|
|||
case ExtractEquiJoinKeys(
|
||||
LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
|
||||
joins.BroadcastLeftSemiJoinHash(
|
||||
leftKeys, rightKeys, planLater(left), planLater(right),
|
||||
reorderPredicates(condition)) :: Nil
|
||||
leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
|
||||
// Find left semi joins where at least some predicates can be evaluated by matching join keys
|
||||
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
|
||||
joins.LeftSemiJoinHash(
|
||||
leftKeys, rightKeys, planLater(left), planLater(right),
|
||||
reorderPredicates(condition)) :: Nil
|
||||
leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
|
||||
case _ => Nil
|
||||
}
|
||||
}
|
||||
|
@ -113,39 +111,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
|
|||
|
||||
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
|
||||
Seq(joins.BroadcastHashJoin(
|
||||
leftKeys, rightKeys, Inner, BuildRight, reorderPredicates(condition),
|
||||
planLater(left), planLater(right)))
|
||||
leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right)))
|
||||
|
||||
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
|
||||
Seq(joins.BroadcastHashJoin(
|
||||
leftKeys, rightKeys, Inner, BuildLeft, reorderPredicates(condition), planLater(left),
|
||||
planLater(right)))
|
||||
leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right)))
|
||||
|
||||
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
|
||||
if RowOrdering.isOrderable(leftKeys) =>
|
||||
joins.SortMergeJoin(
|
||||
leftKeys, rightKeys, reorderPredicates(condition), planLater(left),
|
||||
planLater(right)) :: Nil
|
||||
leftKeys, rightKeys, condition, planLater(left), planLater(right)) :: Nil
|
||||
|
||||
// --- Outer joins --------------------------------------------------------------------------
|
||||
|
||||
case ExtractEquiJoinKeys(
|
||||
LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
|
||||
Seq(joins.BroadcastHashJoin(
|
||||
leftKeys, rightKeys, LeftOuter, BuildRight, reorderPredicates(condition),
|
||||
planLater(left), planLater(right)))
|
||||
leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right)))
|
||||
|
||||
case ExtractEquiJoinKeys(
|
||||
RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
|
||||
Seq(joins.BroadcastHashJoin(
|
||||
leftKeys, rightKeys, RightOuter, BuildLeft, reorderPredicates(condition),
|
||||
planLater(left), planLater(right)))
|
||||
leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right)))
|
||||
|
||||
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
|
||||
if RowOrdering.isOrderable(leftKeys) =>
|
||||
joins.SortMergeOuterJoin(
|
||||
leftKeys, rightKeys, joinType, reorderPredicates(condition), planLater(left),
|
||||
planLater(right)) :: Nil
|
||||
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
|
||||
|
||||
// --- Cases where this strategy does not apply ---------------------------------------------
|
||||
|
||||
|
@ -260,12 +252,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
|
|||
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
|
||||
case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) =>
|
||||
execution.joins.BroadcastNestedLoopJoin(
|
||||
planLater(left), planLater(right), joins.BuildLeft, j.joinType,
|
||||
reorderPredicates(condition)) :: Nil
|
||||
planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil
|
||||
case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) =>
|
||||
execution.joins.BroadcastNestedLoopJoin(
|
||||
planLater(left), planLater(right), joins.BuildRight, j.joinType,
|
||||
reorderPredicates(condition)) :: Nil
|
||||
planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil
|
||||
case _ => Nil
|
||||
}
|
||||
}
|
||||
|
@ -275,7 +265,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
|
|||
case logical.Join(left, right, Inner, None) =>
|
||||
execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
|
||||
case logical.Join(left, right, Inner, Some(condition)) =>
|
||||
execution.Filter(reorderPredicates(condition),
|
||||
execution.Filter(condition,
|
||||
execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
|
||||
case _ => Nil
|
||||
}
|
||||
|
@ -292,8 +282,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
|
|||
}
|
||||
// This join could be very slow or even hang forever
|
||||
joins.BroadcastNestedLoopJoin(
|
||||
planLater(left), planLater(right), buildSide, joinType,
|
||||
reorderPredicates(condition)) :: Nil
|
||||
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
|
||||
case _ => Nil
|
||||
}
|
||||
}
|
||||
|
@ -352,7 +341,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
|
|||
case logical.Project(projectList, child) =>
|
||||
execution.Project(projectList, planLater(child)) :: Nil
|
||||
case logical.Filter(condition, child) =>
|
||||
execution.Filter(reorderPredicates(condition), planLater(child)) :: Nil
|
||||
execution.Filter(condition, planLater(child)) :: Nil
|
||||
case e @ logical.Expand(_, _, child) =>
|
||||
execution.Expand(e.projections, e.output, planLater(child)) :: Nil
|
||||
case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
|
||||
|
|
|
@ -1,103 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.execution
|
||||
|
||||
import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper}
|
||||
import org.apache.spark.sql.catalyst.plans.logical
|
||||
import org.apache.spark.sql.catalyst.plans.logical.Join
|
||||
import org.apache.spark.sql.execution
|
||||
import org.apache.spark.sql.execution.joins.LeftSemiJoinHash
|
||||
import org.apache.spark.sql.test.SharedSQLContext
|
||||
|
||||
|
||||
/**
 * Checks that the conditions of physical filter and join operators list their IsNotNull
 * predicates first, without otherwise disturbing the order the predicates had in the optimized
 * logical plan.
 */
class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper {

  setupTestData()

  // Verifies that (a) in the new condition, the IsNotNull operators precede the rest of the
  // operators and (b) the relative order within the IsNotNull operators and within the
  // non-IsNotNull operators is the same as in the old condition.
  private def verifyStableOrder(before: Expression, after: Expression): Unit = {
    val oldPredicates = splitConjunctivePredicates(before)
    splitConjunctivePredicates(after).sliding(2).foreach {
      case Seq(x, y) =>
        // Verify IsNotNull operator ordering
        assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull])

        // Verify stable sort order; only neighbours of the same kind are comparable
        if (x.isInstanceOf[IsNotNull] == y.isInstanceOf[IsNotNull]) {
          assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y))
        }
      case _ =>
        // sliding(2) yields a single shorter window when the condition has fewer than two
        // predicates; there is no pair to compare in that case.
    }
  }

  test("null ordering in filter predicates") {
    val query = sql(
      """
        |SELECT * from testData
        |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5
      """.stripMargin)
      .queryExecution

    val logicalPlan = query.optimizedPlan
    val physicalPlan = query.sparkPlan
    assert(logicalPlan.find(_.isInstanceOf[logical.Filter]).isDefined)
    assert(physicalPlan.find(_.isInstanceOf[execution.Filter]).isDefined)

    // The asserts above guarantee that both collections below are non-empty.
    val logicalCondition = logicalPlan.collect {
      case logical.Filter(condition, _) =>
        condition
    }.head

    val physicalCondition = physicalPlan.collect {
      case execution.Filter(condition, _) =>
        condition
    }.head

    verifyStableOrder(logicalCondition, physicalCondition)
  }

  test("null ordering in join predicates") {
    sqlContext.cacheManager.clearCache()
    val query = sql(
      """
        |SELECT * FROM testData t1
        |LEFT SEMI JOIN testData t2
        |ON t1.key = t2.key
        |AND t1.key + t2.key != 5
        |AND CONCAT(t1.value, t2.value) IS NOT NULL
      """.stripMargin)
      .queryExecution

    val logicalPlan = query.optimizedPlan
    val physicalPlan = query.sparkPlan
    assert(logicalPlan.find(_.isInstanceOf[Join]).isDefined)
    assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined)

    // The query always carries a join condition, so the Option is unwrapped directly.
    val logicalCondition = logicalPlan.collect {
      case Join(_, _, _, condition) =>
        condition.get
    }.head

    val physicalCondition = physicalPlan.collect {
      case LeftSemiJoinHash(_, _, _, _, conditionOpt) =>
        conditionOpt.get
    }.head

    verifyStableOrder(logicalCondition, physicalCondition)
  }
}
|
Loading…
Reference in a new issue