[SPARK-31719][SQL] Refactor JoinSelection

### What changes were proposed in this pull request?
This PR extracts the logic for selecting the planned join type out of the `JoinSelection` rule and moves it to `JoinSelectionHelper` in Catalyst.

### Why are the changes needed?
This change both cleans up the code in `JoinSelection` and allows the logic to be in one place and be used from other rules that need to make decision based on the join type before the planning time.

### Does this PR introduce _any_ user-facing change?
`BuildSide`, `BuildLeft`, and `BuildRight` are moved from `org.apache.spark.sql.execution` to Catalyst in `org.apache.spark.sql.catalyst.optimizer`.

### How was this patch tested?
This is a refactoring, passes existing tests.

Closes #28540 from dbaliafroozeh/RefactorJoinSelection.

Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Ali Afroozeh 2020-05-27 15:49:08 +00:00 committed by Wenchen Fan
parent 50492c0bd3
commit f6f1e51072
17 changed files with 395 additions and 161 deletions

View file

@ -208,3 +208,161 @@ object ExtractPythonUDFFromJoinCondition extends Rule[LogicalPlan] with Predicat
}
}
}
sealed abstract class BuildSide
case object BuildRight extends BuildSide
case object BuildLeft extends BuildSide
trait JoinSelectionHelper {
def getBroadcastBuildSide(
left: LogicalPlan,
right: LogicalPlan,
joinType: JoinType,
hint: JoinHint,
hintOnly: Boolean,
conf: SQLConf): Option[BuildSide] = {
val buildLeft = if (hintOnly) {
hintToBroadcastLeft(hint)
} else {
canBroadcastBySize(left, conf) && !hintToNotBroadcastLeft(hint)
}
val buildRight = if (hintOnly) {
hintToBroadcastRight(hint)
} else {
canBroadcastBySize(right, conf) && !hintToNotBroadcastRight(hint)
}
getBuildSide(
canBuildLeft(joinType) && buildLeft,
canBuildRight(joinType) && buildRight,
left,
right
)
}
def getShuffleHashJoinBuildSide(
left: LogicalPlan,
right: LogicalPlan,
joinType: JoinType,
hint: JoinHint,
hintOnly: Boolean,
conf: SQLConf): Option[BuildSide] = {
val buildLeft = if (hintOnly) {
hintToShuffleHashJoinLeft(hint)
} else {
canBuildLocalHashMapBySize(left, conf) && muchSmaller(left, right)
}
val buildRight = if (hintOnly) {
hintToShuffleHashJoinRight(hint)
} else {
canBuildLocalHashMapBySize(right, conf) && muchSmaller(right, left)
}
getBuildSide(
canBuildLeft(joinType) && buildLeft,
canBuildRight(joinType) && buildRight,
left,
right
)
}
def getSmallerSide(left: LogicalPlan, right: LogicalPlan): BuildSide = {
if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
}
/**
* Matches a plan whose output should be small enough to be used in broadcast join.
*/
def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}
def canBuildLeft(joinType: JoinType): Boolean = {
joinType match {
case _: InnerLike | RightOuter => true
case _ => false
}
}
def canBuildRight(joinType: JoinType): Boolean = {
joinType match {
case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
case _ => false
}
}
def hintToBroadcastLeft(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(BROADCAST))
}
def hintToBroadcastRight(hint: JoinHint): Boolean = {
hint.rightHint.exists(_.strategy.contains(BROADCAST))
}
def hintToNotBroadcastLeft(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_HASH))
}
def hintToNotBroadcastRight(hint: JoinHint): Boolean = {
hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_HASH))
}
def hintToShuffleHashJoinLeft(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(SHUFFLE_HASH))
}
def hintToShuffleHashJoinRight(hint: JoinHint): Boolean = {
hint.rightHint.exists(_.strategy.contains(SHUFFLE_HASH))
}
def hintToSortMergeJoin(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(SHUFFLE_MERGE)) ||
hint.rightHint.exists(_.strategy.contains(SHUFFLE_MERGE))
}
def hintToShuffleReplicateNL(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(SHUFFLE_REPLICATE_NL)) ||
hint.rightHint.exists(_.strategy.contains(SHUFFLE_REPLICATE_NL))
}
private def getBuildSide(
canBuildLeft: Boolean,
canBuildRight: Boolean,
left: LogicalPlan,
right: LogicalPlan): Option[BuildSide] = {
if (canBuildLeft && canBuildRight) {
// returns the smaller side base on its estimated physical size, if we want to build the
// both sides.
Some(getSmallerSide(left, right))
} else if (canBuildLeft) {
Some(BuildLeft)
} else if (canBuildRight) {
Some(BuildRight)
} else {
None
}
}
/**
* Matches a plan whose single partition should be small enough to build a hash table.
*
* Note: this assume that the number of partition is fixed, requires additional work if it's
* dynamic.
*/
private def canBuildLocalHashMapBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
/**
* Returns whether plan a is much smaller (3X) than plan b.
*
* The cost to build hash map is higher than sorting, we should only build hash map on a table
* that is much smaller than other one. Since we does not have the statistic for number of rows,
* use the size of bytes here as estimation.
*/
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}
}

