[SPARK-36892][CORE] Disable batch fetch for a shuffle when push based shuffle is enabled

We found an issue where a user configured both AQE and push-based shuffle, and the job started to hang after running some stages. Thread dumps taken from the executors showed tasks still waiting to fetch shuffle blocks. This PR proposes changes to fix the issue.

### What changes were proposed in this pull request?
Disable batch fetch for a shuffle when push-based shuffle is enabled for it and merged shuffle blocks are being fetched.

### Why are the changes needed?
Without this patch, enabling both AQE and push-based shuffle can cause tasks to hang: the shuffle block batch-fetch logic assumes contiguous ranges of original shuffle blocks, an assumption that no longer holds once merged shuffle blocks are returned for a reduce partition.
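
To make the failure mode concrete, here is a rough sketch of the mismatch using the block-id classes that appear in the test changes below (constructor argument orders mirror those usages; this simplifies the actual fetch path):

```scala
import org.apache.spark.storage.{ShuffleBlockBatchId, ShuffleMergedBlockId}

// Batch fetch coalesces contiguous reduce partitions of a single mapper's
// output into one batched block id (shuffleId, mapId, startReduceId, endReduceId):
val batched = ShuffleBlockBatchId(10, 2L, 0, 5)

// With push-based shuffle, a reduce partition can instead be served as a merged
// block spanning many mappers (shuffleId, shuffleMergeId, reduceId); such a
// block is reported with map index -1, so the contiguity assumption behind
// batch fetch no longer holds:
val merged = ShuffleMergedBlockId(10, 0, 0)
```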

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested the patch within our cluster with the Spark shell, using the following queries:

```scala
sql("""SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS s_item_id, CAST(rand() * 100 AS INT) AS s_quantity, DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS s_date FROM RANGE(1000000000)""").createOrReplaceTempView("sales")

// Dynamically coalesce partitions
sql("""SELECT s_date, sum(s_quantity) AS q FROM sales GROUP BY s_date ORDER BY q DESC""").collect
```
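The repro assumes a spark-shell session with AQE and push-based shuffle both turned on; a minimal sketch of that setup (standard Spark 3.2 configuration keys; the cluster additionally needs external shuffle services configured for push-based shuffle, which at the time of this patch is supported only on YARN):

```scala
import org.apache.spark.sql.SparkSession

// Minimal session sketch for the repro above; assumes a YARN cluster whose
// external shuffle services can merge pushed blocks.
val spark = SparkSession.builder()
  .appName("SPARK-36892-repro")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.shuffle.push.enabled", "true")                    // push-based shuffle, client side
  .config("spark.sql.adaptive.enabled", "true")                    // AQE
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true") // the coalescing that triggers subrange fetches
  .getOrCreate()
```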

Unit tests are added in this patch (see the MapOutputTrackerSuite changes below).

Closes #34156 from zhouyejoe/SPARK-36892.

Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 31b6f614d3)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
Ye Zhou authored on 2021-10-06 15:42:25 +08:00; committed by Gengliang Wang
parent 688808900d
commit 88f4809142
4 changed files with 183 additions and 17 deletions

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

```diff
@@ -436,6 +436,8 @@ private[spark] case class GetMapOutputMessage(shuffleId: Int,
     context: RpcCallContext) extends MapOutputTrackerMasterMessage
 private[spark] case class GetMapAndMergeOutputMessage(shuffleId: Int,
     context: RpcCallContext) extends MapOutputTrackerMasterMessage
+private[spark] case class MapSizesByExecutorId(
+    iter: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], enableBatchFetch: Boolean)
 
 /** RpcEndpoint class for MapOutputTrackerMaster */
 private[spark] class MapOutputTrackerMasterEndpoint(
@@ -512,12 +514,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
   }
 
+  // For testing
+  def getPushBasedShuffleMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
+    : MapSizesByExecutorId = {
+    getPushBasedShuffleMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
+  }
+
   /**
    * Called from executors to get the server URIs and output sizes for each shuffle block that
    * needs to be read from a given range of map output partitions (startPartition is included but
    * endPartition is excluded from the range) within a range of mappers (startMapIndex is included
-   * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
-   * changed to the length of total map outputs.
+   * but endMapIndex is excluded) when push based shuffle is not enabled for the specific shuffle
+   * dependency. If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length
+   * of total map outputs.
    *
    * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
    *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
@@ -529,7 +538,34 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    val mapSizesByExecutorId = getPushBasedShuffleMapSizesByExecutorId(
+      shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+    assert(mapSizesByExecutorId.enableBatchFetch == true)
+    mapSizesByExecutorId.iter
+  }
+
+  /**
+   * Called from executors to get the server URIs and output sizes for each shuffle block that
+   * needs to be read from a given range of map output partitions (startPartition is included but
+   * endPartition is excluded from the range) within a range of mappers (startMapIndex is included
+   * but endMapIndex is excluded) when push based shuffle is enabled for the specific shuffle
+   * dependency. If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length
+   * of total map outputs.
+   *
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index)
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
+   */
+  def getPushBasedShuffleMapSizesByExecutorId(
+      shuffleId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      startPartition: Int,
+      endPartition: Int): MapSizesByExecutorId
 
   /**
    * Called from executors upon fetch failure on an entire merged shuffle reduce partition.
@@ -1060,12 +1096,12 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   // This method is only called in local-mode.
-  def getMapSizesByExecutorId(
+  def getPushBasedShuffleMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+      endPartition: Int): MapSizesByExecutorId = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
     shuffleStatuses.get(shuffleId) match {
       case Some(shuffleStatus) =>
@@ -1077,7 +1113,7 @@ private[spark] class MapOutputTrackerMaster(
             shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
         }
       case None =>
-        Iterator.empty
+        MapSizesByExecutorId(Iterator.empty, true)
     }
   }
@@ -1138,12 +1174,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
    */
   private val fetchingLock = new KeyLock[Int]
 
-  override def getMapSizesByExecutorId(
+  override def getPushBasedShuffleMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+      endPartition: Int): MapSizesByExecutorId = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
     val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf)
     try {
@@ -1439,10 +1475,10 @@ private[spark] object MapOutputTracker extends Logging {
       mapStatuses: Array[MapStatus],
       startMapIndex : Int,
       endMapIndex: Int,
-      mergeStatuses: Option[Array[MergeStatus]] = None):
-      Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+      mergeStatuses: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = {
     assert (mapStatuses != null)
     val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
+    var enableBatchFetch = true
     // Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle
     // partition consists of blocks merged in random order, we are unable to serve map index
     // subrange requests. However, when a reduce task needs to fetch blocks from a subrange of
@@ -1451,8 +1487,10 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
+    if (mergeStatuses.exists(_.exists(_ != null)) && startMapIndex == 0
       && endMapIndex == mapStatuses.length) {
+      enableBatchFetch = false
+      logDebug(s"Disable shuffle batch fetch as Push based shuffle is enabled for $shuffleId.")
       // We have MergeStatus and full range of mapIds are requested so return a merged block.
       val numMaps = mapStatuses.length
       mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach {
@@ -1497,7 +1535,7 @@ private[spark] object MapOutputTracker extends Logging {
       }
     }
 
-    splitsByAddress.mapValues(_.toSeq).iterator
+    MapSizesByExecutorId(splitsByAddress.mapValues(_.toSeq).iterator, enableBatchFetch)
   }
 
   /**
```
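
To make the new contract easier to follow, here is a distilled, self-contained sketch of the batch-fetch decision that `convertMapStatuses` now makes (the `MergeStatusStub` type and function name are illustrative stand-ins, not Spark API):

```scala
// Stand-in for org.apache.spark.scheduler.MergeStatus; only existence matters here.
final case class MergeStatusStub(totalSize: Long)

// Merged blocks are served only when some non-null MergeStatus exists AND the
// caller requests the full mapper range; exactly then batch fetch must be off.
def batchFetchAllowed(
    mergeStatuses: Option[Array[MergeStatusStub]],
    startMapIndex: Int,
    endMapIndex: Int,
    numMaps: Int): Boolean = {
  val servesMergedBlocks = mergeStatuses.exists(_.exists(_ != null)) &&
    startMapIndex == 0 && endMapIndex == numMaps
  !servesMergedBlocks
}

// A subrange request keeps batch fetch available...
assert(batchFetchAllowed(Some(Array(MergeStatusStub(3000L))), 0, 2, 4))
// ...while a full-range request with merged output disables it.
assert(!batchFetchAllowed(Some(Array(MergeStatusStub(3000L))), 0, 4, 4))
```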

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala

```diff
@@ -129,11 +129,21 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
+    val (blocksByAddress, canEnableBatchFetch) =
+      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+        val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (res.iter, res.enableBatchFetch)
+      } else {
+        val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (address, true)
+      }
     new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
-      shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
+      shouldBatchFetch =
+        canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))
   }
 
   /** Get a writer for a given partition. Called on executors by map tasks. */
```

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

```diff
@@ -362,6 +362,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
   test("SPARK-32921: get map sizes with merged shuffle") {
     conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
     conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     val hostname = "localhost"
     val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
@@ -391,7 +392,9 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       bitmap, 3000L))
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
-    assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
+    val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
+    assert(mapSizesByExecutorId.enableBatchFetch === false)
+    assert(mapSizesByExecutorId.iter.toSeq ===
       Seq((blockMgrId, ArrayBuffer((ShuffleMergedBlockId(10, 0, 0), 3000, -1),
         (ShuffleBlockId(10, 2, 0), size1000, 2)))))
@@ -404,6 +407,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
   test("SPARK-32921: get map statuses from merged shuffle") {
     conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
     conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     val hostname = "localhost"
     val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
@@ -436,6 +440,8 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       bitmap, 4000L))
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
+    assert(mapSizesByExecutorId.enableBatchFetch === false)
     assert(slaveTracker.getMapSizesForMergeResult(10, 0).toSeq ===
       Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000, 0),
         (ShuffleBlockId(10, 1, 0), size1000, 1), (ShuffleBlockId(10, 2, 0), size1000, 2),
@@ -449,6 +455,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
   test("SPARK-32921: get map statuses for merged shuffle block chunks") {
     conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
     conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     val hostname = "localhost"
     val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
@@ -736,4 +743,115 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       tracker.stop()
     }
   }
+
+  test("SPARK-36892: Batch fetch should be enabled in some scenarios with push based shuffle") {
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    val hostname = "localhost"
+    val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
+
+    val masterTracker = newTrackerMaster()
+    masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
+
+    val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf))
+    val slaveTracker = new MapOutputTrackerWorker(conf)
+    slaveTracker.trackerEndpoint =
+      slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+
+    masterTracker.registerShuffle(10, 4, 1)
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+    val blockMgrId = BlockManagerId("a", "hostA", 1000)
+    masterTracker.registerMapOutput(10, 0, MapStatus(blockMgrId, Array(1000L), 0))
+    masterTracker.registerMapOutput(10, 1, MapStatus(blockMgrId, Array(1000L), 1))
+    masterTracker.registerMapOutput(10, 2, MapStatus(blockMgrId, Array(1000L), 2))
+    masterTracker.registerMapOutput(10, 3, MapStatus(blockMgrId, Array(1000L), 3))
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
+    // Batch fetch should be enabled when there are no merged shuffle files
+    assert(mapSizesByExecutorId.enableBatchFetch === true)
+    assert(mapSizesByExecutorId.iter.toSeq ===
+      Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000, 0),
+        (ShuffleBlockId(10, 1, 0), size1000, 1), (ShuffleBlockId(10, 2, 0), size1000, 2),
+        (ShuffleBlockId(10, 3, 0), size1000, 3)))))
+
+    masterTracker.registerShuffle(11, 4, 1)
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+    val bitmap = new RoaringBitmap()
+    bitmap.add(0)
+    bitmap.add(1)
+    bitmap.add(3)
+    masterTracker.registerMergeResult(11, 0, MergeStatus(blockMgrId, 0,
+      bitmap, 3000L))
+    masterTracker.registerMapOutput(11, 0, MapStatus(blockMgrId, Array(1000L), 0))
+    masterTracker.registerMapOutput(11, 1, MapStatus(blockMgrId, Array(1000L), 1))
+    masterTracker.registerMapOutput(11, 2, MapStatus(blockMgrId, Array(1000L), 2))
+    masterTracker.registerMapOutput(11, 3, MapStatus(blockMgrId, Array(1000L), 3))
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val mapSizesByExecutorId2 = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(11, 0, 2, 0, 1)
+    // Batch fetch should be enabled when it only fetches subsets of mapper outputs
+    assert(mapSizesByExecutorId2.enableBatchFetch === true)
+    assert(mapSizesByExecutorId2.iter.toSeq ===
+      Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(11, 0, 0), size1000, 0),
+        (ShuffleBlockId(11, 1, 0), size1000, 1)))))
+
+    masterTracker.unregisterShuffle(10)
+    masterTracker.unregisterShuffle(11)
+    masterTracker.stop()
+    slaveTracker.stop()
+    rpcEnv.shutdown()
+    slaveRpcEnv.shutdown()
+  }
+
+  test("SPARK-36892: Batch fetch should be disabled in some scenarios with push based shuffle") {
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    val hostname = "localhost"
+    val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
+
+    val masterTracker = newTrackerMaster()
+    masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
+
+    val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf))
+    val slaveTracker = new MapOutputTrackerWorker(conf)
+    slaveTracker.trackerEndpoint =
+      slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+
+    masterTracker.registerShuffle(10, 4, 2)
+    assert(masterTracker.containsShuffle(10))
+    val blockMgrId = BlockManagerId("a", "hostA", 1000)
+    masterTracker.registerMapOutput(10, 0, MapStatus(blockMgrId, Array(1000L, 1000L), 0))
+    masterTracker.registerMapOutput(10, 1, MapStatus(blockMgrId, Array(1000L, 1000L), 1))
+    masterTracker.registerMapOutput(10, 2, MapStatus(blockMgrId, Array(1000L, 1000L), 2))
+    masterTracker.registerMapOutput(10, 3, MapStatus(blockMgrId, Array(1000L, 1000L), 3))
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val bitmap = new RoaringBitmap()
+    bitmap.add(0)
+    bitmap.add(1)
+    masterTracker.registerMergeResult(10, 0, MergeStatus(blockMgrId, 0,
+      bitmap, 2000L))
+    masterTracker.registerMergeResult(10, 1, MergeStatus(blockMgrId, 0,
+      bitmap, 2000L))
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    // Query for all mappers output for multiple reducers, since there are merged shuffles,
+    // batch fetch should be disabled.
+    val mapSizesByExecutorId =
+      slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0, Int.MaxValue, 0, 2)
+    assert(mapSizesByExecutorId.enableBatchFetch === false)
+
+    masterTracker.unregisterShuffle(10)
+    masterTracker.stop()
+    rpcEnv.shutdown()
+  }
 }
```
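
A side note on the `RoaringBitmap` used throughout these tests: it records which map outputs were folded into a merged partition, so any mapper absent from the bitmap must still be fetched as an original shuffle block. A tiny standalone illustration (org.roaringbitmap is an existing Spark dependency):

```scala
import org.roaringbitmap.RoaringBitmap

val mergedMappers = new RoaringBitmap()
mergedMappers.add(0) // mapper 0 merged
mergedMappers.add(1) // mapper 1 merged
mergedMappers.add(3) // mapper 3 merged

// Mapper 2 is missing from the bitmap, which is why an earlier test expects
// (ShuffleBlockId(10, 2, 0), size1000, 2) alongside the merged block.
assert(mergedMappers.contains(1))
assert(!mergedMappers.contains(2))
```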

docs/configuration.md

```diff
@@ -3155,7 +3155,7 @@ See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this fe
 # Push-based shuffle overview
 
-Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO.
+Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO. Push-based shuffle takes priority over batch fetch for some scenarios, like partition coalesce when merged output is available.
 
 <p> Push-based shuffle improves performance for long running jobs/queries which involves large disk I/O during shuffle. Currently it is not well suited for jobs/queries which runs quickly dealing with lesser amount of shuffle data. This will be further improved in the future releases.</p>
```
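
For context on the documentation change above, enabling push-based shuffle in this release has a client half and a server half; a hedged sketch (config keys as documented for Spark 3.2, with the server-side key applying to the external shuffle service rather than application code):

```scala
import org.apache.spark.SparkConf

// Client side (per application):
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.push.enabled", "true")

// Server side (external shuffle service, e.g. the YARN shuffle service config):
//   spark.shuffle.push.server.mergedShuffleFileManagerImpl =
//     org.apache.spark.network.shuffle.RemoteBlockPushResolver
```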