[SPARK-35639][SQL] Add metrics about coalesced partitions to AQEShuffleRead in AQE
### What changes were proposed in this pull request?
AQEShuffleReadExec already reports "number of skewed partitions" and "number of skewed partition splits".
It would be useful to also report "number of coalesced partitions" and for ShuffleExchange to report "number of partitions"
This way it's clear what happened on the map side and on the reduce side.
![Metrics](https://user-images.githubusercontent.com/4297661/126729820-cf01b3fa-7bc4-44a5-8098-91689766a68a.png)
### Why are the changes needed?
Improves usability
### Does this PR introduce _any_ user-facing change?
Yes, it now provides more information about `AQEShuffleReadExec` operator behavior in the metrics system.
### How was this patch tested?
Existing tests
Closes #32776 from ekoifman/PRISM-91635-customshufflereader-sql-metrics.
Authored-by: Eugene Koifman <eugene.koifman@workday.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 41a16ebf11)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
993ffafc3e
commit
c59e54fe0e
|
@@ -89,16 +89,20 @@ case class AQEShuffleReadExec private(
|
|||
Iterator(desc)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true iff some partitions were actually combined
|
||||
*/
|
||||
private def isCoalesced(spec: ShufflePartitionSpec) = spec match {
|
||||
case CoalescedPartitionSpec(0, 0, _) => true
|
||||
case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
|
||||
case _ => false
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true iff some non-empty partitions were combined
|
||||
*/
|
||||
def hasCoalescedPartition: Boolean = {
|
||||
partitionSpecs.exists {
|
||||
// shuffle from empty RDD
|
||||
case CoalescedPartitionSpec(0, 0, _) => true
|
||||
case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
|
||||
case _ => false
|
||||
}
|
||||
partitionSpecs.exists(isCoalesced)
|
||||
}
|
||||
|
||||
def hasSkewedPartition: Boolean =
|
||||
|
@@ -153,6 +157,13 @@ case class AQEShuffleReadExec private(
|
|||
driverAccumUpdates += (skewedSplits.id -> numSplits)
|
||||
}
|
||||
|
||||
if (hasCoalescedPartition) {
|
||||
val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions")
|
||||
val x = partitionSpecs.count(isCoalesced)
|
||||
numCoalescedPartitionsMetric.set(x)
|
||||
driverAccumUpdates += numCoalescedPartitionsMetric.id -> x
|
||||
}
|
||||
|
||||
partitionDataSizes.foreach { dataSizes =>
|
||||
val partitionDataSizeMetrics = metrics("partitionDataSize")
|
||||
driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _)
|
||||
|
@@ -183,6 +194,13 @@ case class AQEShuffleReadExec private(
|
|||
} else {
|
||||
Map.empty
|
||||
}
|
||||
} ++ {
|
||||
if (hasCoalescedPartition) {
|
||||
Map("numCoalescedPartitions" ->
|
||||
SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"))
|
||||
} else {
|
||||
Map.empty
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// It's a canonicalized plan, no need to report metrics.
|
||||
|
|
|
@@ -123,7 +123,8 @@ case class ShuffleExchangeExec(
|
|||
private[sql] lazy val readMetrics =
|
||||
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
|
||||
override lazy val metrics = Map(
|
||||
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
|
||||
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
|
||||
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")
|
||||
) ++ readMetrics ++ writeMetrics
|
||||
|
||||
override def nodeName: String = "Exchange"
|
||||
|
@@ -164,12 +165,17 @@ case class ShuffleExchangeExec(
|
|||
*/
|
||||
@transient
|
||||
lazy val shuffleDependency : ShuffleDependency[Int, InternalRow, InternalRow] = {
|
||||
ShuffleExchangeExec.prepareShuffleDependency(
|
||||
val dep = ShuffleExchangeExec.prepareShuffleDependency(
|
||||
inputRDD,
|
||||
child.output,
|
||||
outputPartitioning,
|
||||
serializer,
|
||||
writeMetrics)
|
||||
metrics("numPartitions").set(dep.partitioner.numPartitions)
|
||||
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
|
||||
SQLMetrics.postDriverMetricUpdates(
|
||||
sparkContext, executionId, metrics("numPartitions") :: Nil)
|
||||
dep
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@@ -572,6 +572,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
|
|||
"""
|
||||
|(11) AQEShuffleRead
|
||||
|Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
|
||||
|Arguments: coalesced
|
||||
|""".stripMargin,
|
||||
"""
|
||||
|(16) BroadcastHashJoin
|
||||
|
|
|
@@ -927,7 +927,8 @@ class AdaptiveQueryExecSuite
|
|||
assert(!read.hasSkewedPartition)
|
||||
assert(read.hasCoalescedPartition)
|
||||
assert(read.metrics.keys.toSeq.sorted == Seq(
|
||||
"numPartitions", "partitionDataSize"))
|
||||
"numCoalescedPartitions", "numPartitions", "partitionDataSize"))
|
||||
assert(read.metrics("numCoalescedPartitions").value == 1)
|
||||
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
|
||||
assert(read.metrics("partitionDataSize").value > 0)
|
||||
|
||||
|
|
Loading…
Reference in a new issue