[SPARK-36423][SHUFFLE] Randomize order of blocks in a push request to improve block merge ratio for push-based shuffle

### What changes were proposed in this pull request?

On the client side, we are currently randomizing the order of push requests before processing each request. In addition we can further randomize the order of blocks within each push request before pushing them.
In our benchmark, this has resulted in a 60%-70% reduction of blocks that fail to be merged due to bock collision (the existing block merge ratio is already pretty good in general, and this further improves it).

### Why are the changes needed?

Improve block merge ratio for push-based shuffle

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Straightforward small change, no additional test needed.

Closes #33649 from Victsm/SPARK-36423.

Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
This commit is contained in:
Min Shen 2021-08-06 09:47:42 -05:00 committed by Mridul Muralidharan
parent 41b011e416
commit 6e729515fd

View file

@ -242,10 +242,16 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
handleResult(PushResult(blockId, exception))
}
}
// In addition to randomizing the order of the push requests, further randomize the order
// of blocks within the push request to further reduce the likelihood of shuffle server side
// collision of pushed blocks. This does not increase the cost of reading unmerged shuffle
// files on the executor side, because we are still reading MB-size chunks and only randomize
// the in-memory sliced buffers post reading.
val (blockPushIds, blockPushBuffers) = Utils.randomize(blockIds.zip(
sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2)))).unzip
SparkEnv.get.blockManager.blockStoreClient.pushBlocks(
address.host, address.port, blockIds.toArray,
sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2)),
blockPushListener)
address.host, address.port, blockPushIds.toArray,
blockPushBuffers.toArray, blockPushListener)
}
/**