[SPARK-36228][SQL] Skip splitting a skewed partition when some map outputs are removed

### What changes were proposed in this pull request?

Sometimes, AQE skew join optimization can fail with NPE. This is because AQE tries to get the shuffle block sizes, but some map outputs are missing due to causes such as executor loss.

This PR fixes this bug by skipping skew join handling if some map outputs are missing in the `MapOutputTracker`.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

a new UT

Closes #33445 from cloud-fan/bug.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 9c8a3d3975)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Wenchen Fan 2021-07-21 22:17:56 +08:00
parent 06520b2849
commit f4291e373e
2 changed files with 38 additions and 4 deletions

View file

@ -362,11 +362,15 @@ object ShufflePartitionsUtil extends Logging {
}
/**
* Get the map size of the specific reduce shuffle Id.
* Get the map size of the specific shuffle and reduce ID. Note that, some map outputs can be
* missing due to issues like executor lost. The size will be -1 for missing map outputs and the
* caller side should take care of it.
*/
private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
mapOutputTracker.shuffleStatuses(shuffleId).withMapStatuses(_.map { stat =>
if (stat == null) -1 else stat.getSizeForBlock(partitionId)
})
}
/**
@ -378,6 +382,7 @@ object ShufflePartitionsUtil extends Logging {
reducerId: Int,
targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
if (mapPartitionSizes.exists(_ < 0)) return None
val mapStartIndices = splitSizeListByTargetSize(mapPartitionSizes, targetSize)
if (mapStartIndices.length > 1) {
Some(mapStartIndices.indices.map { i =>

View file

@ -17,10 +17,12 @@
package org.apache.spark.sql.execution
import org.apache.spark.{MapOutputStatistics, SparkFunSuite}
import org.apache.spark.{LocalSparkContext, MapOutputStatistics, MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil
import org.apache.spark.storage.BlockManagerId
class ShufflePartitionsUtilSuite extends SparkFunSuite {
class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
private def checkEstimation(
bytesByPartitionIdArray: Array[Array[Long]],
@ -765,4 +767,31 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
targetSize, 1, 0)
assert(coalesced == Seq(expected1, expected2))
}
test("SPARK-36228: Skip splitting a skewed partition when some map outputs are removed") {
// Start a real local SparkContext so SparkEnv.get exposes a live MapOutputTrackerMaster.
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]"))
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
// Register a shuffle with 2 map tasks and 1 reduce partition, then record both map outputs
// so the tracker initially has a complete view of the shuffle.
mapOutputTracker.registerShuffle(shuffleId = 10, numMaps = 2, numReduces = 1)
mapOutputTracker.registerMapOutput(shuffleId = 10, mapIndex = 0, MapStatus(
BlockManagerId("a", "hostA", port = 1000),
Array(MapStatus.compressSize(10)),
mapTaskId = 5))
mapOutputTracker.registerMapOutput(shuffleId = 10, mapIndex = 1, MapStatus(
BlockManagerId("b", "hostB", port = 1000),
Array(MapStatus.compressSize(20)),
mapTaskId = 6))
// With all map outputs present and a tiny target size (2 bytes), the skewed
// partition is split into per-mapper specs.
val skewPartitionSpecs = ShufflePartitionsUtil.createSkewPartitionSpecs(
shuffleId = 10, reducerId = 0, targetSize = 2)
assert(skewPartitionSpecs.isDefined)
// Returns 2 partition specs because there are 2 mappers.
assert(skewPartitionSpecs.get.size == 2)
// As if one map output is removed
mapOutputTracker.unregisterMapOutput(
shuffleId = 10, mapIndex = 0, BlockManagerId("a", "hostA", port = 1000))
// With a map output missing, splitting must be skipped (None) rather than
// failing with an NPE — the behavior this commit introduces.
val skewPartitionSpecsAfterRemoval = ShufflePartitionsUtil.createSkewPartitionSpecs(
shuffleId = 10, reducerId = 0, targetSize = 2)
assert(skewPartitionSpecsAfterRemoval.isEmpty)
}
}