[SPARK-35888][SQL][FOLLOWUP] Return partition specs for all the shuffles
### What changes were proposed in this pull request? This is a follow-up to https://github.com/apache/spark/pull/33079, to fix a bug in corner cases: `ShufflePartitionsUtil.coalescePartitions` should either return the shuffle spec for all the shuffles, or none. If the input RDD has no partitions, the `mapOutputStatistics` is None, and we should still return shuffle specs with size 0. ### Why are the changes needed? bug fix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? a new test Closes #33158 from cloud-fan/bug. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
a98c8ae57d
commit
cd6a463811
|
@@ -96,9 +96,8 @@ object ShufflePartitionsUtil extends Logging {
|
|||
|
||||
val numPartitions = validMetrics.head.bytesByPartitionId.length
|
||||
val newPartitionSpecs = coalescePartitions(0, numPartitions, validMetrics, targetSize)
|
||||
assert(newPartitionSpecs.length == validMetrics.length)
|
||||
if (newPartitionSpecs.head.length < numPartitions) {
|
||||
newPartitionSpecs
|
||||
if (newPartitionSpecs.length < numPartitions) {
|
||||
attachDataSize(mapOutputStatistics, newPartitionSpecs)
|
||||
} else {
|
||||
Seq.empty
|
||||
}
|
||||
|
@@ -148,7 +147,8 @@ object ShufflePartitionsUtil extends Logging {
|
|||
if (i - 1 > start) {
|
||||
val partitionSpecs = coalescePartitions(
|
||||
partitionIndices(start), repeatValue, validMetrics, targetSize, true)
|
||||
newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2)
|
||||
newPartitionSpecsSeq.zip(attachDataSize(mapOutputStatistics, partitionSpecs))
|
||||
.foreach(spec => spec._1 ++= spec._2)
|
||||
}
|
||||
// find the end of this skew section, skipping partition(i - 1) and partition(i).
|
||||
var repeatIndex = i + 1
|
||||
|
@@ -173,7 +173,8 @@ object ShufflePartitionsUtil extends Logging {
|
|||
if (numPartitions > start) {
|
||||
val partitionSpecs = coalescePartitions(
|
||||
partitionIndices(start), partitionIndices.last + 1, validMetrics, targetSize, true)
|
||||
newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2)
|
||||
newPartitionSpecsSeq.zip(attachDataSize(mapOutputStatistics, partitionSpecs))
|
||||
.foreach(spec => spec._1 ++= spec._2)
|
||||
}
|
||||
// only return coalesced result if any coalescing has happened.
|
||||
if (newPartitionSpecsSeq.head.length < numPartitions) {
|
||||
|
@@ -204,19 +205,17 @@ object ShufflePartitionsUtil extends Logging {
|
|||
* - coalesced partition 2: shuffle partition 2 (size 170 MiB)
|
||||
* - coalesced partition 3: shuffle partition 3 and 4 (size 50 MiB)
|
||||
*
|
||||
* @return A sequence of sequence of [[CoalescedPartitionSpec]]s. which each inner sequence as
|
||||
* the new partition specs for its corresponding shuffle after coalescing. For example,
|
||||
* if partitions [0, 1, 2, 3, 4] and partition bytes [10, 10, 100, 10, 20] with
|
||||
* targetSize 100, split at indices [0, 2, 3], the returned partition specs will be:
|
||||
* CoalescedPartitionSpec(0, 2, 20), CoalescedPartitionSpec(2, 3, 100) and
|
||||
* CoalescedPartitionSpec(3, 5, 30).
|
||||
* @return A sequence of [[CoalescedPartitionSpec]]s. For example, if partitions [0, 1, 2, 3, 4]
|
||||
* split at indices [0, 2, 3], the returned partition specs will be:
|
||||
* CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3) and
|
||||
* CoalescedPartitionSpec(3, 5).
|
||||
*/
|
||||
private def coalescePartitions(
|
||||
start: Int,
|
||||
end: Int,
|
||||
mapOutputStatistics: Seq[MapOutputStatistics],
|
||||
targetSize: Long,
|
||||
allowReturnEmpty: Boolean = false): Seq[Seq[CoalescedPartitionSpec]] = {
|
||||
allowReturnEmpty: Boolean = false): Seq[CoalescedPartitionSpec] = {
|
||||
val partitionSpecs = ArrayBuffer.empty[CoalescedPartitionSpec]
|
||||
var coalescedSize = 0L
|
||||
var i = start
|
||||
|
@@ -252,14 +251,20 @@ object ShufflePartitionsUtil extends Logging {
|
|||
}
|
||||
// If do not allowReturnEmpty, create at least one partition if all partitions are empty.
|
||||
createPartitionSpec(!allowReturnEmpty && partitionSpecs.isEmpty)
|
||||
partitionSpecs.toSeq
|
||||
}
|
||||
|
||||
// add data size for each partitionSpecs
|
||||
mapOutputStatistics.map { mapStats =>
|
||||
partitionSpecs.map { spec =>
|
||||
val dataSize = spec.startReducerIndex.until(spec.endReducerIndex)
|
||||
.map(mapStats.bytesByPartitionId).sum
|
||||
spec.copy(dataSize = Some(dataSize))
|
||||
}.toSeq
|
||||
private def attachDataSize(
|
||||
mapOutputStatistics: Seq[Option[MapOutputStatistics]],
|
||||
partitionSpecs: Seq[CoalescedPartitionSpec]): Seq[Seq[CoalescedPartitionSpec]] = {
|
||||
mapOutputStatistics.map {
|
||||
case Some(mapStats) =>
|
||||
partitionSpecs.map { spec =>
|
||||
val dataSize = spec.startReducerIndex.until(spec.endReducerIndex)
|
||||
.map(mapStats.bytesByPartitionId).sum
|
||||
spec.copy(dataSize = Some(dataSize))
|
||||
}.toSeq
|
||||
case None => partitionSpecs.map(_.copy(dataSize = Some(0))).toSeq
|
||||
}.toSeq
|
||||
}
|
||||
|
||||
|
|
|
@@ -1861,4 +1861,28 @@ class AdaptiveQueryExecSuite
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-35888: join with a 0-partition table") {
|
||||
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
|
||||
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
|
||||
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
|
||||
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) {
|
||||
withTempView("t2") {
|
||||
// create a temp view with 0 partition
|
||||
spark.createDataFrame(sparkContext.emptyRDD[Row], new StructType().add("b", IntegerType))
|
||||
.createOrReplaceTempView("t2")
|
||||
val (_, adaptive) =
|
||||
runAdaptiveAndVerifyResult("SELECT * FROM testData2 t1 left semi join t2 ON t1.a=t2.b")
|
||||
val customReaders = collect(adaptive) {
|
||||
case c: CustomShuffleReaderExec => c
|
||||
}
|
||||
assert(customReaders.length == 2)
|
||||
customReaders.foreach { c =>
|
||||
val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
|
||||
assert(stats.sizeInBytes >= 0)
|
||||
assert(stats.rowCount.get >= 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue