[SPARK-31226][CORE][TESTS] SizeBasedCoalesce logic will lose partition

### What changes were proposed in this pull request?

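When the split size of the last partition's file split is larger than `maxSize`, that partition is lost. This PR advances `index` exactly once per loop iteration and calls `updateGroups()` once after the loop, so the final group is always flushed.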

The original logic fails as traced by comments 1 to 5 in the code below:
```scala
// 5. On the next pass currentGroup is empty, so the partition is added and index becomes partitions.size.
//    The loop then exits and the last partition is lost, because updateGroups is never called again.
while (index < partitions.size) {
      // 1. Assume index == partitions.size - 1 (the last partition).
      val partition = partitions(index)
      val fileSplit =
        partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit]
      val splitSize = fileSplit.getLength
      // 2. Suppose this partition's splitSize > maxSize, so the condition below is false.
      if (currentSum + splitSize < maxSize) {
        addPartition(partition, splitSize)
        index += 1
        if (index == partitions.size) {
          updateGroups
        }
      } else {
        // 3. currentGroup.partitions.size > 0 is possible here, so the check below can be false.
        if (currentGroup.partitions.size == 0) {
          addPartition(partition, splitSize)
          index += 1
        } else {
          // 4. Only updateGroups is called here; index is not advanced, so the same partition is visited again.
          updateGroups
        }
      }
    }
    groups.toArray
  }
}
```
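The trace above can be reproduced without Spark. The following is a minimal sketch, assuming each partition is reduced to its split size and that `addPartition` and `updateGroups` do what their names suggest. `OldCoalesceSketch`, `coalesceOld`, and the helper bodies are hypothetical stand-ins, not the actual Spark code.

```scala
import scala.collection.mutable.ArrayBuffer

object OldCoalesceSketch {
  // Hypothetical model: a partition is represented only by its split size.
  def coalesceOld(sizes: Seq[Long], maxSize: Long): List[List[Long]] = {
    val groups = ArrayBuffer.empty[List[Long]]
    var currentGroup = ArrayBuffer.empty[Long]
    var currentSum = 0L

    // Assumed behavior of the helpers named in the original loop.
    def addPartition(size: Long): Unit = {
      currentGroup += size
      currentSum += size
    }
    def updateGroups(): Unit = {
      groups += currentGroup.toList
      currentGroup = ArrayBuffer.empty[Long]
      currentSum = 0L
    }

    var index = 0
    while (index < sizes.size) {
      val splitSize = sizes(index)
      if (currentSum + splitSize < maxSize) {
        addPartition(splitSize)
        index += 1
        if (index == sizes.size) updateGroups()
      } else {
        if (currentGroup.isEmpty) {
          addPartition(splitSize)
          index += 1 // if this was the last index, the loop ends without a flush
        } else {
          updateGroups() // index is not advanced on this path
        }
      }
    }
    groups.toList
  }
}
```

Calling `OldCoalesceSketch.coalesceOld(Seq(10L, 10L, 50L), 30L)` returns `List(List(10, 10))`: the last split of size 50 is added to the current group, but that group is never appended to `groups`.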
### Why are the changes needed?
This fixes a bug in which `SizeBasedCoalescer` can drop the last partition from the coalesced result.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?

Manual code review.
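
A small check along the following lines could exercise the corrected loop outside Spark. It is an illustrative sketch and not part of this patch: partitions are modeled by their split sizes only, and `FixedCoalesceSketch`, `coalesceFixed`, and the helper bodies are hypothetical stand-ins for the patched method.

```scala
import scala.collection.mutable.ArrayBuffer

object FixedCoalesceSketch {
  // Hypothetical model: a partition is represented only by its split size.
  def coalesceFixed(sizes: Seq[Long], maxSize: Long): List[List[Long]] = {
    val groups = ArrayBuffer.empty[List[Long]]
    var currentGroup = ArrayBuffer.empty[Long]
    var currentSum = 0L

    // Assumed behavior of the helpers named in the patched loop.
    def addPartition(size: Long): Unit = {
      currentGroup += size
      currentSum += size
    }
    def updateGroups(): Unit = {
      groups += currentGroup.toList
      currentGroup = ArrayBuffer.empty[Long]
      currentSum = 0L
    }

    var index = 0
    while (index < sizes.size) {
      val splitSize = sizes(index)
      if (currentSum + splitSize < maxSize) {
        addPartition(splitSize)
      } else {
        // Flush the current group first, then start a new one with this partition.
        if (currentGroup.nonEmpty) updateGroups()
        addPartition(splitSize)
      }
      index += 1 // advanced exactly once per iteration
    }
    updateGroups() // flush whatever is left after the loop
    groups.toList
  }
}
```

With the same input as the failing case, `FixedCoalesceSketch.coalesceFixed(Seq(10L, 10L, 50L), 30L)` returns `List(List(10, 10), List(50))`, so flattening the groups gives back every input split.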

Closes #27988 from AngersZhuuuu/SPARK-31226.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
angerszhu 2020-07-11 14:48:23 -07:00 committed by Dongjoon Hyun
parent 3ad4863673
commit 09789ff725


@@ -1298,19 +1298,15 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria
       val splitSize = fileSplit.getLength
       if (currentSum + splitSize < maxSize) {
         addPartition(partition, splitSize)
-        index += 1
-        if (index == partitions.size) {
-          updateGroups
-        }
       } else {
-        if (currentGroup.partitions.size == 0) {
+        if (currentGroup.partitions.nonEmpty) {
+          updateGroups()
+        }
         addPartition(partition, splitSize)
+      }
       index += 1
-        } else {
-          updateGroups
-        }
-      }
     }
+    updateGroups()
     groups.toArray
   }
 }