diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 53ab0493f4..5936492dd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -31,17 +31,23 @@ sealed trait ShufflePartitionSpec
 // A partition that reads data of one or more reducers, from `startReducerIndex` (inclusive) to
 // `endReducerIndex` (exclusive).
 case class CoalescedPartitionSpec(
-    startReducerIndex: Int, endReducerIndex: Int) extends ShufflePartitionSpec
+    startReducerIndex: Int,
+    endReducerIndex: Int) extends ShufflePartitionSpec
 
 // A partition that reads partial data of one reducer, from `startMapIndex` (inclusive) to
 // `endMapIndex` (exclusive).
 case class PartialReducerPartitionSpec(
-    reducerIndex: Int, startMapIndex: Int, endMapIndex: Int) extends ShufflePartitionSpec
+    reducerIndex: Int,
+    startMapIndex: Int,
+    endMapIndex: Int,
+    @transient dataSize: Long) extends ShufflePartitionSpec
 
 // A partition that reads partial data of one mapper, from `startReducerIndex` (inclusive) to
 // `endReducerIndex` (exclusive).
 case class PartialMapperPartitionSpec(
-    mapIndex: Int, startReducerIndex: Int, endReducerIndex: Int) extends ShufflePartitionSpec
+    mapIndex: Int,
+    startReducerIndex: Int,
+    endReducerIndex: Int) extends ShufflePartitionSpec
 
 /**
  * The [[Partition]] used by [[ShuffledRowRDD]].
@@ -162,7 +168,7 @@ class ShuffledRowRDD(
           tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
         }
 
-      case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex) =>
+      case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) =>
         tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
 
       case PartialMapperPartitionSpec(mapIndex, _, _) =>
@@ -184,7 +190,7 @@ class ShuffledRowRDD(
           context,
           sqlMetricsReporter)
 
-      case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex) =>
+      case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
         SparkEnv.get.shuffleManager.getReaderForRange(
           dependency.shuffleHandle,
           startMapIndex,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 226d692f7b..53ef454b96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -83,7 +83,7 @@ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPl
           // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
           // number of output partitions.
           case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
-            CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
+            CustomShuffleReaderExec(stage, partitionSpecs)
         }
       } else {
         plan
@@ -91,7 +91,3 @@ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPl
     }
   }
 }
-
-object CoalesceShufflePartitions {
-  val COALESCED_SHUFFLE_READER_DESCRIPTION = "coalesced"
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index ba3f725929..180eab4582 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 
 /**
@@ -31,12 +32,10 @@ import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExcha
  * @param child It is usually `ShuffleQueryStageExec`, but can be the shuffle exchange
  *              node during canonicalization.
  * @param partitionSpecs The partition specs that defines the arrangement.
- * @param description The string description of this shuffle reader.
  */
 case class CustomShuffleReaderExec private(
     child: SparkPlan,
-    partitionSpecs: Seq[ShufflePartitionSpec],
-    description: String) extends UnaryExecNode {
+    partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode {
 
   override def output: Seq[Attribute] = child.output
   override lazy val outputPartitioning: Partitioning = {
@@ -62,20 +61,109 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else if (hasCoalescedPartition && hasSkewedPartition) {
+      "coalesced and skewed"
+    } else if (hasCoalescedPartition) {
+      "coalesced"
+    } else if (hasSkewedPartition) {
+      "skewed"
+    } else {
+      ""
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
+
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  private lazy val partitionDataSizeMetrics = {
+    val maxSize = SQLMetrics.createSizeMetric(sparkContext, "maximum partition data size")
+    val minSize = SQLMetrics.createSizeMetric(sparkContext, "minimum partition data size")
+    val avgSize = SQLMetrics.createSizeMetric(sparkContext, "average partition data size")
+    val mapStats = shuffleStage.get.mapStats.bytesByPartitionId
+    val sizes = partitionSpecs.map {
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+        startReducerIndex.until(endReducerIndex).map(mapStats(_)).sum
+      case p: PartialReducerPartitionSpec => p.dataSize
+      case p => throw new IllegalStateException("unexpected " + p)
+    }
+    maxSize.set(sizes.max)
+    minSize.set(sizes.min)
+    avgSize.set(sizes.sum / sizes.length)
+    Map(
+      "maxPartitionDataSize" -> maxSize,
+      "minPartitionDataSize" -> minSize,
+      "avgPartitionDataSize" -> avgSize)
+  }
+
+  private lazy val skewedPartitionMetrics = {
+    val metrics = SQLMetrics.createMetric(sparkContext, "number of skewed partitions")
+    val numSkewedPartitions = partitionSpecs.collect {
+      case p: PartialReducerPartitionSpec => p.reducerIndex
+    }.distinct.length
+    metrics.set(numSkewedPartitions)
+    Map("numSkewedPartitions" -> metrics)
+  }
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    if (shuffleStage.isDefined) {
+      val numPartitions = SQLMetrics.createMetric(sparkContext, "number of partitions")
+      numPartitions.set(partitionSpecs.length)
+      Map("numPartitions" -> numPartitions) ++ {
+        if (isLocalReader) {
+          // We split the mapper partition evenly when creating local shuffle reader, so no
+          // data size info is available.
+          Map.empty
+        } else {
+          partitionDataSizeMetrics
+        }
+      } ++ {
+        if (hasSkewedPartition) {
+          skewedPartitionMetrics
+        } else {
+          Map.empty
+        }
+      }
+    } else {
+      // It's a canonicalized plan, no need to report metrics.
+      Map.empty
+    }
+  }
+
+  private lazy val cachedShuffleRDD: RDD[InternalRow] = {
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
+    shuffleStage.map { stage =>
+      new ShuffledRowRDD(
+        stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
+    }.getOrElse {
+      throw new IllegalStateException("operating on canonicalized plan")
+    }
+  }
 
   override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
-      }
-    }
     cachedShuffleRDD
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index fb6b40c710..a5b3cac4df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -65,11 +65,10 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
 
   private def createLocalReader(plan: SparkPlan): CustomShuffleReaderExec = {
     plan match {
-      case c @ CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) =>
-        CustomShuffleReaderExec(
-          s, getPartitionSpecs(s, Some(c.partitionSpecs.length)), LOCAL_SHUFFLE_READER_DESCRIPTION)
+      case c @ CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
+        CustomShuffleReaderExec(s, getPartitionSpecs(s, Some(c.partitionSpecs.length)))
       case s: ShuffleQueryStageExec =>
-        CustomShuffleReaderExec(s, getPartitionSpecs(s, None), LOCAL_SHUFFLE_READER_DESCRIPTION)
+        CustomShuffleReaderExec(s, getPartitionSpecs(s, None))
     }
   }
 
@@ -123,8 +122,6 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
 
 object OptimizeLocalShuffleReader {
 
-  val LOCAL_SHUFFLE_READER_DESCRIPTION: String = "local"
-
   object BroadcastJoinWithShuffleLeft {
     def unapply(plan: SparkPlan): Option[(SparkPlan, BuildSide)] = plan match {
       case join: BroadcastHashJoinExec if canUseLocalShuffleReader(join.left) =>
@@ -142,8 +139,10 @@ object OptimizeLocalShuffleReader {
   }
 
   def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
-    case s: ShuffleQueryStageExec => s.shuffle.canChangeNumPartitions
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) => s.shuffle.canChangeNumPartitions
+    case s: ShuffleQueryStageExec =>
+      s.shuffle.canChangeNumPartitions
+    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
+      s.shuffle.canChangeNumPartitions
     case _ => false
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index b09e5630f8..a66129900b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -120,7 +120,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         } else {
           mapStartIndices(i + 1)
         }
-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex)
+        val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
+        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
       })
     } else {
       None
@@ -182,8 +183,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 
     val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
     val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-    val leftSkewDesc = new SkewDesc
-    val rightSkewDesc = new SkewDesc
+    var numSkewedLeft = 0
+    var numSkewedRight = 0
     for (partitionIndex <- 0 until numPartitions) {
       val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
       val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
@@ -199,9 +200,10 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
           val skewSpecs = createSkewPartitionSpecs(
             left.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, leftTargetSize)
           if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
-              s"${skewSpecs.get.length} parts.")
-            leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
+            val sizeStr = FileUtils.byteCountToDisplaySize(leftActualSizes(partitionIndex))
+            logDebug(s"Left side partition $partitionIndex ($sizeStr) is skewed, " +
+              s"split it into ${skewSpecs.get.length} parts.")
+            numSkewedLeft += 1
           }
           skewSpecs.getOrElse(Seq(leftPartSpec))
         } else {
@@ -214,9 +216,10 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
           val skewSpecs = createSkewPartitionSpecs(
             right.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, rightTargetSize)
           if (skewSpecs.isDefined) {
-            logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
-              s"${skewSpecs.get.length} parts.")
-            rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
+            val sizeStr = FileUtils.byteCountToDisplaySize(rightActualSizes(partitionIndex))
+            logDebug(s"Right side partition $partitionIndex ($sizeStr) is skewed, " +
+              s"split it into ${skewSpecs.get.length} parts.")
+            numSkewedRight += 1
           }
           skewSpecs.getOrElse(Seq(rightPartSpec))
         } else {
@@ -232,13 +235,10 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       }
     }
 
-    logDebug("number of skewed partitions: " +
-      s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
-    if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
-      val newLeft = CustomShuffleReaderExec(
-        left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
-      val newRight = CustomShuffleReaderExec(
-        right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
+    logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
+    if (numSkewedLeft > 0 || numSkewedRight > 0) {
+      val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions)
+      val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions)
       smj.copy(
         left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
     } else {
@@ -287,16 +287,14 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 private object ShuffleStage {
   def unapply(plan: SparkPlan): Option[ShuffleStageInfo] = plan match {
     case s: ShuffleQueryStageExec =>
-      val mapStats = getMapStats(s)
-      val sizes = mapStats.bytesByPartitionId
+      val sizes = s.mapStats.bytesByPartitionId
       val partitions = sizes.zipWithIndex.map {
         case (size, i) => CoalescedPartitionSpec(i, i + 1) -> size
       }
-      Some(ShuffleStageInfo(s, mapStats, partitions))
+      Some(ShuffleStageInfo(s, s.mapStats, partitions))
 
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs, _) =>
-      val mapStats = getMapStats(s)
-      val sizes = mapStats.bytesByPartitionId
+    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
+      val sizes = s.mapStats.bytesByPartitionId
       val partitions = partitionSpecs.map {
         case spec @ CoalescedPartitionSpec(start, end) =>
           var sum = 0L
@@ -309,51 +307,13 @@ private object ShuffleStage {
         case other => throw new IllegalArgumentException(
           s"Expect CoalescedPartitionSpec but got $other")
       }
-      Some(ShuffleStageInfo(s, mapStats, partitions))
+      Some(ShuffleStageInfo(s, s.mapStats, partitions))
 
     case _ => None
   }
-
-  private def getMapStats(stage: ShuffleQueryStageExec): MapOutputStatistics = {
-    assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" +
-      " already be ready when executing OptimizeSkewedPartitions rule")
-    stage.resultOption.get.asInstanceOf[MapOutputStatistics]
-  }
 }
 
 private case class ShuffleStageInfo(
     shuffleStage: ShuffleQueryStageExec,
     mapStats: MapOutputStatistics,
     partitionsWithSizes: Seq[(CoalescedPartitionSpec, Long)])
-
-private class SkewDesc {
-  private[this] var numSkewedPartitions: Int = 0
-  private[this] var totalSize: Long = 0
-  private[this] var maxSize: Long = 0
-  private[this] var minSize: Long = 0
-
-  def numPartitions: Int = numSkewedPartitions
-
-  def addPartitionSize(size: Long): Unit = {
-    if (numSkewedPartitions == 0) {
-      maxSize = size
-      minSize = size
-    }
-    numSkewedPartitions += 1
-    totalSize += size
-    if (size > maxSize) maxSize = size
-    if (size < minSize) minSize = size
-  }
-
-  override def toString: String = {
-    if (numSkewedPartitions == 0) {
-      "no skewed partition"
-    } else {
-      val maxSizeStr = FileUtils.byteCountToDisplaySize(maxSize)
-      val minSizeStr = FileUtils.byteCountToDisplaySize(minSize)
-      val avgSizeStr = FileUtils.byteCountToDisplaySize(totalSize / numSkewedPartitions)
-      s"$numSkewedPartitions skewed partitions with " +
-        s"size(max=$maxSizeStr, min=$minSizeStr, avg=$avgSizeStr)"
-    }
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index d5dc1be63f..e4e8d21764 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -161,6 +161,11 @@ case class ShuffleQueryStageExec(
       case _ =>
     }
   }
+
+  def mapStats: MapOutputStatistics = {
+    assert(resultOption.isDefined)
+    resultOption.get.asInstanceOf[MapOutputStatistics]
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index 9e77f618ed..22c5b651f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -23,7 +23,6 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.adaptive._
-import org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.functions._
@@ -108,7 +107,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
         val finalPlan = agg.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 1)
         minNumPostShufflePartitions match {
@@ -155,7 +154,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
@@ -207,7 +206,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
@@ -259,7 +258,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
@@ -302,7 +301,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 0)
       } finally {
@@ -332,7 +331,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
         }.length == 2)
       assert(
         finalPlan.collect {
-          case p @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => p
+          case r @ CoalescedShuffleReader() => r
         }.length == 3)
 
@@ -383,7 +382,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       assert(
         finalPlan.collect {
-          case p @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => p
+          case r @ CoalescedShuffleReader() => r
         }.isEmpty)
     }
     withSparkSession(test, 200, None)
@@ -404,9 +403,15 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
       // the shuffle partition numbers.
       assert(
         finalPlan.collect {
-          case p @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => p
+          case r @ CoalescedShuffleReader() => r
         }.isEmpty)
     }
     withSparkSession(test, 100, None)
   }
 }
+
+object CoalescedShuffleReader {
+  def unapply(reader: CustomShuffleReaderExec): Boolean = {
+    !reader.isLocalReader && !reader.hasSkewedPartition && reader.hasCoalescedPartition
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index d4c5b0d42f..4ae18b0562 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -25,9 +25,8 @@ import org.apache.log4j.Level
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, SortMergeJoinExec}
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -113,7 +112,7 @@ class AdaptiveQueryExecSuite
     }.length
 
     val numLocalReaders = collect(plan) {
-      case reader @ CustomShuffleReaderExec(_, _, LOCAL_SHUFFLE_READER_DESCRIPTION) => reader
+      case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
     }
     numLocalReaders.foreach { r =>
      val rdd = r.execute()
@@ -149,7 +148,7 @@ class AdaptiveQueryExecSuite
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
       val localReaders = collect(adaptivePlan) {
-        case reader @ CustomShuffleReaderExec(_, _, LOCAL_SHUFFLE_READER_DESCRIPTION) => reader
+        case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
       }
       assert(localReaders.length == 2)
       val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
@@ -181,7 +180,7 @@ class AdaptiveQueryExecSuite
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
       val localReaders = collect(adaptivePlan) {
-        case reader @ CustomShuffleReaderExec(_, _, LOCAL_SHUFFLE_READER_DESCRIPTION) => reader
+        case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
       }
       assert(localReaders.length == 2)
       val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
@@ -781,4 +780,69 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("metrics of the shuffle reader") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT key FROM testData GROUP BY key")
+      val readers = collect(adaptivePlan) {
+        case r: CustomShuffleReaderExec => r
+      }
+      assert(readers.length == 1)
+      val reader = readers.head
+      assert(!reader.isLocalReader)
+      assert(!reader.hasSkewedPartition)
+      assert(reader.hasCoalescedPartition)
+      assert(reader.metrics.keys.toSeq.sorted == Seq(
+        "avgPartitionDataSize", "maxPartitionDataSize", "minPartitionDataSize", "numPartitions"))
+      assert(reader.metrics("numPartitions").value == reader.partitionSpecs.length)
+      assert(reader.metrics("avgPartitionDataSize").value > 0)
+      assert(reader.metrics("maxPartitionDataSize").value > 0)
+      assert(reader.metrics("minPartitionDataSize").value > 0)
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+        val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+          "SELECT * FROM testData join testData2 ON key = a where value = '1'")
+        val join = collect(adaptivePlan) {
+          case j: BroadcastHashJoinExec => j
+        }.head
+        assert(join.buildSide == BuildLeft)
+
+        val readers = collect(join.right) {
+          case r: CustomShuffleReaderExec => r
+        }
+        assert(readers.length == 1)
+        val reader = readers.head
+        assert(reader.isLocalReader)
+        assert(reader.metrics.keys.toSeq == Seq("numPartitions"))
+        assert(reader.metrics("numPartitions").value == reader.partitionSpecs.length)
+      }
+
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000",
+        SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000") {
+        withTempView("skewData1", "skewData2") {
+          spark
+            .range(0, 1000, 1, 10)
+            .selectExpr("id % 2 as key1", "id as value1")
+            .createOrReplaceTempView("skewData1")
+          spark
+            .range(0, 1000, 1, 10)
+            .selectExpr("id % 1 as key2", "id as value2")
+            .createOrReplaceTempView("skewData2")
+          val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+            "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
+          val reader = collect(adaptivePlan) {
+            case r: CustomShuffleReaderExec => r
+          }.head
+          assert(!reader.isLocalReader)
+          assert(reader.hasSkewedPartition)
+          assert(reader.hasCoalescedPartition)
+          assert(reader.metrics.contains("numSkewedPartitions"))
+          assert(reader.metrics("numSkewedPartitions").value > 0)
+        }
+      }
+    }
+  }
 }
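
Editor's note (not part of the patch): the following is a minimal, Spark-free sketch of how the new reader metrics are derived from the partition specs, for readers who want to see the size and skew arithmetic in isolation. The simplified `Coalesced` and `PartialReducer` case classes and the example byte counts below are stand-ins for the real `ShufflePartitionSpec` hierarchy and `MapOutputStatistics`; the rules mirror `partitionDataSizeMetrics` and `skewedPartitionMetrics` in `CustomShuffleReaderExec` above.

// Standalone sketch under the assumptions stated above; compiles with plain scalac.
object ShuffleReaderMetricsSketch {
  sealed trait Spec
  // Covers reducers [startReducerIndex, endReducerIndex); its size is the sum of the
  // per-reducer sizes reported by the map output statistics.
  case class Coalesced(startReducerIndex: Int, endReducerIndex: Int) extends Spec
  // A skew split of one reducer; carries its own dataSize, as added by this patch.
  case class PartialReducer(reducerIndex: Int, dataSize: Long) extends Spec

  def main(args: Array[String]): Unit = {
    // Stand-in for MapOutputStatistics.bytesByPartitionId: bytes per reducer partition.
    val bytesByPartitionId = Array(100L, 800L, 50L, 3000L)
    // One coalesced partition covering reducers 0-2, plus reducer 3 split into two skew parts.
    val specs = Seq(
      Coalesced(0, 3),
      PartialReducer(reducerIndex = 3, dataSize = 1600L),
      PartialReducer(reducerIndex = 3, dataSize = 1400L))

    // Same per-spec size rule as partitionDataSizeMetrics in the patch.
    val sizes = specs.map {
      case Coalesced(start, end) => start.until(end).map(bytesByPartitionId(_)).sum
      case PartialReducer(_, dataSize) => dataSize
    }
    println(s"numPartitions = ${specs.length}")                    // 3
    println(s"maxPartitionDataSize = ${sizes.max}")                // 1600
    println(s"minPartitionDataSize = ${sizes.min}")                // 950
    println(s"avgPartitionDataSize = ${sizes.sum / sizes.length}") // 1316

    // Same rule as skewedPartitionMetrics: count distinct reducers that were split.
    val numSkewed = specs.collect { case PartialReducer(r, _) => r }.distinct.length
    println(s"numSkewedPartitions = $numSkewed")                   // 1
  }
}

With the example numbers above, such a reader would report numPartitions = 3, a max/min/avg partition data size of 1600/950/1316 bytes, and one skewed partition, which is the shape of the `metrics` map the patched `CustomShuffleReaderExec` posts to the driver for display in the SQL UI.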