From 09789ff725f1718a55df9595e736c3cf13746f25 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 11 Jul 2020 14:48:23 -0700 Subject: [PATCH] [SPARK-31226][CORE][TESTS] SizeBasedCoalesce logic will lose partition ### What changes were proposed in this pull request? When last partition's splitFile's split size is larger then maxSize, this partition will be lost Origin logic error like below as 1, 2, 3, 4, 5 ```scala // 5. since index = partition.size now, jump out of the loop , then the last partition is lost since we won't call updatePartition() again. while (index < partitions.size) { // 1. we assume that when index = partitions.length -1(the last partition) val partition = partitions(index) val fileSplit = partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit] val splitSize = fileSplit.getLength // 2. if this partition's splitSize > maxSize if (currentSum + splitSize < maxSize) { addPartition(partition, splitSize) index += 1 if (index == partitions.size) { updateGroups } } else { // 3. if currentGroup.partitions.size >0, this situation is possiable if (currentGroup.partitions.size == 0) { addPartition(partition, splitSize) index += 1 } else { // 4. then will just call updateGroups() here first, and index won't update in group updateGroups } } } groups.toArray } } ``` ### Why are the changes needed? Fix bug ### Does this PR introduce any user-facing change? NO ### How was this patch tested? Manual code review. Closes #27988 from AngersZhuuuu/SPARK-31226. Authored-by: angerszhu Signed-off-by: Dongjoon Hyun --- .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 1f4e784723..79f9c1396c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -1298,19 +1298,15 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria val splitSize = fileSplit.getLength if (currentSum + splitSize < maxSize) { addPartition(partition, splitSize) - index += 1 - if (index == partitions.size) { - updateGroups - } } else { - if (currentGroup.partitions.size == 0) { - addPartition(partition, splitSize) - index += 1 - } else { - updateGroups + if (currentGroup.partitions.nonEmpty) { + updateGroups() } + addPartition(partition, splitSize) } + index += 1 } + updateGroups() groups.toArray } }