diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 3cf0c6c408..6106a562e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -147,7 +147,7 @@ object ShufflePartitionsUtil extends Logging {
       // coalesce any partitions before partition(i - 1) and after the end of latest skew section.
       if (i - 1 > start) {
         val partitionSpecs = coalescePartitions(
-          partitionIndices(start), repeatValue, validMetrics, targetSize)
+          partitionIndices(start), repeatValue, validMetrics, targetSize, true)
         newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2)
       }
       // find the end of this skew section, skipping partition(i - 1) and partition(i).
@@ -172,7 +172,7 @@ object ShufflePartitionsUtil extends Logging {
     // coalesce any partitions after the end of last skew section.
     if (numPartitions > start) {
       val partitionSpecs = coalescePartitions(
-        partitionIndices(start), partitionIndices.last + 1, validMetrics, targetSize)
+        partitionIndices(start), partitionIndices.last + 1, validMetrics, targetSize, true)
       newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2)
     }
     // only return coalesced result if any coalescing has happened.
@@ -215,7 +215,8 @@ object ShufflePartitionsUtil extends Logging {
       start: Int,
       end: Int,
       mapOutputStatistics: Seq[MapOutputStatistics],
-      targetSize: Long): Seq[Seq[CoalescedPartitionSpec]] = {
+      targetSize: Long,
+      allowReturnEmpty: Boolean = false): Seq[Seq[CoalescedPartitionSpec]] = {
     val partitionSpecs = ArrayBuffer.empty[CoalescedPartitionSpec]
     var coalescedSize = 0L
     var i = start
@@ -249,8 +250,8 @@ object ShufflePartitionsUtil extends Logging {
       }
       i += 1
     }
-    // Create at least one partition if all partitions are empty.
-    createPartitionSpec(partitionSpecs.isEmpty)
+    // If allowReturnEmpty is false, create at least one partition if all partitions are empty.
+    createPartitionSpec(!allowReturnEmpty && partitionSpecs.isEmpty)
 
     // add data size for each partitionSpecs
     mapOutputStatistics.map { mapStats =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
index beefc91ef6..53cff47b06 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
@@ -658,4 +658,46 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
     assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList4, targetSize).toSeq ==
       Seq(0, 2, 3))
   }
+
+  test("SPARK-35923: Coalesce empty partition with mixed CoalescedPartitionSpec and" +
+    " PartialReducerPartitionSpec") {
+    val targetSize = 100
+    val bytesByPartitionId1 = Array[Long](0, 200, 0, 10, 0, 0, 400, 0, 0)
+    val bytesByPartitionId2 = Array[Long](0, 30, 10, 300, 0, 0, 5, 0, 0)
+    val specs1 =
+      Seq(CoalescedPartitionSpec(0, 1, 0)) ++ // 0 - empty
+      Seq.tabulate(2)(i => PartialReducerPartitionSpec(1, i, i + 1, 100)) ++ // 1 - skew
+      Seq(CoalescedPartitionSpec(2, 3, 0)) ++ // 2
+      Seq.fill(3)(CoalescedPartitionSpec(3, 4, 10)) ++ // 3 - other side skew
+      Seq.tabulate(2)(i => CoalescedPartitionSpec(4 + i, 5 + i, 0)) ++ // 4, 5 - empty
+      Seq.tabulate(4)(i => PartialReducerPartitionSpec(6, i, i + 1, 100)) ++ // 6 - skew
+      Seq.tabulate(2)(i => CoalescedPartitionSpec(7 + i, 8 + i, 0)) // 7, 8 - empty
+    val specs2 =
+      Seq(CoalescedPartitionSpec(0, 1, 0)) ++ // 0 - empty
+      Seq.fill(2)(CoalescedPartitionSpec(1, 2, 30)) ++ // 1 - other side skew
+      Seq(CoalescedPartitionSpec(2, 3, 10)) ++ // 2
+      Seq.tabulate(3)(i => PartialReducerPartitionSpec(3, i, i + 1, 100)) ++ // 3 - skew
+      Seq.tabulate(2)(i => CoalescedPartitionSpec(4 + i, 5 + i, 0)) ++ // 4, 5 - empty
+      Seq.fill(4)(CoalescedPartitionSpec(6, 7, 5)) ++ // 6 - other side skew
+      Seq.tabulate(2)(i => CoalescedPartitionSpec(7 + i, 8 + i, 0)) // 7, 8 - empty
+    val expected1 =
+      Seq.tabulate(2)(i => PartialReducerPartitionSpec(1, i, i + 1, 100)) ++ // 1 - skew
+      Seq(CoalescedPartitionSpec(2, 3, 0)) ++ // 2 - not coalesced since the other shuffle is not empty
+      Seq.fill(3)(CoalescedPartitionSpec(3, 4, 10)) ++ // 3 - other side skew
+      Seq.tabulate(4)(i => PartialReducerPartitionSpec(6, i, i + 1, 100)) // 6 - skew
+    val expected2 =
+      Seq.fill(2)(CoalescedPartitionSpec(1, 2, 30)) ++ // 1 - other side skew
+      Seq(CoalescedPartitionSpec(2, 3, 10)) ++ // 2
+      Seq.tabulate(3)(i => PartialReducerPartitionSpec(3, i, i + 1, 100)) ++ // 3 - skew
+      Seq.fill(4)(CoalescedPartitionSpec(6, 7, 5)) // 6 - other side skew
+    val coalesced = ShufflePartitionsUtil.coalescePartitions(
+      Array(
+        Some(new MapOutputStatistics(0, bytesByPartitionId1)),
+        Some(new MapOutputStatistics(1, bytesByPartitionId2))),
+      Seq(
+        Some(specs1),
+        Some(specs2)),
+      targetSize, 1)
+    assert(coalesced == Seq(expected1, expected2))
+  }
 }
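For reviewers, here is a minimal self-contained sketch of the semantics the new flag introduces. This is not Spark's code: coalesce, Spec, and splitPoint are simplified stand-ins for the private coalescePartitions helper, CoalescedPartitionSpec, and its bookkeeping, but the final createPartitionSpec call mirrors the patched line.

// A sketch, assuming a single shuffle and plain Long partition sizes, of what
// allowReturnEmpty changes: for a range whose partitions are all empty, the
// default still emits one spec covering the whole range, while
// allowReturnEmpty = true lets the helper return Nil so the skew-handling
// caller can drop the empty range instead of producing a stray empty task.
object AllowReturnEmptySketch {
  // Hypothetical stand-in for CoalescedPartitionSpec: (startReducerIndex, endReducerIndex).
  type Spec = (Int, Int)

  def coalesce(
      sizes: Array[Long],
      start: Int,
      end: Int,
      targetSize: Long,
      allowReturnEmpty: Boolean = false): Seq[Spec] = {
    val specs = scala.collection.mutable.ArrayBuffer.empty[Spec]
    var splitPoint = start
    var coalescedSize = 0L
    var i = start

    // Close the current group as [splitPoint, i) when `create` holds.
    def createPartitionSpec(create: Boolean): Unit = if (create) {
      specs += ((splitPoint, i))
      splitPoint = i
      coalescedSize = 0L
    }

    while (i < end) {
      if (i > start && coalescedSize + sizes(i) > targetSize) {
        // Skip groups that carry no data; launching empty tasks is wasted work.
        createPartitionSpec(coalescedSize > 0)
      }
      coalescedSize += sizes(i)
      i += 1
    }
    // Mirrors the patched line: only force a spec for an all-empty range when
    // the caller did not opt in to an empty result.
    createPartitionSpec(coalescedSize > 0 || (!allowReturnEmpty && specs.isEmpty))
    specs.toSeq
  }

  def main(args: Array[String]): Unit = {
    val allEmpty = Array[Long](0L, 0L, 0L)
    println(coalesce(allEmpty, 0, 3, 100))                          // List((0,3))
    println(coalesce(allEmpty, 0, 3, 100, allowReturnEmpty = true)) // List()
  }
}

With the flag on, an all-empty range contributes no specs at all, which is what lets the skew-handling paths above drop the empty partitions that sit between skewed sections instead of padding them with a placeholder spec.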