[SPARK-35135][CORE] Turn the WritablePartitionedIterator from a trait into a default implementation class

### What changes were proposed in this pull request?
`WritablePartitionedIterator` is defined in `WritablePartitionedPairCollection.scala`, and there are two implementations of this trait, but the code of the two implementations is duplicated.

The main change of this PR is to turn `WritablePartitionedIterator` from a trait into a default implementation class, because there is now effectively only one implementation.
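In short, distilled from the diff below, the trait and its two identical anonymous implementations collapse into a single concrete class (`PairsWriter` is Spark's existing writer trait):

```scala
// Before: a trait, implemented twice with identical anonymous classes
// (once in ExternalSorter, once in WritablePartitionedPairCollection).
private[spark] trait WritablePartitionedIterator {
  def writeNext(writer: PairsWriter): Unit
  def hasNext(): Boolean
  def nextPartition(): Int
}

// After: one concrete class owns the cursor-advancing logic; call sites
// just hand it the underlying ((partitionId, key), value) iterator.
private[spark] class WritablePartitionedIterator[K, V](it: Iterator[((Int, K), V)]) {
  private[this] var cur = if (it.hasNext) it.next() else null

  def writeNext(writer: PairsWriter): Unit = {
    writer.write(cur._1._2, cur._2)
    cur = if (it.hasNext) it.next() else null
  }

  def hasNext: Boolean = cur != null

  def nextPartition(): Int = cur._1._1
}
```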

### Why are the changes needed?
Cleanup duplicate code.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass the Jenkins or GitHub Actions tests.

Closes #32232 from LuciferYang/writable-partitioned-iterator.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
yangjie01 2021-04-29 11:46:24 +08:00 committed by yi.wu
parent 403e4795e9
commit 74b93261af
3 changed files with 18 additions and 32 deletions

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

@@ -263,7 +263,7 @@ private[spark] class ExternalSorter[K, V, C](
   /**
    * Spill contents of in-memory iterator to a temporary file on disk.
    */
-  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
+  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator[K, C])
       : SpilledFile = {
     // Because these files may be read during shuffle, their compression must be controlled by
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use

@@ -750,7 +750,7 @@ private[spark] class ExternalSorter[K, V, C](
       // Case where we only have in-memory data
       val collection = if (aggregator.isDefined) map else buffer
       val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
-      while (it.hasNext()) {
+      while (it.hasNext) {
         val partitionId = it.nextPartition()
         var partitionWriter: ShufflePartitionWriter = null
         var partitionPairsWriter: ShufflePartitionPairsWriter = null

@@ -866,18 +866,7 @@ private[spark] class ExternalSorter[K, V, C](
         if (hasSpilled) {
           false
         } else {
-          val inMemoryIterator = new WritablePartitionedIterator {
-            private[this] var cur = if (upstream.hasNext) upstream.next() else null
-            def writeNext(writer: PairsWriter): Unit = {
-              writer.write(cur._1._2, cur._2)
-              cur = if (upstream.hasNext) upstream.next() else null
-            }
-            def hasNext(): Boolean = cur != null
-            def nextPartition(): Int = cur._1._1
-          }
+          val inMemoryIterator = new WritablePartitionedIterator[K, C](upstream)
           logInfo(s"Task ${TaskContext.get().taskAttemptId} force spilling in-memory map to disk " +
             s"and it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
           val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)

core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala

@@ -46,20 +46,9 @@ private[spark] trait WritablePartitionedPairCollection[K, V] {
    * This may destroy the underlying collection.
    */
   def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
-    : WritablePartitionedIterator = {
+    : WritablePartitionedIterator[K, V] = {
     val it = partitionedDestructiveSortedIterator(keyComparator)
-    new WritablePartitionedIterator {
-      private[this] var cur = if (it.hasNext) it.next() else null
-      def writeNext(writer: PairsWriter): Unit = {
-        writer.write(cur._1._2, cur._2)
-        cur = if (it.hasNext) it.next() else null
-      }
-      def hasNext(): Boolean = cur != null
-      def nextPartition(): Int = cur._1._1
-    }
+    new WritablePartitionedIterator[K, V](it)
   }
 }

@@ -87,10 +76,15 @@ private[spark] object WritablePartitionedPairCollection {
  * Iterator that writes elements to a DiskBlockObjectWriter instead of returning them. Each element
  * has an associated partition.
  */
-private[spark] trait WritablePartitionedIterator {
-  def writeNext(writer: PairsWriter): Unit
-
-  def hasNext(): Boolean
-
-  def nextPartition(): Int
+private[spark] class WritablePartitionedIterator[K, V](it: Iterator[((Int, K), V)]) {
+  private[this] var cur = if (it.hasNext) it.next() else null
+
+  def writeNext(writer: PairsWriter): Unit = {
+    writer.write(cur._1._2, cur._2)
+    cur = if (it.hasNext) it.next() else null
+  }
+
+  def hasNext: Boolean = cur != null
+
+  def nextPartition(): Int = cur._1._1
 }
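
To make the contract concrete, here is a minimal, self-contained sketch that can run outside Spark. The `PairsWriter` stub is a stand-in for Spark's private trait of the same name, and the `Demo` object is a hypothetical driver, not Spark code:

```scala
// Stand-in for Spark's private PairsWriter trait.
trait PairsWriter {
  def write(key: Any, value: Any): Unit
}

// Same shape as the Spark class above: elements arrive as
// ((partitionId, key), value) pairs, already sorted by partition.
class WritablePartitionedIterator[K, V](it: Iterator[((Int, K), V)]) {
  private[this] var cur = if (it.hasNext) it.next() else null

  def writeNext(writer: PairsWriter): Unit = {
    writer.write(cur._1._2, cur._2)
    cur = if (it.hasNext) it.next() else null
  }

  def hasNext: Boolean = cur != null

  def nextPartition(): Int = cur._1._1
}

object Demo extends App {
  val pairs = Iterator(((0, "a"), 1), ((0, "b"), 2), ((1, "c"), 3))
  val it = new WritablePartitionedIterator[String, Int](pairs)
  val stdout = new PairsWriter {
    def write(key: Any, value: Any): Unit = println(s"  $key -> $value")
  }
  while (it.hasNext) {
    println(s"partition ${it.nextPartition()}")
    it.writeNext(stdout) // prints a -> 1 and b -> 2 under partition 0, c -> 3 under partition 1
  }
}
```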

project/MimaExcludes.scala

@@ -55,7 +55,10 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getBinary"),
       ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getArray"),
       ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getMap"),
-      ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild")
+      ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild"),
+      // [SPARK-35135][CORE] Turn WritablePartitionedIterator from trait into a default implementation class
+      ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.WritablePartitionedIterator")
     )
     // Exclude rules for 3.1.x
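
For context, MiMa (the Migration Manager) checks each build's binaries against the previous release, and turning a trait into a class changes the compiled JVM template (interface vs. class), which MiMa reports as an `IncompatibleTemplateDefProblem`. Because `WritablePartitionedIterator` is `private[spark]`, suppressing the report is safe. A minimal sketch of how such a filter is built, assuming MiMa's `com.typesafe.tools.mima.core` API as already imported by `MimaExcludes.scala`:

```scala
import com.typesafe.tools.mima.core._

// Construct a filter that silences the trait-to-class report for this one
// Spark-internal type; all other binary-compatibility checks still apply.
val writablePartitionedIteratorExclude: ProblemFilter =
  ProblemFilters.exclude[IncompatibleTemplateDefProblem](
    "org.apache.spark.util.collection.WritablePartitionedIterator")
```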