[SPARK-35923][SQL] Coalesce empty partition with mixed CoalescedPartitionSpec and PartialReducerPartitionSpec

### What changes were proposed in this pull request?

Skip empty partitions in `ShufflePartitionsUtil.coalescePartitionsWithSkew`.

### Why are the changes needed?

Since [SPARK-35447](https://issues.apache.org/jira/browse/SPARK-35447), we apply `OptimizeSkewedJoin` before `CoalesceShufflePartitions`. However, the order of these two rules changes the result.

Say we have skewed partition sizes: [0, 128MB, 0, 128MB, 0]:

* coalesce partitions first then optimize skewed partitions:
  [64MB, 64MB, 64MB, 64MB]
* optimize skewed partition first then coalesce partitions:
  [0, 64MB, 64MB, 0, 64MB, 64MB, 0]

So in `ShufflePartitionsUtil.coalescePartitionsWithSkew` we can coalesce partitions even when the specs mix `CoalescedPartitionSpec` and `PartialReducerPartitionSpec`, as long as the `CoalescedPartitionSpec` partitions being coalesced are empty.

### Does this PR introduce _any_ user-facing change?

No, the affected change has not been released yet.

### How was this patch tested?

Add test.

Closes #33123 from ulysses-you/SPARK-35923.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
ulysses-you 2021-06-29 14:58:51 +00:00 committed by Wenchen Fan
parent 78e6263cce
commit def738365e
2 changed files with 48 additions and 5 deletions

View file

@ -147,7 +147,7 @@ object ShufflePartitionsUtil extends Logging {
// coalesce any partitions before partition(i - 1) and after the end of latest skew section.
if (i - 1 > start) {
val partitionSpecs = coalescePartitions(
partitionIndices(start), repeatValue, validMetrics, targetSize)
partitionIndices(start), repeatValue, validMetrics, targetSize, true)
newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2)
}
// find the end of this skew section, skipping partition(i - 1) and partition(i).
@ -172,7 +172,7 @@ object ShufflePartitionsUtil extends Logging {
// coalesce any partitions after the end of last skew section.
if (numPartitions > start) {
val partitionSpecs = coalescePartitions(
partitionIndices(start), partitionIndices.last + 1, validMetrics, targetSize)
partitionIndices(start), partitionIndices.last + 1, validMetrics, targetSize, true)
newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2)
}
// only return coalesced result if any coalescing has happened.
@ -215,7 +215,8 @@ object ShufflePartitionsUtil extends Logging {
start: Int,
end: Int,
mapOutputStatistics: Seq[MapOutputStatistics],
targetSize: Long): Seq[Seq[CoalescedPartitionSpec]] = {
targetSize: Long,
allowReturnEmpty: Boolean = false): Seq[Seq[CoalescedPartitionSpec]] = {
val partitionSpecs = ArrayBuffer.empty[CoalescedPartitionSpec]
var coalescedSize = 0L
var i = start
@ -249,8 +250,8 @@ object ShufflePartitionsUtil extends Logging {
}
i += 1
}
// Create at least one partition if all partitions are empty.
createPartitionSpec(partitionSpecs.isEmpty)
// If do not allowReturnEmpty, create at least one partition if all partitions are empty.
createPartitionSpec(!allowReturnEmpty && partitionSpecs.isEmpty)
// add data size for each partitionSpecs
mapOutputStatistics.map { mapStats =>

View file

@ -658,4 +658,46 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList4, targetSize).toSeq ==
Seq(0, 2, 3))
}
// SPARK-35923: after OptimizeSkewedJoin runs before CoalesceShufflePartitions, the
// partition specs of one shuffle can interleave CoalescedPartitionSpec and
// PartialReducerPartitionSpec. This test verifies that empty CoalescedPartitionSpecs
// surrounding the skew sections are dropped entirely (not kept as zero-sized
// partitions), while non-empty ones and skew splits are preserved on both sides.
// NOTE: the original test name concatenated "...and" + "PartialReducerPartitionSpec"
// without a space, yielding the garbled name "...andPartialReducerPartitionSpec";
// fixed by adding the trailing space.
test("SPARK-35923: Coalesce empty partition with mixed CoalescedPartitionSpec and " +
  "PartialReducerPartitionSpec") {
  val targetSize = 100
  // Per-reducer byte sizes for the two shuffles feeding the join.
  // Side 1 is skewed at partitions 1 and 6; side 2 is skewed at partition 3.
  // Partitions 0, 4, 5, 7, 8 are empty on both sides.
  val bytesByPartitionId1 = Array[Long](0, 200, 0, 10, 0, 0, 400, 0, 0)
  val bytesByPartitionId2 = Array[Long](0, 30, 10, 300, 0, 0, 5, 0, 0)
  // Input specs for side 1: skewed partitions are already split into
  // PartialReducerPartitionSpecs; the other side's skew is mirrored by
  // repeated CoalescedPartitionSpecs.
  val specs1 =
    Seq(CoalescedPartitionSpec(0, 1, 0)) ++ // 0 - empty
    Seq.tabulate(2)(i => PartialReducerPartitionSpec(1, i, i + 1, 100)) ++ // 1 - skew
    Seq(CoalescedPartitionSpec(2, 3, 0)) ++ // 2
    Seq.fill(3)(CoalescedPartitionSpec(3, 4, 10)) ++ // 3 - other side skew
    Seq.tabulate(2)(i => CoalescedPartitionSpec(4 + i, 5 + i, 0)) ++ // 4, 5 - empty
    Seq.tabulate(4)(i => PartialReducerPartitionSpec(6, i, i + 1, 100)) ++ // 6 - skew
    Seq.tabulate(2)(i => CoalescedPartitionSpec(7 + i, 8 + i, 0)) // 7, 8 - empty
  // Input specs for side 2, mirroring side 1's structure.
  val specs2 =
    Seq(CoalescedPartitionSpec(0, 1, 0)) ++ // 0 - empty
    Seq.fill(2)(CoalescedPartitionSpec(1, 2, 30)) ++ // 1 - other side skew
    Seq(CoalescedPartitionSpec(2, 3, 10)) ++ // 2
    Seq.tabulate(3)(i => PartialReducerPartitionSpec(3, i, i + 1, 100)) ++ // 3 - skew
    Seq.tabulate(2)(i => CoalescedPartitionSpec(4 + i, 5 + i, 0)) ++ // 4, 5 - empty
    Seq.fill(4)(CoalescedPartitionSpec(6, 7, 5)) ++ // 6 - other side skew
    Seq.tabulate(2)(i => CoalescedPartitionSpec(7 + i, 8 + i, 0)) // 7, 8 - empty
  // Expected output: all empty coalesced partitions (0, 4, 5, 7, 8) are removed;
  // partition 2 is kept uncoalesced because the other shuffle is non-empty there.
  val expected1 =
    Seq.tabulate(2)(i => PartialReducerPartitionSpec(1, i, i + 1, 100)) ++ // 1 - skew
    Seq(CoalescedPartitionSpec(2, 3, 0)) ++ // 2 not coalesced since other shuffle is not empty
    Seq.fill(3)(CoalescedPartitionSpec(3, 4, 10)) ++ // 3 - other side skew
    Seq.tabulate(4)(i => PartialReducerPartitionSpec(6, i, i + 1, 100)) // 6 - skew
  val expected2 =
    Seq.fill(2)(CoalescedPartitionSpec(1, 2, 30)) ++ // 1 - other side skew
    Seq(CoalescedPartitionSpec(2, 3, 10)) ++ // 2
    Seq.tabulate(3)(i => PartialReducerPartitionSpec(3, i, i + 1, 100)) ++ // 3 - skew
    Seq.fill(4)(CoalescedPartitionSpec(6, 7, 5)) // 6 - other side skew
  // minNumPartitions = 1 so coalescing is not blocked by a partition-count floor.
  val coalesced = ShufflePartitionsUtil.coalescePartitions(
    Array(
      Some(new MapOutputStatistics(0, bytesByPartitionId1)),
      Some(new MapOutputStatistics(1, bytesByPartitionId2))),
    Seq(
      Some(specs1),
      Some(specs2)),
    targetSize, 1)
  assert(coalesced == Seq(expected1, expected2))
}
}