View file

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.AttributeMap
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, JoinHint, NO_BROADCAST_HASH, SHUFFLE_HASH}
import org.apache.spark.sql.catalyst.statsEstimation.StatsTestPlan
import org.apache.spark.sql.internal.SQLConf
class JoinSelectionHelperSuite extends PlanTest with JoinSelectionHelper {
private val left = StatsTestPlan(
outputList = Seq('a.int, 'b.int, 'c.int),
rowCount = 20000000,
size = Some(20000000),
attributeStats = AttributeMap(Seq()))
private val right = StatsTestPlan(
outputList = Seq('d.int),
rowCount = 1000,
size = Some(1000),
attributeStats = AttributeMap(Seq()))
private val hintBroadcast = Some(HintInfo(Some(BROADCAST)))
private val hintNotToBroadcast = Some(HintInfo(Some(NO_BROADCAST_HASH)))
private val hintShuffleHash = Some(HintInfo(Some(SHUFFLE_HASH)))
test("getBroadcastBuildSide (hintOnly = true) return BuildLeft with only a left hint") {
val broadcastSide = getBroadcastBuildSide(
left,
right,
Inner,
JoinHint(hintBroadcast, None),
hintOnly = true,
SQLConf.get
)
assert(broadcastSide === Some(BuildLeft))
}
test("getBroadcastBuildSide (hintOnly = true) return BuildRight with only a right hint") {
val broadcastSide = getBroadcastBuildSide(
left,
right,
Inner,
JoinHint(None, hintBroadcast),
hintOnly = true,
SQLConf.get
)
assert(broadcastSide === Some(BuildRight))
}
test("getBroadcastBuildSide (hintOnly = true) return smaller side with both having hints") {
val broadcastSide = getBroadcastBuildSide(
left,
right,
Inner,
JoinHint(hintBroadcast, hintBroadcast),
hintOnly = true,
SQLConf.get
)
assert(broadcastSide === Some(BuildRight))
}
test("getBroadcastBuildSide (hintOnly = true) return None when no side has a hint") {
val broadcastSide = getBroadcastBuildSide(
left,
right,
Inner,
JoinHint(None, None),
hintOnly = true,
SQLConf.get
)
assert(broadcastSide === None)
}
test("getBroadcastBuildSide (hintOnly = false) return BuildRight when right is broadcastable") {
val broadcastSide = getBroadcastBuildSide(
left,
right,
Inner,
JoinHint(None, None),
hintOnly = false,
SQLConf.get
)
assert(broadcastSide === Some(BuildRight))
}
test("getBroadcastBuildSide (hintOnly = false) return None when right has no broadcast hint") {
val broadcastSide = getBroadcastBuildSide(
left,
right,
Inner,
JoinHint(None, hintNotToBroadcast ),
hintOnly = false,
SQLConf.get
)
assert(broadcastSide === None)
}
test("getShuffleHashJoinBuildSide (hintOnly = true) return BuildLeft with only a left hint") {
val broadcastSide = getShuffleHashJoinBuildSide(
left,
right,
Inner,
JoinHint(hintShuffleHash, None),
hintOnly = true,
SQLConf.get
)
assert(broadcastSide === Some(BuildLeft))
}
test("getShuffleHashJoinBuildSide (hintOnly = true) return BuildRight with only a right hint") {
val broadcastSide = getShuffleHashJoinBuildSide(
left,
right,
Inner,
JoinHint(None, hintShuffleHash),
hintOnly = true,
SQLConf.get
)
assert(broadcastSide === Some(BuildRight))
}
test("getShuffleHashJoinBuildSide (hintOnly = true) return smaller side when both have hints") {
val broadcastSide = getShuffleHashJoinBuildSide(
left,
right,
Inner,
JoinHint(hintShuffleHash, hintShuffleHash),
hintOnly = true,
SQLConf.get
)
assert(broadcastSide === Some(BuildRight))
}
test("getShuffleHashJoinBuildSide (hintOnly = true) return None when no side has a hint") {
val broadcastSide = getShuffleHashJoinBuildSide(
left,
right,
Inner,
JoinHint(None, None),
hintOnly = true,
SQLConf.get
)
assert(broadcastSide === None)
}
test("getShuffleHashJoinBuildSide (hintOnly = false) return BuildRight when right is smaller") {
val broadcastSide = getBroadcastBuildSide(
left,
right,
Inner,
JoinHint(None, None),
hintOnly = false,
SQLConf.get
)
assert(broadcastSide === Some(BuildRight))
}
test("getSmallerSide should return BuildRight") {
assert(getSmallerSide(left, right) === BuildRight)
}
test("canBroadcastBySize should return true if the plan size is less than 10MB") {
assert(canBroadcastBySize(left, SQLConf.get) === false)
assert(canBroadcastBySize(right, SQLConf.get) === true)
}
}

View file

@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper, NormalizeFloatingNumbers}
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@ -33,7 +33,6 @@ import org.apache.spark.sql.execution.aggregate.AggUtils
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.MemoryPlan
@ -135,93 +134,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Supports both equi-joins and non-equi-joins.
* Supports only inner like joins.
*/
object JoinSelection extends Strategy with PredicateHelper {
/**
* Matches a plan whose output should be small enough to be used in broadcast join.
*/
private def canBroadcast(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}
/**
* Matches a plan whose single partition should be small enough to build a hash table.
*
* Note: this assume that the number of partition is fixed, requires additional work if it's
* dynamic.
*/
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
/**
* Returns whether plan a is much smaller (3X) than plan b.
*
* The cost to build hash map is higher than sorting, we should only build hash map on a table
* that is much smaller than other one. Since we does not have the statistic for number of rows,
* use the size of bytes here as estimation.
*/
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
case _ => false
}
private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
case _: InnerLike | RightOuter => true
case _ => false
}
private def getBuildSide(
wantToBuildLeft: Boolean,
wantToBuildRight: Boolean,
left: LogicalPlan,
right: LogicalPlan): Option[BuildSide] = {
if (wantToBuildLeft && wantToBuildRight) {
// returns the smaller side base on its estimated physical size, if we want to build the
// both sides.
Some(getSmallerSide(left, right))
} else if (wantToBuildLeft) {
Some(BuildLeft)
} else if (wantToBuildRight) {
Some(BuildRight)
} else {
None
}
}
private def getSmallerSide(left: LogicalPlan, right: LogicalPlan) = {
if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
}
private def hintToBroadcastLeft(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(BROADCAST))
}
private def hintToBroadcastRight(hint: JoinHint): Boolean = {
hint.rightHint.exists(_.strategy.contains(BROADCAST))
}
private def hintToShuffleHashLeft(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(SHUFFLE_HASH))
}
private def hintToShuffleHashRight(hint: JoinHint): Boolean = {
hint.rightHint.exists(_.strategy.contains(SHUFFLE_HASH))
}
private def hintToSortMergeJoin(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(SHUFFLE_MERGE)) ||
hint.rightHint.exists(_.strategy.contains(SHUFFLE_MERGE))
}
private def hintToShuffleReplicateNL(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(SHUFFLE_REPLICATE_NL)) ||
hint.rightHint.exists(_.strategy.contains(SHUFFLE_REPLICATE_NL))
}
object JoinSelection extends Strategy
with PredicateHelper
with JoinSelectionHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@ -245,33 +160,31 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) =>
def createBroadcastHashJoin(buildLeft: Boolean, buildRight: Boolean) = {
val wantToBuildLeft = canBuildLeft(joinType) && buildLeft
val wantToBuildRight = canBuildRight(joinType) && buildRight
getBuildSide(wantToBuildLeft, wantToBuildRight, left, right).map { buildSide =>
Seq(joins.BroadcastHashJoinExec(
leftKeys,
rightKeys,
joinType,
buildSide,
condition,
planLater(left),
planLater(right)))
def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
buildSide =>
Seq(joins.BroadcastHashJoinExec(
leftKeys,
rightKeys,
joinType,
buildSide,
condition,
planLater(left),
planLater(right)))
}
}
def createShuffleHashJoin(buildLeft: Boolean, buildRight: Boolean) = {
val wantToBuildLeft = canBuildLeft(joinType) && buildLeft
val wantToBuildRight = canBuildRight(joinType) && buildRight
getBuildSide(wantToBuildLeft, wantToBuildRight, left, right).map { buildSide =>
Seq(joins.ShuffledHashJoinExec(
leftKeys,
rightKeys,
joinType,
buildSide,
condition,
planLater(left),
planLater(right)))
def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
buildSide =>
Seq(joins.ShuffledHashJoinExec(
leftKeys,
rightKeys,
joinType,
buildSide,
condition,
planLater(left),
planLater(right)))
}
}
@ -293,14 +206,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
def createJoinWithoutHint() = {
createBroadcastHashJoin(
canBroadcast(left) && !hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_HASH)),
canBroadcast(right) && !hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_HASH)))
createBroadcastHashJoin(false)
.orElse {
if (!conf.preferSortMergeJoin) {
createShuffleHashJoin(
canBuildLocalHashMap(left) && muchSmaller(left, right),
canBuildLocalHashMap(right) && muchSmaller(right, left))
createShuffleHashJoin(false)
} else {
None
}
@ -315,9 +224,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
createBroadcastHashJoin(true)
.orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
.orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint)))
.orElse(createShuffleHashJoin(true))
.orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
.getOrElse(createJoinWithoutHint())
@ -374,7 +283,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
def createJoinWithoutHint() = {
createBroadcastNLJoin(canBroadcast(left), canBroadcast(right))
createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
.orElse(createCartesianProduct())
.getOrElse {
// This join could be very slow or OOM

View file

@ -19,10 +19,11 @@ package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, BuildLeft, BuildRight}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
/**
* Strategy for plans containing [[LogicalQueryStage]] nodes:

View file

@ -17,10 +17,11 @@
package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.internal.SQLConf
/**

View file

@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.dynamicpruning
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal, PredicateHelper}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.catalyst.rules.Rule

View file

@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan}

View file

@ -21,6 +21,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan}

View file

@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{ExplainUtils, RowIterator}

View file

@ -23,6 +23,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.SparkPlan

View file

@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution
/**
* Physical execution operators for join operations.
*/
package object joins {
sealed abstract class BuildSide
case object BuildRight extends BuildSide
case object BuildLeft extends BuildSide
}

View file

@ -19,7 +19,7 @@ package org.apache.spark.sql
import org.apache.log4j.Level
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, EliminateResolvedHint}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor

View file

@ -24,11 +24,12 @@ import org.apache.log4j.Level
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.{QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf

View file

@ -22,6 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.AccumulatorSuite
import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, BitwiseOr, Cast, Literal, ShiftLeft}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.logical.BROADCAST
import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AdaptiveTestUtils, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}

View file

@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint}

View file

@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint}
@ -133,7 +134,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
makeBroadcastHashJoin(
leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildLeft),
leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, BuildLeft),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}
@ -145,7 +146,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
makeBroadcastHashJoin(
leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildRight),
leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, BuildRight),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}
@ -157,7 +158,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
makeShuffledHashJoin(
leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildLeft),
leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, BuildLeft),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}
@ -169,7 +170,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
makeShuffledHashJoin(
leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildRight),
leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, BuildRight),
expectedAnswer.map(Row.fromTuple),
sortAnswers = true)
}

View file

@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, LessThan}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint}