[SPARK-28177][SQL] Adjust post shuffle partition number in adaptive execution
## What changes were proposed in this pull request?

This is to implement a ReduceNumShufflePartitions rule in the new adaptive execution framework introduced in #24706. This rule is used to adjust the post shuffle partitions based on the map output statistics.

## How was this patch tested?

Added ReduceNumShufflePartitionsSuite

Closes #24978 from carsonwang/reduceNumShufflePartitions.

Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent 4212a30883
commit cec6a32904
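For context before the diff itself: the patch can be exercised roughly as below. This is an illustrative sketch, not part of the commit; the app name, the 200-partition starting point, and the 64 MiB target are arbitrary demo values, and SQLConf is an internal class, used here only because the new config constants (added in this diff) live on it.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf

// Enable the new adaptive execution framework plus this rule, start from a
// deliberately high initial shuffle partition count, and let the rule coalesce it.
val conf = new SparkConf(false)
  .setMaster("local[*]")
  .setAppName("reduce-num-shuffle-partitions-demo")
  .set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "true")
  .set(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key, "true") // default is already true
  .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "200")  // initial pre-shuffle partitions
  .set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, (64L * 1024 * 1024).toString)

val spark = SparkSession.builder().config(conf).getOrCreate()

// A small aggregation writes far less shuffle data than the 64 MiB target,
// so the reducer side should be coalesced to far fewer than 200 partitions.
val agg = spark.range(0, 1000).groupBy(col("id") % 10).count()
println(agg.rdd.getNumPartitions)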
@@ -311,16 +311,30 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
buildConf("spark.sql.adaptive.reducePostShufflePartitions.enabled")
.doc("When true and adaptive execution is enabled, this enables reducing the number of " +
"post-shuffle partitions based on map output statistics.")
.booleanConf
.createWithDefault(true)

val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
.internal()
.doc("The advisory minimal number of post-shuffle partitions provided to " +
"ExchangeCoordinator. This setting is used in our test to make sure we " +
"have enough parallelism to expose issues that will not be exposed with a " +
"single partition. When the value is a non-positive value, this setting will " +
"not be provided to ExchangeCoordinator.")
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
.intConf
.createWithDefault(-1)
.checkValue(_ > 0, "The minimum shuffle partition number " +
"must be a positive integer.")
.createWithDefault(1)

val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
.doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
"This is used as the initial number of pre-shuffle partitions. By default it equals to " +
"spark.sql.shuffle.partitions")
.intConf
.checkValue(_ > 0, "The maximum shuffle partition number " +
"must be a positive integer.")
.createOptional

val SUBEXPRESSION_ELIMINATION_ENABLED =
buildConf("spark.sql.subexpressionElimination.enabled")

@@ -1939,9 +1953,14 @@ class SQLConf extends Serializable with Logging {
def adaptiveExecutionEnabled: Boolean =
getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)

def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

def maxNumPostShufflePartitions: Int =
getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)

def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
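To make the interaction between the advisory target size and the new minimum-partition setting concrete, the ReduceNumShufflePartitions rule added later in this diff derives its effective per-partition target roughly as in the sketch below; the 64 MiB, 100 MiB and 5-partition values are illustrative, not defaults.

// Sketch of the target-size derivation mirrored from estimatePartitionStartIndices.
val advisoryTargetPostShuffleInputSize = 64L * 1024 * 1024  // advisory target (illustrative)
val minNumPostShufflePartitions = 5                          // illustrative
val totalPostShuffleInputSize = 100L * 1024 * 1024           // total map output size (illustrative)

// Cap the target so that at least minNumPostShufflePartitions partitions come out.
val maxPostShuffleInputSize =
  math.max(math.ceil(totalPostShuffleInputSize / minNumPostShufflePartitions.toDouble).toLong, 16)
// Effective target here: 20 MiB, the smaller of the two bounds.
val targetPostShuffleInputSize = math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)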
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
import org.apache.spark.sql.execution.adaptive.rule.ReduceNumShufflePartitions
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.internal.SQLConf

@@ -82,6 +83,7 @@ case class AdaptiveSparkPlanExec(
// A list of physical optimizer rules to be applied to a new stage before its execution. These
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReduceNumShufflePartitions(conf),
CollapseCodegenStages(conf)
)
@@ -161,9 +161,21 @@ case class BroadcastQueryStageExec(
}
}

object ShuffleQueryStageExec {
/**
* Returns true if the plan is a [[ShuffleQueryStageExec]] or a reused [[ShuffleQueryStageExec]].
*/
def isShuffleQueryStageExec(plan: SparkPlan): Boolean = plan match {
case r: ReusedQueryStageExec => isShuffleQueryStageExec(r.plan)
case _: ShuffleQueryStageExec => true
case _ => false
}
}

object BroadcastQueryStageExec {
/**
* Returns if the plan is a [[BroadcastQueryStageExec]] or a reused [[BroadcastQueryStageExec]].
* Returns true if the plan is a [[BroadcastQueryStageExec]] or a reused
* [[BroadcastQueryStageExec]].
*/
def isBroadcastQueryStageExec(plan: SparkPlan): Boolean = plan match {
case r: ReusedQueryStageExec => isBroadcastQueryStageExec(r.plan)
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive.rule

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration

import org.apache.spark.MapOutputStatistics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.ThreadUtils

/**
* A rule to adjust the post shuffle partitions based on the map output statistics.
*
* The strategy used to determine the number of post-shuffle partitions is described as follows.
* To determine the number of post-shuffle partitions, we have a target input size for a
* post-shuffle partition. Once we have size statistics of all pre-shuffle partitions, we will do
* a pass of those statistics and pack pre-shuffle partitions with continuous indices to a single
* post-shuffle partition until adding another pre-shuffle partition would cause the size of a
* post-shuffle partition to be greater than the target size.
*
* For example, we have two stages with the following pre-shuffle partition size statistics:
* stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB]
* stage 2: [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB]
* assuming the target input size is 128 MiB, we will have four post-shuffle partitions,
* which are:
* - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB)
* - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB)
* - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB)
* - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB)
*/
case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.reducePostShufflePartitionsEnabled) {
return plan
}
if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) {
// If not all leaf nodes are query stages, it's not safe to reduce the number of
// shuffle partitions, because we may break the assumption that all children of a spark plan
// have same number of output partitions.
plan
} else {
val shuffleStages = plan.collect {
case stage: ShuffleQueryStageExec => stage
case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
}
val shuffleMetrics = shuffleStages.map { stage =>
val metricsFuture = stage.mapOutputStatisticsFuture
assert(metricsFuture.isCompleted, "ShuffleQueryStageExec should already be ready")
ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
}

// `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions,
// we should skip it when calculating the `partitionStartIndices`.
val validMetrics = shuffleMetrics.filter(_ != null)
// We may get different pre-shuffle partition number if user calls repartition manually.
// We don't reduce shuffle partition number in that case.
val distinctNumPreShufflePartitions =
validMetrics.map(stats => stats.bytesByPartitionId.length).distinct

if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
// This transformation adds new nodes, so we must use `transformUp` here.
plan.transformUp {
// even for shuffle exchange whose input RDD has 0 partition, we should still update its
// `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
// number of output partitions.
case stage: QueryStageExec if ShuffleQueryStageExec.isShuffleQueryStageExec(stage) =>
CoalescedShuffleReaderExec(stage, partitionStartIndices)
}
} else {
plan
}
}
}

/**
* Estimates partition start indices for post-shuffle partitions based on
* mapOutputStatistics provided by all pre-shuffle stages.
*/
// visible for testing.
private[sql] def estimatePartitionStartIndices(
mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize
// If minNumPostShufflePartitions is defined, it is possible that we need to use a
// value less than advisoryTargetPostShuffleInputSize as the target input size of
// a post shuffle task.
val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
// The max at here is to make sure that when we have an empty table, we
// only have a single post-shuffle partition.
// There is no particular reason that we pick 16. We just need a number to
// prevent maxPostShuffleInputSize from being set to 0.
val maxPostShuffleInputSize = math.max(
math.ceil(totalPostShuffleInputSize / minNumPostShufflePartitions.toDouble).toLong, 16)
val targetPostShuffleInputSize =
math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)

logInfo(
s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
s"targetPostShuffleInputSize $targetPostShuffleInputSize.")

// Make sure we do get the same number of pre-shuffle partitions for those stages.
val distinctNumPreShufflePartitions =
mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
// The reason that we are expecting a single value of the number of pre-shuffle partitions
// is that when we add Exchanges, we set the number of pre-shuffle partitions
// (i.e. map output partitions) using a static setting, which is the value of
// spark.sql.shuffle.partitions. Even if two input RDDs are having different
// number of partitions, they will have the same number of pre-shuffle partitions
// (i.e. map output partitions).
assert(
distinctNumPreShufflePartitions.length == 1,
"There should be only one distinct value of the number pre-shuffle partitions " +
"among registered Exchange operator.")
val numPreShufflePartitions = distinctNumPreShufflePartitions.head

val partitionStartIndices = ArrayBuffer[Int]()
// The first element of partitionStartIndices is always 0.
partitionStartIndices += 0

var postShuffleInputSize = 0L

var i = 0
while (i < numPreShufflePartitions) {
// We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages.
// Then, we add the total size to postShuffleInputSize.
var nextShuffleInputSize = 0L
var j = 0
while (j < mapOutputStatistics.length) {
nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
j += 1
}

// If including the nextShuffleInputSize would exceed the target partition size, then start a
// new partition.
if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) {
partitionStartIndices += i
// reset postShuffleInputSize.
postShuffleInputSize = nextShuffleInputSize
} else {
postShuffleInputSize += nextShuffleInputSize
}

i += 1
}

partitionStartIndices.toArray
}
}

case class CoalescedShuffleReaderExec(
child: QueryStageExec,
partitionStartIndices: Array[Int]) extends UnaryExecNode {

override def output: Seq[Attribute] = child.output

override def doCanonicalize(): SparkPlan = child.canonicalized

override def outputPartitioning: Partitioning = {
UnknownPartitioning(partitionStartIndices.length)
}

private var cachedShuffleRDD: ShuffledRowRDD = null

override protected def doExecute(): RDD[InternalRow] = {
if (cachedShuffleRDD == null) {
cachedShuffleRDD = child match {
case stage: ShuffleQueryStageExec =>
stage.plan.createShuffledRDD(Some(partitionStartIndices))
case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
stage.plan.createShuffledRDD(Some(partitionStartIndices))
}
}
cachedShuffleRDD
}
}
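As a reading aid for the packing strategy documented in the scaladoc of the new rule above, the following standalone sketch reproduces the 128 MiB example from that comment. It is not the committed implementation; the object and method names are made up for illustration, and only the core loop is kept.

import scala.collection.mutable.ArrayBuffer

object EstimateStartIndicesSketch {
  // Pack pre-shuffle partitions with contiguous indices until adding the next
  // one would push the current post-shuffle partition past the target size.
  def estimatePartitionStartIndices(
      bytesByPartitionIdArray: Array[Array[Long]],
      targetSize: Long): Array[Int] = {
    val numPartitions = bytesByPartitionIdArray.head.length
    val startIndices = ArrayBuffer(0)
    var currentSize = 0L
    var i = 0
    while (i < numPartitions) {
      // Total size of the i-th pre-shuffle partition across all participating shuffles.
      val nextSize = bytesByPartitionIdArray.map(_(i)).sum
      if (i > 0 && currentSize + nextSize > targetSize) {
        startIndices += i          // start a new post-shuffle partition at index i
        currentSize = nextSize
      } else {
        currentSize += nextSize
      }
      i += 1
    }
    startIndices.toArray
  }

  def main(args: Array[String]): Unit = {
    val mib = 1024L * 1024
    // The example from the scaladoc: two stages, target input size 128 MiB.
    val stage1 = Array(100 * mib, 20 * mib, 100 * mib, 10 * mib, 30 * mib)
    val stage2 = Array(10 * mib, 10 * mib, 70 * mib, 5 * mib, 5 * mib)
    // Prints "0, 1, 2, 3": four post-shuffle partitions, the last one covering
    // pre-shuffle partitions 3 and 4, matching the scaladoc example.
    println(estimatePartitionStartIndices(Array(stage1, stage2), 128 * mib).mkString(", "))
  }
}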
@@ -36,107 +36,12 @@ import org.apache.spark.sql.internal.SQLConf
* the input partition ordering requirements are met.
*/
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions

private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize

private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled

private def minNumPostShufflePartitions: Option[Int] = {
val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
}

/**
* Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
* and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
*/
private def withExchangeCoordinator(
children: Seq[SparkPlan],
requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
val supportsCoordinator =
if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
// Right now, ExchangeCoordinator only support HashPartitionings.
children.forall {
case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
case child =>
child.outputPartitioning match {
case hash: HashPartitioning => true
case collection: PartitioningCollection =>
collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
case _ => false
}
}
} else {
// In this case, although we do not have Exchange operators, we may still need to
// shuffle data when we have more than one children because data generated by
// these children may not be partitioned in the same way.
// Please see the comment in withCoordinator for more details.
val supportsDistribution = requiredChildDistributions.forall { dist =>
dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
}
children.length > 1 && supportsDistribution
}

val withCoordinator =
if (adaptiveExecutionEnabled && supportsCoordinator) {
val coordinator =
new ExchangeCoordinator(
targetPostShuffleInputSize,
minNumPostShufflePartitions)
children.zip(requiredChildDistributions).map {
case (e: ShuffleExchangeExec, _) =>
// This child is an Exchange, we need to add the coordinator.
e.copy(coordinator = Some(coordinator))
case (child, distribution) =>
// If this child is not an Exchange, we need to add an Exchange for now.
// Ideally, we can try to avoid this Exchange. However, when we reach here,
// there are at least two children operators (because if there is a single child
// and we can avoid Exchange, supportsCoordinator will be false and we
// will not reach here.). Although we can make two children have the same number of
// post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
// For example, let's say we have the following plan
// Join
// / \
// Agg Exchange
// / \
// Exchange t2
// /
// t1
// In this case, because a post-shuffle partition can include multiple pre-shuffle
// partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
// after shuffle. So, even we can use the child Exchange operator of the Join to
// have a number of post-shuffle partitions that matches the number of partitions of
// Agg, we cannot say these two children are partitioned in the same way.
// Here is another case
// Join
// / \
// Agg1 Agg2
// / \
// Exchange1 Exchange2
// / \
// t1 t2
// In this case, two Aggs shuffle data with the same column of the join condition.
// After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
// way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
// post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
// partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
// pre-shuffle partitions by using another partitionStartIndices [0, 4].
// So, Agg1 and Agg2 are actually not co-partitioned.
//
// It will be great to introduce a new Partitioning to represent the post-shuffle
// partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
assert(targetPartitioning.isInstanceOf[HashPartitioning])
ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
}
} else {
// If we do not need ExchangeCoordinator, the original children are returned.
children
}

withCoordinator
}
private def defaultNumPreShufflePartitions: Int =
if (conf.runtimeReoptimizationEnabled) {
conf.maxNumPostShufflePartitions
} else {
conf.numShufflePartitions
}

private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution

@@ -189,7 +94,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
child match {
// If child is an exchange, we replace it with a new one having defaultPartitioning.
case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
case ShuffleExchangeExec(_, c) => ShuffleExchangeExec(defaultPartitioning, c)
case _ => ShuffleExchangeExec(defaultPartitioning, child)
}
}

@@ -198,15 +103,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}
}

// Now, we need to add ExchangeCoordinator if necessary.
// Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
// However, with the way that we plan the query, we do not have a place where we have a
// global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
// at here for now.
// Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
// we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
children = withExchangeCoordinator(children, requiredChildDistributions)

// Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
// If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.

@@ -295,7 +191,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
// TODO: remove this after we create a physical operator for `RepartitionByExpression`.
case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
case operator @ ShuffleExchangeExec(upper: HashPartitioning, child) =>
child.outputPartitioning match {
case lower: HashPartitioning if upper.semanticEquals(lower) => child
case _ => operator
@@ -1,277 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.exchange

import java.util.{HashMap => JHashMap, Map => JMap}
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{MapOutputStatistics, ShuffleDependency, SimpleFutureAction}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}

/**
* A coordinator used to determines how we shuffle data between stages generated by Spark SQL.
* Right now, the work of this coordinator is to determine the number of post-shuffle partitions
* for a stage that needs to fetch shuffle data from one or multiple stages.
*
* A coordinator is constructed with three parameters, `numExchanges`,
* `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
* - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be
* registered to this coordinator. So, when we start to do any actual work, we have a way to
* make sure that we have got expected number of [[ShuffleExchangeExec]]s.
* - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
* input data size. With this parameter, we can estimate the number of post-shuffle partitions.
* This parameter is configured through
* `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`.
* - `minNumPostShufflePartitions` is an optional parameter. If it is defined, this coordinator
* will try to make sure that there are at least `minNumPostShufflePartitions` post-shuffle
* partitions.
*
* The workflow of this coordinator is described as follows:
* - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator,
* if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
* This happens in the `doPrepare` method.
* - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this
* coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
* [[ShuffledRowRDD]].
* If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]]
* will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
* - If this coordinator has not made the decision on how to shuffle data, it will ask those
* registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. Then, based on the
* size statistics of pre-shuffle partitions, this coordinator will determine the number of
* post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
* to a single post-shuffle partition whenever necessary.
* - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
* [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this
* coordinator can lookup the corresponding [[RDD]].
*
* The strategy used to determine the number of post-shuffle partitions is described as follows.
* To determine the number of post-shuffle partitions, we have a target input size for a
* post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
* corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics
* and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
* adding another pre-shuffle partition would cause the size of a post-shuffle partition to be
* greater than the target size.
*
* For example, we have two stages with the following pre-shuffle partition size statistics:
* stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB]
* stage 2: [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB]
* assuming the target input size is 128 MiB, we will have four post-shuffle partitions,
* which are:
* - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB)
* - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB)
* - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB)
* - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB)
*/
class ExchangeCoordinator(
advisoryTargetPostShuffleInputSize: Long,
minNumPostShufflePartitions: Option[Int] = None)
extends Logging {

// The registered Exchange operators.
private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()

// `lazy val` is used here so that we could notice the wrong use of this class, e.g., all the
// exchanges should be registered before `postShuffleRDD` called first time. If a new exchange is
// registered after the `postShuffleRDD` call, `assert(exchanges.length == numExchanges)` fails
// in `doEstimationIfNecessary`.
private[this] lazy val numExchanges = exchanges.size

// This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
private[this] lazy val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)

// A boolean that indicates if this coordinator has made decision on how to shuffle data.
// This variable will only be updated by doEstimationIfNecessary, which is protected by
// synchronized.
@volatile private[this] var estimated: Boolean = false

/**
* Registers a [[ShuffleExchangeExec]] operator to this coordinator. This method is only allowed
* to be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator.
*/
@GuardedBy("this")
def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized {
exchanges += exchange
}

def isEstimated: Boolean = estimated

/**
* Estimates partition start indices for post-shuffle partitions based on
* mapOutputStatistics provided by all pre-shuffle stages.
*/
def estimatePartitionStartIndices(
mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
// If minNumPostShufflePartitions is defined, it is possible that we need to use a
// value less than advisoryTargetPostShuffleInputSize as the target input size of
// a post shuffle task.
val targetPostShuffleInputSize = minNumPostShufflePartitions match {
case Some(numPartitions) =>
val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
// The max at here is to make sure that when we have an empty table, we
// only have a single post-shuffle partition.
// There is no particular reason that we pick 16. We just need a number to
// prevent maxPostShuffleInputSize from being set to 0.
val maxPostShuffleInputSize =
math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16)
math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)

case None => advisoryTargetPostShuffleInputSize
}

logInfo(
s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
s"targetPostShuffleInputSize $targetPostShuffleInputSize.")

// Make sure we do get the same number of pre-shuffle partitions for those stages.
val distinctNumPreShufflePartitions =
mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
// The reason that we are expecting a single value of the number of pre-shuffle partitions
// is that when we add Exchanges, we set the number of pre-shuffle partitions
// (i.e. map output partitions) using a static setting, which is the value of
// spark.sql.shuffle.partitions. Even if two input RDDs are having different
// number of partitions, they will have the same number of pre-shuffle partitions
// (i.e. map output partitions).
assert(
distinctNumPreShufflePartitions.length == 1,
"There should be only one distinct value of the number pre-shuffle partitions " +
"among registered Exchange operator.")
val numPreShufflePartitions = distinctNumPreShufflePartitions.head

val partitionStartIndices = ArrayBuffer[Int]()
// The first element of partitionStartIndices is always 0.
partitionStartIndices += 0

var postShuffleInputSize = 0L

var i = 0
while (i < numPreShufflePartitions) {
// We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages.
// Then, we add the total size to postShuffleInputSize.
var nextShuffleInputSize = 0L
var j = 0
while (j < mapOutputStatistics.length) {
nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
j += 1
}

// If including the nextShuffleInputSize would exceed the target partition size, then start a
// new partition.
if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) {
partitionStartIndices += i
// reset postShuffleInputSize.
postShuffleInputSize = nextShuffleInputSize
} else postShuffleInputSize += nextShuffleInputSize

i += 1
}

partitionStartIndices.toArray
}

@GuardedBy("this")
private def doEstimationIfNecessary(): Unit = synchronized {
// It is unlikely that this method will be called from multiple threads
// (when multiple threads trigger the execution of THIS physical)
// because in common use cases, we will create new physical plan after
// users apply operations (e.g. projection) to an existing DataFrame.
// However, if it happens, we have synchronized to make sure only one
// thread will trigger the job submission.
if (!estimated) {
// Make sure we have the expected number of registered Exchange operators.
assert(exchanges.length == numExchanges)

val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)

// Submit all map stages
val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
val submittedStageFutures = ArrayBuffer[SimpleFutureAction[MapOutputStatistics]]()
var i = 0
while (i < numExchanges) {
val exchange = exchanges(i)
val shuffleDependency = exchange.shuffleDependency
shuffleDependencies += shuffleDependency
if (shuffleDependency.rdd.partitions.length != 0) {
// submitMapStage does not accept RDD with 0 partition.
// So, we will not submit this dependency.
submittedStageFutures +=
exchange.sqlContext.sparkContext.submitMapStage(shuffleDependency)
}
i += 1
}

// Wait for the finishes of those submitted map stages.
val mapOutputStatistics = new Array[MapOutputStatistics](submittedStageFutures.length)
var j = 0
while (j < submittedStageFutures.length) {
// This call is a blocking call. If the stage has not finished, we will wait at here.
mapOutputStatistics(j) = submittedStageFutures(j).get()
j += 1
}

// If we have mapOutputStatistics.length < numExchange, it is because we do not submit
// a stage when the number of partitions of this dependency is 0.
assert(mapOutputStatistics.length <= numExchanges)

// Now, we estimate partitionStartIndices. partitionStartIndices.length will be the
// number of post-shuffle partitions.
val partitionStartIndices =
if (mapOutputStatistics.length == 0) {
Array.empty[Int]
} else {
estimatePartitionStartIndices(mapOutputStatistics)
}

var k = 0
while (k < numExchanges) {
val exchange = exchanges(k)
val rdd =
exchange.preparePostShuffleRDD(shuffleDependencies(k), Some(partitionStartIndices))
newPostShuffleRDDs.put(exchange, rdd)

k += 1
}

// Finally, we set postShuffleRDDs and estimated.
assert(postShuffleRDDs.isEmpty)
assert(newPostShuffleRDDs.size() == numExchanges)
postShuffleRDDs.putAll(newPostShuffleRDDs)
estimated = true
}
}

def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = {
doEstimationIfNecessary()

if (!postShuffleRDDs.containsKey(exchange)) {
throw new IllegalStateException(
s"The given $exchange is not registered in this coordinator.")
}

postShuffleRDDs.get(exchange)
}

override def toString: String = {
s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]"
}
}
@@ -39,12 +39,11 @@ import org.apache.spark.util.MutablePair
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}

/**
* Performs a shuffle that will result in the desired `newPartitioning`.
* Performs a shuffle that will result in the desired partitioning.
*/
case class ShuffleExchangeExec(
var newPartitioning: Partitioning,
child: SparkPlan,
@transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
override val outputPartitioning: Partitioning,
child: SparkPlan) extends Exchange {

// NOTE: coordinator can be null after serialization/deserialization,
// e.g. it can be null on the Executor side

@@ -56,37 +55,11 @@ case class ShuffleExchangeExec(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
) ++ readMetrics ++ writeMetrics

override def nodeName: String = {
val extraInfo = coordinator match {
case Some(exchangeCoordinator) =>
s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
case _ => ""
}

val simpleNodeName = "Exchange"
s"$simpleNodeName$extraInfo"
}

override def outputPartitioning: Partitioning = newPartitioning
override def nodeName: String = "Exchange"

private val serializer: Serializer =
new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))

override protected def doPrepare(): Unit = {
// If an ExchangeCoordinator is needed, we register this Exchange operator
// to the coordinator when we do prepare. It is important to make sure
// we register this operator right before the execution instead of register it
// in the constructor because it is possible that we create new instances of
// Exchange operators when we transform the physical plan
// (then the ExchangeCoordinator will hold references of unneeded Exchanges).
// So, we should only call registerExchange just before we start to execute
// the plan.
coordinator match {
case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
case _ =>
}
}

@transient lazy val inputRDD: RDD[InternalRow] = child.execute()

/**

@@ -99,28 +72,13 @@ case class ShuffleExchangeExec(
ShuffleExchangeExec.prepareShuffleDependency(
inputRDD,
child.output,
newPartitioning,
outputPartitioning,
serializer,
writeMetrics)
}

/**
* Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
* This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional
* partition start indices array. If this optional array is defined, the returned
* [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array.
*/
private[exchange] def preparePostShuffleRDD(
shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
// If an array of partition start indices is provided, we need to use this array
// to create the ShuffledRowRDD. Also, we need to update newPartitioning to
// update the number of post-shuffle partitions.
specifiedPartitionStartIndices.foreach { indices =>
assert(newPartitioning.isInstanceOf[HashPartitioning])
newPartitioning = UnknownPartitioning(indices.length)
}
new ShuffledRowRDD(shuffleDependency, readMetrics, specifiedPartitionStartIndices)
def createShuffledRDD(partitionStartIndices: Option[Array[Int]]): ShuffledRowRDD = {
new ShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndices)
}

/**

@@ -131,23 +89,13 @@ case class ShuffleExchangeExec(
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
// Returns the same ShuffleRowRDD if this plan is used by multiple plans.
if (cachedShuffleRDD == null) {
cachedShuffleRDD = coordinator match {
case Some(exchangeCoordinator) =>
val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
shuffleRDD
case _ =>
preparePostShuffleRDD(shuffleDependency)
}
cachedShuffleRDD = createShuffledRDD(None)
}
cachedShuffleRDD
}
}

object ShuffleExchangeExec {
def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = {
ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
}

/**
* Determines whether records must be defensively copied before being sent to the shuffle.
@@ -320,7 +320,7 @@ abstract class StreamExecution(
logicalPlan

// Adaptive execution can change num shuffle partitions, disallow
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
sparkSessionForStream.conf.set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "false")
// Disable cost-based join optimization as we do not want stateful operations to be rearranged
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
offsetSeqMetadata = OffsetSeqMetadata(
@@ -255,8 +255,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo

val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled

if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} " +
"is not supported in streaming DataFrames/Datasets and will be disabled.")
}
@@ -1365,7 +1365,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val agg = cp.groupBy('id % 2).agg(count('id))

agg.queryExecution.executedPlan.collectFirst {
case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
case ShuffleExchangeExec(_, _: RDDScanExec) =>
case BroadcastExchangeExec(_, _: RDDScanExec) =>
}.foreach { _ =>
fail(
@@ -413,8 +413,7 @@ class PlannerSuite extends SharedSQLContext {

val inputPlan = ShuffleExchangeExec(
partitioning,
DummySparkPlan(outputPartitioning = partitioning),
None)
DummySparkPlan(outputPartitioning = partitioning))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 2) {

@@ -429,8 +428,7 @@

val inputPlan = ShuffleExchangeExec(
partitioning,
DummySparkPlan(outputPartitioning = partitioning),
None)
DummySparkPlan(outputPartitioning = partitioning))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 1) {

@@ -452,7 +450,7 @@
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
val shuffle = outputPlan.collect { case e: ShuffleExchangeExec => e }
assert(shuffle.size === 1)
assert(shuffle.head.newPartitioning === finalPartitioning)
assert(shuffle.head.outputPartitioning === finalPartitioning)
}

test("Reuse exchanges") {

@@ -464,8 +462,7 @@
DummySparkPlan(
children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
requiredChildDistribution = Seq(distribution),
requiredChildOrdering = Seq(Seq.empty)),
None)
requiredChildOrdering = Seq(Seq.empty)))

val inputPlan = SortMergeJoinExec(
Literal(1) :: Nil,
@ -22,11 +22,12 @@ import org.scalatest.BeforeAndAfterAll
|
|||
import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec}
|
||||
import org.apache.spark.sql.execution.adaptive._
|
||||
import org.apache.spark.sql.execution.adaptive.rule.{CoalescedShuffleReaderExec, ReduceNumShufflePartitions}
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
||||
class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
|
||||
|
||||
private var originalActiveSparkSession: Option[SparkSession] = _
|
||||
private var originalInstantiatedSparkSession: Option[SparkSession] = _
|
||||
|
@ -51,7 +52,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
}
|
||||
|
||||
private def checkEstimation(
|
||||
coordinator: ExchangeCoordinator,
|
||||
rule: ReduceNumShufflePartitions,
|
||||
bytesByPartitionIdArray: Array[Array[Long]],
|
||||
expectedPartitionStartIndices: Array[Int]): Unit = {
|
||||
val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map {
|
||||
|
@ -59,18 +60,27 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
new MapOutputStatistics(index, bytesByPartitionId)
|
||||
}
|
||||
val estimatedPartitionStartIndices =
|
||||
coordinator.estimatePartitionStartIndices(mapOutputStatistics)
|
||||
rule.estimatePartitionStartIndices(mapOutputStatistics)
|
||||
assert(estimatedPartitionStartIndices === expectedPartitionStartIndices)
|
||||
}
|
||||
|
||||
private def createReduceNumShufflePartitionsRule(
|
||||
advisoryTargetPostShuffleInputSize: Long,
|
||||
minNumPostShufflePartitions: Int = 1): ReduceNumShufflePartitions = {
|
||||
val conf = new SQLConf().copy(
|
||||
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE -> advisoryTargetPostShuffleInputSize,
|
||||
SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS -> minNumPostShufflePartitions)
|
||||
ReduceNumShufflePartitions(conf)
|
||||
}
|
||||
|
||||
test("test estimatePartitionStartIndices - 1 Exchange") {
|
||||
val coordinator = new ExchangeCoordinator(100L)
|
||||
val rule = createReduceNumShufflePartitionsRule(100L)
|
||||
|
||||
{
|
||||
// All bytes per partition are 0.
|
||||
val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
|
||||
val expectedPartitionStartIndices = Array[Int](0)
|
||||
checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -78,40 +88,40 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
// 1 post-shuffle partition is needed.
|
||||
val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
|
||||
val expectedPartitionStartIndices = Array[Int](0)
|
||||
checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
}
|
||||
|
||||
{
|
||||
// 2 post-shuffle partitions are needed.
|
||||
val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 3)
|
||||
checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
}
|
||||
|
||||
{
|
||||
// There are a few large pre-shuffle partitions.
|
||||
val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
|
||||
checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
}
|
||||
|
||||
{
|
||||
// All pre-shuffle partitions are larger than the targeted size.
|
||||
val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
|
||||
checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
}
|
||||
|
||||
{
|
||||
// The last pre-shuffle partition is in a single post-shuffle partition.
|
||||
val bytesByPartitionId = Array[Long](30, 30, 0, 40, 110)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 4)
|
||||
checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
|
||||
}
|
||||
}
|
||||
|
||||
test("test estimatePartitionStartIndices - 2 Exchanges") {
|
||||
val coordinator = new ExchangeCoordinator(100L)
|
||||
val rule = createReduceNumShufflePartitionsRule(100L)
|
||||
|
||||
{
|
||||
// If there are multiple values of the number of pre-shuffle partitions,
|
||||
|
@ -122,7 +132,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
Array(
|
||||
new MapOutputStatistics(0, bytesByPartitionId1),
|
||||
new MapOutputStatistics(1, bytesByPartitionId2))
|
||||
intercept[AssertionError](coordinator.estimatePartitionStartIndices(mapOutputStatistics))
|
||||
intercept[AssertionError](rule.estimatePartitionStartIndices(mapOutputStatistics))
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -131,7 +141,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
|
||||
val expectedPartitionStartIndices = Array[Int](0)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
|
@ -143,7 +153,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20)
|
||||
val expectedPartitionStartIndices = Array[Int](0)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
|
@ -154,7 +164,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 2, 4)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
|
@ -165,7 +175,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
|
@ -176,7 +186,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
|
@ -187,7 +197,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
|
@ -198,14 +208,14 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
}
|
||||
|
||||
test("test estimatePartitionStartIndices and enforce minimal number of reducers") {
|
||||
val coordinator = new ExchangeCoordinator(100L, Some(2))
|
||||
val rule = createReduceNumShufflePartitionsRule(100L, 2)
|
||||
|
||||
{
|
||||
// The minimal number of post-shuffle partitions is not enforced because
|
||||
|
@ -214,7 +224,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
|
||||
val expectedPartitionStartIndices = Array[Int](0)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
|
@ -225,7 +235,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](5, 10, 0, 10, 5)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 3)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
|
@ -236,7 +246,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30)
|
||||
val expectedPartitionStartIndices = Array[Int](0, 1, 3, 4)
|
||||
checkEstimation(
|
||||
coordinator,
|
||||
rule,
|
||||
Array(bytesByPartitionId1, bytesByPartitionId2),
|
||||
expectedPartitionStartIndices)
|
||||
}
|
||||
|
@ -257,24 +267,24 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
|
||||
def withSparkSession(
f: SparkSession => Unit,
targetNumPostShufflePartitions: Int,
targetPostShuffleInputSize: Int,
minNumPostShufflePartitions: Option[Int]): Unit = {
val sparkConf =
new SparkConf(false)
.setMaster("local[*]")
.setAppName("test")
.set(UI_ENABLED, false)
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
.set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
.set(
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,
targetNumPostShufflePartitions.toString)
targetPostShuffleInputSize.toString)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, numPartitions.toString)
case None =>
sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "-1")
sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
}

val spark = SparkSession.builder()

@@ -304,25 +314,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// Then, let's look at the number of post-shuffle partitions estimated
// by the ExchangeCoordinator.
val exchanges = agg.queryExecution.executedPlan.collect {
case e: ShuffleExchangeExec => e
val finalPlan = agg.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case reader: CoalescedShuffleReaderExec => reader
}
assert(exchanges.length === 1)
assert(shuffleReaders.length === 1)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
exchanges.foreach {
case e: ShuffleExchangeExec =>
assert(e.coordinator.isDefined)
assert(e.outputPartitioning.numPartitions === 5)
case o =>
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === numPartitions)
}

case None =>
exchanges.foreach {
case e: ShuffleExchangeExec =>
assert(e.coordinator.isDefined)
assert(e.outputPartitioning.numPartitions === 3)
case o =>
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === 3)
}
}
}

@@ -355,25 +361,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// Then, let's look at the number of post-shuffle partitions estimated
// by the ExchangeCoordinator.
val exchanges = join.queryExecution.executedPlan.collect {
case e: ShuffleExchangeExec => e
val finalPlan = join.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case reader: CoalescedShuffleReaderExec => reader
}
assert(exchanges.length === 2)
assert(shuffleReaders.length === 2)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
exchanges.foreach {
case e: ShuffleExchangeExec =>
assert(e.coordinator.isDefined)
assert(e.outputPartitioning.numPartitions === 5)
case o =>
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === numPartitions)
}

case None =>
exchanges.foreach {
case e: ShuffleExchangeExec =>
assert(e.coordinator.isDefined)
assert(e.outputPartitioning.numPartitions === 2)
case o =>
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === 2)
}
}
}

@@ -411,26 +413,26 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// Then, let's look at the number of post-shuffle partitions estimated
// by the ExchangeCoordinator.
val exchanges = join.queryExecution.executedPlan.collect {
case e: ShuffleExchangeExec => e
val finalPlan = join.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case reader: CoalescedShuffleReaderExec => reader
}
assert(exchanges.length === 4)
assert(shuffleReaders.length === 2)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
exchanges.foreach {
case e: ShuffleExchangeExec =>
assert(e.coordinator.isDefined)
assert(e.outputPartitioning.numPartitions === 5)
case o =>
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === numPartitions)
}

case None =>
assert(exchanges.forall(_.coordinator.isDefined))
assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(2, 3))
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === 2)
}
}
}

withSparkSession(test, 6644, minNumPostShufflePartitions)
withSparkSession(test, 16384, minNumPostShufflePartitions)
}

test(s"determining the number of reducers: complex query 2$testNameNote") {

@@ -463,39 +465,131 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// Then, let's look at the number of post-shuffle partitions estimated
// by the ExchangeCoordinator.
val exchanges = join.queryExecution.executedPlan.collect {
case e: ShuffleExchangeExec => e
val finalPlan = join.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case reader: CoalescedShuffleReaderExec => reader
}
assert(exchanges.length === 3)
assert(shuffleReaders.length === 2)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
exchanges.foreach {
case e: ShuffleExchangeExec =>
assert(e.coordinator.isDefined)
assert(e.outputPartitioning.numPartitions === 5)
case o =>
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === numPartitions)
}

case None =>
assert(exchanges.forall(_.coordinator.isDefined))
assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(5, 3))
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === 3)
}
}
}

withSparkSession(test, 6144, minNumPostShufflePartitions)
withSparkSession(test, 12000, minNumPostShufflePartitions)
}

test(s"determining the number of reducers: plan already partitioned$testNameNote") {
val test: SparkSession => Unit = { spark: SparkSession =>
try {
spark.range(1000).write.bucketBy(30, "id").saveAsTable("t")
// `df1` is hash partitioned by `id`.
val df1 = spark.read.table("t")
val df2 =
spark
.range(0, 1000, 1, numInputPartitions)
.selectExpr("id % 500 as key2", "id as value2")

val join = df1.join(df2, col("id") === col("key2")).select(col("id"), col("value2"))

// Check the answer first.
val expectedAnswer = spark.range(0, 500).selectExpr("id % 500", "id as value")
.union(spark.range(500, 1000).selectExpr("id % 500", "id as value"))
checkAnswer(
join,
expectedAnswer.collect())

// Then, let's make sure we do not reduce the number of post-shuffle partitions.
val finalPlan = join.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case reader: CoalescedShuffleReaderExec => reader
}
assert(shuffleReaders.length === 0)
} finally {
spark.sql("drop table t")
}
}
withSparkSession(test, 12000, minNumPostShufflePartitions)
}
}

test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
val test = { spark: SparkSession =>
val test: SparkSession => Unit = { spark: SparkSession =>
spark.sql("SET spark.sql.exchange.reuse=true")
val df = spark.range(1).selectExpr("id AS key", "id AS value")

// test case 1: a query stage has 3 child stages but they are the same stage.
// Final Stage 1
//   ShuffleQueryStage 0
//   ReusedQueryStage 0
//   ReusedQueryStage 0
val resultDf = df.join(df, "key").join(df, "key")
val sparkPlan = resultDf.queryExecution.executedPlan
assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
val finalPlan = resultDf.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
assert(finalPlan.collect { case p: ReusedQueryStageExec => p }.length == 2)
assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 3)

// test case 2: a query stage has 2 parent stages.
// Final Stage 3
//   ShuffleQueryStage 1
//     ShuffleQueryStage 0
//   ShuffleQueryStage 2
//     ReusedQueryStage 0
val grouped = df.groupBy("key").agg(max("value").as("value"))
val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
.union(grouped.groupBy(col("key") + 2).max("value"))
checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Nil)

val finalPlan2 = resultDf2.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan

// The result stage has 2 children
val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
assert(level1Stages.length == 2)

val leafStages = level1Stages.flatMap { stage =>
// All of the child stages of result stage have only one child stage.
val children = stage.plan.collect { case q: QueryStageExec => q }
assert(children.length == 1)
children
}
assert(leafStages.length == 2)

val reusedStages = level1Stages.flatMap { stage =>
stage.plan.collect { case r: ReusedQueryStageExec => r }
}
assert(reusedStages.length == 1)
}
withSparkSession(test, 4, None)
}

test("Union two datasets with different pre-shuffle partition number") {
val test: SparkSession => Unit = { spark: SparkSession =>
val dataset1 = spark.range(3)
val dataset2 = spark.range(3)

val resultDf = dataset1.repartition(2, dataset1.col("id"))
.union(dataset2.repartition(3, dataset2.col("id"))).toDF()

checkAnswer(resultDf,
Seq((0), (0), (1), (1), (2), (2)).map(i => Row(i)))
val finalPlan = resultDf.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
// As the pre-shuffle partition numbers are different, we will skip reducing
// the shuffle partition numbers.
assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 0)
}
withSparkSession(test, 100, None)
}
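The skipped coalescing asserted above follows from a guard on the pre-shuffle partition counts: the two sides of the union were repartitioned into 2 and 3 partitions respectively, so there is no common set of partition start indices to compute. A minimal sketch of such a guard, with illustrative names that are not taken from this patch:

// Coalescing is only attempted when every shuffle feeding the stage produced
// the same number of pre-shuffle partitions.
def canCoalesce(bytesByPartitionId: Seq[Array[Long]]): Boolean =
  bytesByPartitionId.map(_.length).distinct.length == 1

// For the union above, one child shuffle has 2 partitions and the other has 3.
assert(!canCoalesce(Seq(Array.fill(2)(0L), Array.fill(3)(0L))))

Because the guard fails, no CoalescedShuffleReaderExec is inserted, which is exactly what the test's final assertion checks.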
}

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.{ReusedSubqueryExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.rule.CoalescedShuffleReaderExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

@@ -92,6 +93,30 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
}
}
test("Change merge join to broadcast join and reduce number of shuffle partitions") {
|
||||
withSQLConf(
|
||||
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
|
||||
SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "true",
|
||||
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
|
||||
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "150") {
|
||||
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
|
||||
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
|
||||
val smj = findTopLevelSortMergeJoin(plan)
|
||||
assert(smj.size == 1)
|
||||
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
|
||||
assert(bhj.size == 1)
|
||||
|
||||
val shuffleReaders = adaptivePlan.collect {
|
||||
case reader: CoalescedShuffleReaderExec => reader
|
||||
}
|
||||
assert(shuffleReaders.length === 1)
|
||||
// The pre-shuffle partition size is [0, 72, 0, 72, 126]
|
||||
shuffleReaders.foreach { reader =>
|
||||
assert(reader.outputPartitioning.numPartitions === 2)
|
||||
}
|
||||
}
|
||||
}
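The expected value of 2 can be checked with the same greedy packing idea sketched earlier, using the sizes quoted in the comment above and the 150-byte target configured in this test (an illustrative calculation, not code from the patch):

// Packing [0, 72, 0, 72, 126] with a 150-byte target:
// 0 + 72 + 0 + 72 = 144 stays within the target, adding 126 would exceed it,
// so a second post-shuffle partition starts at index 4, giving 2 coalesced partitions.
val sizes = Array[Long](0, 72, 0, 72, 126)
val target = 150L
var current = 0L
var numCoalesced = 1
for (i <- sizes.indices) {
  if (i > 0 && current + sizes(i) > target) {
    numCoalesced += 1
    current = sizes(i)
  } else {
    current += sizes(i)
  }
}
assert(numCoalesced == 2)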

test("Scalar subquery") {
withSQLConf(
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",