[SPARK-24705][SQL] ExchangeCoordinator broken when duplicate exchanges reused

## What changes were proposed in this pull request?
In the current master, `EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange` runs. `ReuseExchange` then removes duplicate exchanges, so the actual number of registered exchanges changes. As a result, the assertion in `ExchangeCoordinator` fails because the expected number of exchanges and the actual number of registered exchanges differ:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201
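
The mismatch is easy to see in isolation. Below is a minimal, self-contained sketch (the `EagerCoordinator` name and everything in it are illustrative stand-ins, not Spark's actual classes) of a coordinator that is told up front how many exchanges to expect and then sees fewer of them register because a duplicate was replaced by a reused exchange:
```
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for ExchangeCoordinator with the pre-fix, eagerly supplied count.
class EagerCoordinator(expectedExchanges: Int) {
  private val registered = ArrayBuffer[String]()
  def register(exchange: String): Unit = registered += exchange
  def doEstimation(): Unit = {
    // Mirrors the assertion that fails in ExchangeCoordinator.doEstimationIfNecessary.
    assert(registered.length == expectedExchanges)
  }
}

object Repro extends App {
  // EnsureRequirements counts three child exchanges before ReuseExchange runs...
  val coordinator = new EagerCoordinator(expectedExchanges = 3)
  // ...but ReuseExchange replaces one duplicate, so only two actually register.
  coordinator.register("shuffle-1")
  coordinator.register("shuffle-2")
  coordinator.doEstimation() // java.lang.AssertionError: assertion failed
}
```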

This PR fixes the issue. The code to reproduce it is as follows:
```
scala> sql("SET spark.sql.adaptive.enabled=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
scala> val df = spark.range(1).selectExpr("id AS key", "id AS value")
scala> val resultDf = df.join(df, "key").join(df, "key")
scala> resultDf.show
...
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 101 more
Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
...
```
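
As a quick sanity check (this is just the standard way to inspect a physical plan, not part of the patch), the deduplication that triggers the mismatch can be observed on the `resultDf` defined above:
```
scala> resultDf.explain()
// With exchange reuse enabled (spark.sql.exchange.reuse=true, the default), the physical plan
// contains a ReusedExchange node in place of one duplicate shuffle, so fewer exchanges register
// with the coordinator than EnsureRequirements counted when it constructed it.
```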

## How was this patch tested?
Added tests in `ExchangeCoordinatorSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21754 from maropu/SPARK-24705-2.
Takeshi Yamamuro authored on 2018-08-02 13:05:36 -07:00; committed by Xiao Li
commit efef55388f (parent 02f967795b)
3 changed files with 28 additions and 11 deletions

EnsureRequirements.scala

```
@@ -82,7 +82,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     if (adaptiveExecutionEnabled && supportsCoordinator) {
       val coordinator =
         new ExchangeCoordinator(
-          children.length,
           targetPostShuffleInputSize,
           minNumPostShufflePartitions)
       children.zip(requiredChildDistributions).map {
```

ExchangeCoordinator.scala

```
@@ -83,7 +83,6 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB)
  */
 class ExchangeCoordinator(
-    numExchanges: Int,
     advisoryTargetPostShuffleInputSize: Long,
     minNumPostShufflePartitions: Option[Int] = None)
   extends Logging {
@@ -91,8 +90,14 @@ class ExchangeCoordinator(
   // The registered Exchange operators.
   private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()
 
+  // `lazy val` is used here so that we could notice the wrong use of this class, e.g., all the
+  // exchanges should be registered before `postShuffleRDD` called first time. If a new exchange is
+  // registered after the `postShuffleRDD` call, `assert(exchanges.length == numExchanges)` fails
+  // in `doEstimationIfNecessary`.
+  private[this] lazy val numExchanges = exchanges.size
+
   // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
-  private[this] val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
+  private[this] lazy val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
     new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
 
   // A boolean that indicates if this coordinator has made decision on how to shuffle data.
@@ -117,10 +122,6 @@ class ExchangeCoordinator(
    */
   def estimatePartitionStartIndices(
       mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
-    // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
-    // a stage when the number of partitions of this dependency is 0.
-    assert(mapOutputStatistics.length <= numExchanges)
-
     // If minNumPostShufflePartitions is defined, it is possible that we need to use a
     // value less than advisoryTargetPostShuffleInputSize as the target input size of
     // a post shuffle task.
@@ -228,6 +229,10 @@ class ExchangeCoordinator(
         j += 1
       }
 
+      // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
+      // a stage when the number of partitions of this dependency is 0.
+      assert(mapOutputStatistics.length <= numExchanges)
+
       // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the
       // number of post-shuffle partitions.
       val partitionStartIndices =
```
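
The fix above boils down to taking the exchange count lazily: `numExchanges` becomes a `lazy val` snapshot of however many exchanges have actually registered by the time `postShuffleRDD` is first called, instead of a number baked in by `EnsureRequirements` before `ReuseExchange` runs. A minimal sketch of that pattern (illustrative names, not the Spark code itself):
```
import scala.collection.mutable.ArrayBuffer

class LazyCoordinator {
  private val registered = ArrayBuffer[String]()
  // Evaluated on first use, i.e. only after all surviving exchanges have registered.
  private lazy val numExchanges = registered.size

  def register(exchange: String): Unit = registered += exchange

  def doEstimation(statsCount: Int): Unit = {
    // Some exchanges do not submit a stage (zero partitions), hence <= rather than ==.
    assert(statsCount <= numExchanges)
  }
}

object FixSketch extends App {
  val c = new LazyCoordinator
  // Only the two surviving exchanges register; the removed duplicate never does.
  c.register("shuffle-1")
  c.register("shuffle-2")
  c.doEstimation(statsCount = 2) // passes: numExchanges is taken from what actually registered
}
```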

ExchangeCoordinatorSuite.scala

```
@@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite}
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -58,7 +58,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("test estimatePartitionStartIndices - 1 Exchange") {
-    val coordinator = new ExchangeCoordinator(1, 100L)
+    val coordinator = new ExchangeCoordinator(100L)
 
     {
       // All bytes per partition are 0.
@@ -105,7 +105,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("test estimatePartitionStartIndices - 2 Exchanges") {
-    val coordinator = new ExchangeCoordinator(2, 100L)
+    val coordinator = new ExchangeCoordinator(100L)
 
     {
       // If there are multiple values of the number of pre-shuffle partitions,
@@ -199,7 +199,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("test estimatePartitionStartIndices and enforce minimal number of reducers") {
-    val coordinator = new ExchangeCoordinator(2, 100L, Some(2))
+    val coordinator = new ExchangeCoordinator(100L, Some(2))
 
     {
       // The minimal number of post-shuffle partitions is not enforced because
@@ -480,4 +480,17 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
     withSparkSession(test, 6144, minNumPostShufflePartitions)
   }
 
+  test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
+    val test = { spark: SparkSession =>
+      spark.sql("SET spark.sql.exchange.reuse=true")
+      val df = spark.range(1).selectExpr("id AS key", "id AS value")
+      val resultDf = df.join(df, "key").join(df, "key")
+      val sparkPlan = resultDf.queryExecution.executedPlan
+      assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
+      assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
+      checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+    }
+    withSparkSession(test, 4, None)
+  }
 }
```