[SPARK-31253][SQL] Add metrics to AQE shuffle reader


### What changes were proposed in this pull request?
Add SQL metrics to the AQE shuffle reader (`CustomShuffleReaderExec`)
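The reader reports `numPartitions` in all cases, `maxPartitionDataSize`/`minPartitionDataSize`/`avgPartitionDataSize` for non-local readers, and `numSkewedPartitions` when skewed partitions were split. As a rough sketch of the driver-side metric pattern this change relies on (the `postReaderMetrics` helper below is hypothetical and only condenses calls that appear in the `CustomShuffleReaderExec` diff; it is not part of this PR):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// The metric values are already known on the driver (from map output statistics),
// so they are set immediately and posted to the UI instead of being accumulated by tasks.
def postReaderMetrics(sc: SparkContext, numPartitionSpecs: Int): Unit = {
  val numPartitions: SQLMetric = SQLMetrics.createMetric(sc, "number of partitions")
  numPartitions.set(numPartitionSpecs)
  // Posting driver-side updates makes the values visible in the SQL UI as soon as the
  // final plan is materialized, without waiting for any task-side metric updates.
  val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  SQLMetrics.postDriverMetricUpdates(sc, executionId, Seq(numPartitions))
}
```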

### Why are the changes needed?
To make the AQE shuffle reader more UI-friendly: the SQL UI can now show the number of partitions the reader reads, the partition data sizes, and the number of skewed partitions that were split.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
A new test, "metrics of the shuffle reader", in `AdaptiveQueryExecSuite`.

Closes #28022 from cloud-fan/metrics.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
Commit: 34c7ec8e0c (parent 590b9a0132)
Date: 2020-03-31 13:03:52 -07:00
Committed by: gatorsmile
8 changed files with 230 additions and 107 deletions


@@ -31,17 +31,23 @@ sealed trait ShufflePartitionSpec
 // A partition that reads data of one or more reducers, from `startReducerIndex` (inclusive) to
 // `endReducerIndex` (exclusive).
 case class CoalescedPartitionSpec(
-    startReducerIndex: Int, endReducerIndex: Int) extends ShufflePartitionSpec
+    startReducerIndex: Int,
+    endReducerIndex: Int) extends ShufflePartitionSpec
 
 // A partition that reads partial data of one reducer, from `startMapIndex` (inclusive) to
 // `endMapIndex` (exclusive).
 case class PartialReducerPartitionSpec(
-    reducerIndex: Int, startMapIndex: Int, endMapIndex: Int) extends ShufflePartitionSpec
+    reducerIndex: Int,
+    startMapIndex: Int,
+    endMapIndex: Int,
+    @transient dataSize: Long) extends ShufflePartitionSpec
 
 // A partition that reads partial data of one mapper, from `startReducerIndex` (inclusive) to
 // `endReducerIndex` (exclusive).
 case class PartialMapperPartitionSpec(
-    mapIndex: Int, startReducerIndex: Int, endReducerIndex: Int) extends ShufflePartitionSpec
+    mapIndex: Int,
+    startReducerIndex: Int,
+    endReducerIndex: Int) extends ShufflePartitionSpec
 
 /**
  * The [[Partition]] used by [[ShuffledRowRDD]].
@@ -162,7 +168,7 @@ class ShuffledRowRDD(
           tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
         }
-      case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex) =>
+      case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) =>
         tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
 
       case PartialMapperPartitionSpec(mapIndex, _, _) =>
@@ -184,7 +190,7 @@ class ShuffledRowRDD(
           context,
           sqlMetricsReporter)
-      case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex) =>
+      case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
         SparkEnv.get.shuffleManager.getReaderForRange(
           dependency.shuffleHandle,
           startMapIndex,


@@ -83,7 +83,7 @@ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPlan] {
             // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
             // number of output partitions.
             case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
-              CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
+              CustomShuffleReaderExec(stage, partitionSpecs)
           }
         } else {
           plan
@@ -91,7 +91,3 @@ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPlan] {
       }
     }
   }
-
-object CoalesceShufflePartitions {
-  val COALESCED_SHUFFLE_READER_DESCRIPTION = "coalesced"
-}


@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 
 /**
@@ -31,12 +32,10 @@ import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}
  * @param child It is usually `ShuffleQueryStageExec`, but can be the shuffle exchange
  *              node during canonicalization.
  * @param partitionSpecs The partition specs that defines the arrangement.
- * @param description The string description of this shuffle reader.
  */
 case class CustomShuffleReaderExec private(
     child: SparkPlan,
-    partitionSpecs: Seq[ShufflePartitionSpec],
-    description: String) extends UnaryExecNode {
+    partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode {
 
   override def output: Seq[Attribute] = child.output
   override lazy val outputPartitioning: Partitioning = {
@@ -62,20 +61,109 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else if (hasCoalescedPartition && hasSkewedPartition) {
+      "coalesced and skewed"
+    } else if (hasCoalescedPartition) {
+      "coalesced"
+    } else if (hasSkewedPartition) {
+      "skewed"
+    } else {
+      ""
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
+
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  private lazy val partitionDataSizeMetrics = {
+    val maxSize = SQLMetrics.createSizeMetric(sparkContext, "maximum partition data size")
+    val minSize = SQLMetrics.createSizeMetric(sparkContext, "minimum partition data size")
+    val avgSize = SQLMetrics.createSizeMetric(sparkContext, "average partition data size")
+    val mapStats = shuffleStage.get.mapStats.bytesByPartitionId
+    val sizes = partitionSpecs.map {
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+        startReducerIndex.until(endReducerIndex).map(mapStats(_)).sum
+      case p: PartialReducerPartitionSpec => p.dataSize
+      case p => throw new IllegalStateException("unexpected " + p)
+    }
+    maxSize.set(sizes.max)
+    minSize.set(sizes.min)
+    avgSize.set(sizes.sum / sizes.length)
+    Map(
+      "maxPartitionDataSize" -> maxSize,
+      "minPartitionDataSize" -> minSize,
+      "avgPartitionDataSize" -> avgSize)
+  }
+
+  private lazy val skewedPartitionMetrics = {
+    val metrics = SQLMetrics.createMetric(sparkContext, "number of skewed partitions")
+    val numSkewedPartitions = partitionSpecs.collect {
+      case p: PartialReducerPartitionSpec => p.reducerIndex
+    }.distinct.length
+    metrics.set(numSkewedPartitions)
+    Map("numSkewedPartitions" -> metrics)
+  }
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    if (shuffleStage.isDefined) {
+      val numPartitions = SQLMetrics.createMetric(sparkContext, "number of partitions")
+      numPartitions.set(partitionSpecs.length)
+      Map("numPartitions" -> numPartitions) ++ {
+        if (isLocalReader) {
+          // We split the mapper partition evenly when creating local shuffle reader, so no
+          // data size info is available.
+          Map.empty
+        } else {
+          partitionDataSizeMetrics
+        }
+      } ++ {
+        if (hasSkewedPartition) {
+          skewedPartitionMetrics
+        } else {
+          Map.empty
+        }
      }
+    } else {
+      // It's a canonicalized plan, no need to report metrics.
+      Map.empty
+    }
+  }
+
+  private lazy val cachedShuffleRDD: RDD[InternalRow] = {
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
+    shuffleStage.map { stage =>
+      new ShuffledRowRDD(
+        stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
+    }.getOrElse {
+      throw new IllegalStateException("operating on canonicalized plan")
+    }
+  }
 
   override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
-      }
-    }
     cachedShuffleRDD
   }
 }


@@ -65,11 +65,10 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
 
   private def createLocalReader(plan: SparkPlan): CustomShuffleReaderExec = {
     plan match {
-      case c @ CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) =>
-        CustomShuffleReaderExec(
-          s, getPartitionSpecs(s, Some(c.partitionSpecs.length)), LOCAL_SHUFFLE_READER_DESCRIPTION)
+      case c @ CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
+        CustomShuffleReaderExec(s, getPartitionSpecs(s, Some(c.partitionSpecs.length)))
       case s: ShuffleQueryStageExec =>
-        CustomShuffleReaderExec(s, getPartitionSpecs(s, None), LOCAL_SHUFFLE_READER_DESCRIPTION)
+        CustomShuffleReaderExec(s, getPartitionSpecs(s, None))
     }
   }
 
@@ -123,8 +122,6 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
 
 object OptimizeLocalShuffleReader {
 
-  val LOCAL_SHUFFLE_READER_DESCRIPTION: String = "local"
-
   object BroadcastJoinWithShuffleLeft {
     def unapply(plan: SparkPlan): Option[(SparkPlan, BuildSide)] = plan match {
       case join: BroadcastHashJoinExec if canUseLocalShuffleReader(join.left) =>
@@ -142,8 +139,10 @@ object OptimizeLocalShuffleReader {
   }
 
   def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
-    case s: ShuffleQueryStageExec => s.shuffle.canChangeNumPartitions
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) => s.shuffle.canChangeNumPartitions
+    case s: ShuffleQueryStageExec =>
+      s.shuffle.canChangeNumPartitions
+    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
+      s.shuffle.canChangeNumPartitions
     case _ => false
   }
 }


@@ -120,7 +120,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         } else {
           mapStartIndices(i + 1)
         }
-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex)
+        val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
+        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
       })
     } else {
       None
@@ -182,8 +183,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
     val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
     val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-    val leftSkewDesc = new SkewDesc
-    val rightSkewDesc = new SkewDesc
+    var numSkewedLeft = 0
+    var numSkewedRight = 0
     for (partitionIndex <- 0 until numPartitions) {
       val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
       val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
@@ -199,9 +200,10 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         val skewSpecs = createSkewPartitionSpecs(
           left.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, leftTargetSize)
         if (skewSpecs.isDefined) {
-          logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
-            s"${skewSpecs.get.length} parts.")
-          leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
+          val sizeStr = FileUtils.byteCountToDisplaySize(leftActualSizes(partitionIndex))
+          logDebug(s"Left side partition $partitionIndex ($sizeStr) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedLeft += 1
         }
         skewSpecs.getOrElse(Seq(leftPartSpec))
       } else {
@@ -214,9 +216,10 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         val skewSpecs = createSkewPartitionSpecs(
           right.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, rightTargetSize)
         if (skewSpecs.isDefined) {
-          logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
-            s"${skewSpecs.get.length} parts.")
-          rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
+          val sizeStr = FileUtils.byteCountToDisplaySize(rightActualSizes(partitionIndex))
+          logDebug(s"Right side partition $partitionIndex ($sizeStr) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedRight += 1
         }
         skewSpecs.getOrElse(Seq(rightPartSpec))
       } else {
@@ -232,13 +235,10 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       }
     }
 
-    logDebug("number of skewed partitions: " +
-      s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
-    if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
-      val newLeft = CustomShuffleReaderExec(
-        left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
-      val newRight = CustomShuffleReaderExec(
-        right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
+    logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
+    if (numSkewedLeft > 0 || numSkewedRight > 0) {
+      val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions)
+      val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions)
       smj.copy(
         left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
     } else {
@@ -287,16 +287,14 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 private object ShuffleStage {
   def unapply(plan: SparkPlan): Option[ShuffleStageInfo] = plan match {
     case s: ShuffleQueryStageExec =>
-      val mapStats = getMapStats(s)
-      val sizes = mapStats.bytesByPartitionId
+      val sizes = s.mapStats.bytesByPartitionId
       val partitions = sizes.zipWithIndex.map {
         case (size, i) => CoalescedPartitionSpec(i, i + 1) -> size
       }
-      Some(ShuffleStageInfo(s, mapStats, partitions))
+      Some(ShuffleStageInfo(s, s.mapStats, partitions))
 
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs, _) =>
-      val mapStats = getMapStats(s)
-      val sizes = mapStats.bytesByPartitionId
+    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
+      val sizes = s.mapStats.bytesByPartitionId
       val partitions = partitionSpecs.map {
         case spec @ CoalescedPartitionSpec(start, end) =>
           var sum = 0L
@@ -309,51 +307,13 @@
         case other => throw new IllegalArgumentException(
           s"Expect CoalescedPartitionSpec but got $other")
       }
-      Some(ShuffleStageInfo(s, mapStats, partitions))
+      Some(ShuffleStageInfo(s, s.mapStats, partitions))
 
     case _ => None
   }
-
-  private def getMapStats(stage: ShuffleQueryStageExec): MapOutputStatistics = {
-    assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" +
-      " already be ready when executing OptimizeSkewedPartitions rule")
-    stage.resultOption.get.asInstanceOf[MapOutputStatistics]
-  }
 }
 
 private case class ShuffleStageInfo(
     shuffleStage: ShuffleQueryStageExec,
     mapStats: MapOutputStatistics,
     partitionsWithSizes: Seq[(CoalescedPartitionSpec, Long)])
-
-private class SkewDesc {
-  private[this] var numSkewedPartitions: Int = 0
-  private[this] var totalSize: Long = 0
-  private[this] var maxSize: Long = 0
-  private[this] var minSize: Long = 0
-
-  def numPartitions: Int = numSkewedPartitions
-
-  def addPartitionSize(size: Long): Unit = {
-    if (numSkewedPartitions == 0) {
-      maxSize = size
-      minSize = size
-    }
-    numSkewedPartitions += 1
-    totalSize += size
-    if (size > maxSize) maxSize = size
-    if (size < minSize) minSize = size
-  }
-
-  override def toString: String = {
-    if (numSkewedPartitions == 0) {
-      "no skewed partition"
-    } else {
-      val maxSizeStr = FileUtils.byteCountToDisplaySize(maxSize)
-      val minSizeStr = FileUtils.byteCountToDisplaySize(minSize)
-      val avgSizeStr = FileUtils.byteCountToDisplaySize(totalSize / numSkewedPartitions)
-      s"$numSkewedPartitions skewed partitions with " +
-        s"size(max=$maxSizeStr, min=$minSizeStr, avg=$avgSizeStr)"
-    }
-  }
-}


@@ -161,6 +161,11 @@ case class ShuffleQueryStageExec(
       case _ =>
     }
   }
+
+  def mapStats: MapOutputStatistics = {
+    assert(resultOption.isDefined)
+    resultOption.get.asInstanceOf[MapOutputStatistics]
+  }
 }
 
 /**


@@ -23,7 +23,6 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.adaptive._
-import org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.functions._
@@ -108,7 +107,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
         val finalPlan = agg.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 1)
         minNumPostShufflePartitions match {
@@ -155,7 +154,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
@@ -207,7 +206,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
@@ -259,7 +258,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
@@ -302,7 +301,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         val shuffleReaders = finalPlan.collect {
-          case r @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => r
+          case r @ CoalescedShuffleReader() => r
         }
         assert(shuffleReaders.length === 0)
       } finally {
@@ -332,7 +331,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
           }.length == 2)
         assert(
           finalPlan.collect {
-            case p @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => p
+            case r @ CoalescedShuffleReader() => r
           }.length == 3)
@@ -383,7 +382,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
         assert(
           finalPlan.collect {
-            case p @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => p
+            case r @ CoalescedShuffleReader() => r
           }.isEmpty)
       }
       withSparkSession(test, 200, None)
@@ -404,9 +403,15 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
       // the shuffle partition numbers.
       assert(
         finalPlan.collect {
-          case p @ CustomShuffleReaderExec(_, _, COALESCED_SHUFFLE_READER_DESCRIPTION) => p
+          case r @ CoalescedShuffleReader() => r
        }.isEmpty)
     }
     withSparkSession(test, 100, None)
   }
 }
+
+object CoalescedShuffleReader {
+  def unapply(reader: CustomShuffleReaderExec): Boolean = {
+    !reader.isLocalReader && !reader.hasSkewedPartition && reader.hasCoalescedPartition
+  }
+}


@@ -25,9 +25,8 @@ import org.apache.log4j.Level
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, SortMergeJoinExec}
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -113,7 +112,7 @@ class AdaptiveQueryExecSuite
     }.length
 
     val numLocalReaders = collect(plan) {
-      case reader @ CustomShuffleReaderExec(_, _, LOCAL_SHUFFLE_READER_DESCRIPTION) => reader
+      case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
     }
     numLocalReaders.foreach { r =>
      val rdd = r.execute()
@@ -149,7 +148,7 @@ class AdaptiveQueryExecSuite
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
       val localReaders = collect(adaptivePlan) {
-        case reader @ CustomShuffleReaderExec(_, _, LOCAL_SHUFFLE_READER_DESCRIPTION) => reader
+        case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
       }
       assert(localReaders.length == 2)
       val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
@@ -181,7 +180,7 @@ class AdaptiveQueryExecSuite
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
       val localReaders = collect(adaptivePlan) {
-        case reader @ CustomShuffleReaderExec(_, _, LOCAL_SHUFFLE_READER_DESCRIPTION) => reader
+        case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
       }
       assert(localReaders.length == 2)
       val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
@@ -781,4 +780,69 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("metrics of the shuffle reader") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT key FROM testData GROUP BY key")
+      val readers = collect(adaptivePlan) {
+        case r: CustomShuffleReaderExec => r
+      }
+      assert(readers.length == 1)
+      val reader = readers.head
+      assert(!reader.isLocalReader)
+      assert(!reader.hasSkewedPartition)
+      assert(reader.hasCoalescedPartition)
+      assert(reader.metrics.keys.toSeq.sorted == Seq(
+        "avgPartitionDataSize", "maxPartitionDataSize", "minPartitionDataSize", "numPartitions"))
+      assert(reader.metrics("numPartitions").value == reader.partitionSpecs.length)
+      assert(reader.metrics("avgPartitionDataSize").value > 0)
+      assert(reader.metrics("maxPartitionDataSize").value > 0)
+      assert(reader.metrics("minPartitionDataSize").value > 0)
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+        val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+          "SELECT * FROM testData join testData2 ON key = a where value = '1'")
+
+        val join = collect(adaptivePlan) {
+          case j: BroadcastHashJoinExec => j
+        }.head
+        assert(join.buildSide == BuildLeft)
+
+        val readers = collect(join.right) {
+          case r: CustomShuffleReaderExec => r
+        }
+        assert(readers.length == 1)
+        val reader = readers.head
+        assert(reader.isLocalReader)
+        assert(reader.metrics.keys.toSeq == Seq("numPartitions"))
+        assert(reader.metrics("numPartitions").value == reader.partitionSpecs.length)
+      }
+
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000",
+        SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000") {
+        withTempView("skewData1", "skewData2") {
+          spark
+            .range(0, 1000, 1, 10)
+            .selectExpr("id % 2 as key1", "id as value1")
+            .createOrReplaceTempView("skewData1")
+          spark
+            .range(0, 1000, 1, 10)
+            .selectExpr("id % 1 as key2", "id as value2")
+            .createOrReplaceTempView("skewData2")
+          val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+            "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
+          val reader = collect(adaptivePlan) {
+            case r: CustomShuffleReaderExec => r
+          }.head
+          assert(!reader.isLocalReader)
+          assert(reader.hasSkewedPartition)
+          assert(reader.hasCoalescedPartition)
+          assert(reader.metrics.contains("numSkewedPartitions"))
+          assert(reader.metrics("numSkewedPartitions").value > 0)
+        }
+      }
+    }
+  }
 }