[SPARK-35639][SQL] Add metrics about coalesced partitions to AQEShuffleRead in AQE

### What changes were proposed in this pull request?

AQEShuffleReadExec already reports "number of skewed partitions" and "number of skewed partition splits".
It would be useful for it to also report "number of coalesced partitions", and for ShuffleExchange to report "number of partitions".
This way it is clear what happened on the map side and on the reduce side.

![Metrics](https://user-images.githubusercontent.com/4297661/126729820-cf01b3fa-7bc4-44a5-8098-91689766a68a.png)

### Why are the changes needed?

Improves usability: the additional metrics make it easier to understand shuffle behavior under adaptive query execution.

### Does this PR introduce _any_ user-facing change?

Yes, it now provides more information about `AQEShuffleReadExec` operator behavior in the metrics system.

### How was this patch tested?

Existing tests

Closes #32776 from ekoifman/PRISM-91635-customshufflereader-sql-metrics.

Authored-by: Eugene Koifman <eugene.koifman@workday.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Eugene Koifman 2021-07-28 13:49:48 +08:00 committed by Wenchen Fan
parent 23a6ffa5dc
commit 41a16ebf11
4 changed files with 35 additions and 9 deletions

View file

@ -89,16 +89,20 @@ case class AQEShuffleReadExec private(
Iterator(desc)
}
/**
* Returns true iff some partitions were actually combined
*/
private def isCoalesced(spec: ShufflePartitionSpec) = spec match {
case CoalescedPartitionSpec(0, 0, _) => true
case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
case _ => false
}
/**
* Returns true iff some non-empty partitions were combined
*/
def hasCoalescedPartition: Boolean = {
partitionSpecs.exists {
// shuffle from empty RDD
case CoalescedPartitionSpec(0, 0, _) => true
case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
case _ => false
}
partitionSpecs.exists(isCoalesced)
}
def hasSkewedPartition: Boolean =
@ -153,6 +157,13 @@ case class AQEShuffleReadExec private(
driverAccumUpdates += (skewedSplits.id -> numSplits)
}
if (hasCoalescedPartition) {
val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions")
val x = partitionSpecs.count(isCoalesced)
numCoalescedPartitionsMetric.set(x)
driverAccumUpdates += numCoalescedPartitionsMetric.id -> x
}
partitionDataSizes.foreach { dataSizes =>
val partitionDataSizeMetrics = metrics("partitionDataSize")
driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _)
@ -183,6 +194,13 @@ case class AQEShuffleReadExec private(
} else {
Map.empty
}
} ++ {
if (hasCoalescedPartition) {
Map("numCoalescedPartitions" ->
SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"))
} else {
Map.empty
}
}
} else {
// It's a canonicalized plan, no need to report metrics.

View file

@ -123,7 +123,8 @@ case class ShuffleExchangeExec(
private[sql] lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")
) ++ readMetrics ++ writeMetrics
override def nodeName: String = "Exchange"
@ -164,12 +165,17 @@ case class ShuffleExchangeExec(
*/
@transient
lazy val shuffleDependency : ShuffleDependency[Int, InternalRow, InternalRow] = {
ShuffleExchangeExec.prepareShuffleDependency(
val dep = ShuffleExchangeExec.prepareShuffleDependency(
inputRDD,
child.output,
outputPartitioning,
serializer,
writeMetrics)
metrics("numPartitions").set(dep.partitioner.numPartitions)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext, executionId, metrics("numPartitions") :: Nil)
dep
}
/**

View file

@ -572,6 +572,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
"""
|(11) AQEShuffleRead
|Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
|Arguments: coalesced
|""".stripMargin,
"""
|(16) BroadcastHashJoin

View file

@ -927,7 +927,8 @@ class AdaptiveQueryExecSuite
assert(!read.hasSkewedPartition)
assert(read.hasCoalescedPartition)
assert(read.metrics.keys.toSeq.sorted == Seq(
"numPartitions", "partitionDataSize"))
"numCoalescedPartitions", "numPartitions", "partitionDataSize"))
assert(read.metrics("numCoalescedPartitions").value == 1)
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
assert(read.metrics("partitionDataSize").value > 0)