[SPARK-34363][CORE] Add an option for limiting storage for migrated shuffle blocks

### What changes were proposed in this pull request?

Allow users to configure a maximum amount of disk space to use for storing migrated shuffle blocks; remote shuffle blocks are rejected once this threshold is exceeded.
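
For context, a minimal sketch of how an application might opt in. Only `spark.storage.decommission.shuffleBlocks.maxDiskSize` is added by this PR; the companion decommissioning settings are pre-existing Spark options shown for illustration, and the `10g` value is arbitrary.

```scala
import org.apache.spark.SparkConf

// Sketch: cap the disk space an executor will accept for migrated shuffle blocks.
val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")                  // existing setting, for context
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")    // existing setting, for context
  .set("spark.storage.decommission.shuffleBlocks.maxDiskSize", "10g") // new option from this PR
```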

### Why are the changes needed?

In disk-constrained environments with large amounts of shuffle data, block migrations can put excessive disk pressure on the receiving nodes. On Kubernetes nodes this can lead to cascading failures when combined with `emptyDir` volumes.

### Does this PR introduce _any_ user-facing change?

Yes, a new configuration parameter, `spark.storage.decommission.shuffleBlocks.maxDiskSize`.

### How was this patch tested?

New unit tests in `BlockManagerSuite`, covering shuffle block migration with no limit, a larger limit, and a small limit that rejects blocks.

Closes #31493 from holdenk/SPARK-34337-reject-disk-blocks-when-under-disk-pressure.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Signed-off-by: Holden Karau <hkarau@apple.com>
Commit 2b51843ca4 (parent cf7a13c363), committed by Holden Karau, 2021-02-09 10:21:56 -08:00
4 changed files with 65 additions and 13 deletions


@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE =
+    ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxDiskSize")
+      .doc("Maximum disk space to use to store shuffle blocks before rejecting remote " +
+        "shuffle blocks. Rejecting remote shuffle blocks means that an executor will not receive " +
+        "any shuffle migrations, and if there are no other executors available for migration " +
+        "then shuffle blocks will be lost unless " +
+        s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} is configured.")
+      .version("3.2.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createOptional
+
   private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
     ConfigBuilder("spark.storage.replication.topologyFile")
       .version("2.1.0")
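
A side note on this entry's shape: `bytesConf(ByteUnit.BYTE)` accepts size strings and `createOptional` surfaces the value as an `Option[Long]`, so no limit is enforced unless the key is set. A rough sketch of that behavior, assuming Spark-internal access (the entry and `SparkConf.get(entry)` are `private[spark]`, so this would have to live inside the `org.apache.spark` package); the `"500m"` value is illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

val conf = new SparkConf()

// Unset: createOptional yields None, so the resolver enforces no limit.
assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE).isEmpty)

// Size strings are parsed down to bytes by bytesConf(ByteUnit.BYTE).
conf.set(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.key, "500m")
assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE).contains(500L * 1024 * 1024))
```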


@@ -22,8 +22,8 @@ import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.nio.file.Files
 
-import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.io.NioBufferedFileInputStream
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.client.StreamCallbackWithID
@@ -56,6 +56,8 @@ private[spark] class IndexShuffleBlockResolver(
   private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
+  private val remoteShuffleMaxDisk: Option[Long] =
+    conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE)
 
   def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None)
@@ -72,6 +74,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  private def getShuffleBytesStored(): Long = {
+    val shuffleFiles: Seq[File] = getStoredShuffles().map {
+      si => getDataFile(si.shuffleId, si.mapId)
+    }
+    shuffleFiles.map(_.length()).sum
+  }
+
   /**
    * Get the shuffle data file.
    *
@@ -173,6 +182,13 @@ private[spark] class IndexShuffleBlockResolver(
    */
   override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
       StreamCallbackWithID = {
+    // Throw an exception if we have exceeded maximum shuffle files stored
+    remoteShuffleMaxDisk.foreach { maxBytes =>
+      val bytesUsed = getShuffleBytesStored()
+      if (maxBytes < bytesUsed) {
+        throw new SparkException(s"Not storing remote shuffles $bytesUsed exceeds $maxBytes")
+      }
+    }
     val file = blockId match {
       case ShuffleIndexBlockId(shuffleId, mapId, _) =>
         getIndexFile(shuffleId, mapId)
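
To make the new guard easy to reason about in isolation, here is a self-contained sketch of the same check (illustrative names, not the Spark source): an optional byte budget is compared against the bytes already stored, and the incoming block is rejected by throwing before any stream is opened. As the config doc above notes, a rejected block is only safe if another executor or the fallback storage path can take it.

```scala
// Illustrative stand-in for the guard at the top of putShuffleBlockAsStream.
def rejectIfOverBudget(maxDiskBytes: Option[Long], bytesAlreadyStored: Long): Unit = {
  maxDiskBytes.foreach { maxBytes =>
    if (maxBytes < bytesAlreadyStored) {
      // The real code throws SparkException; a plain runtime exception keeps the sketch standalone.
      throw new IllegalStateException(
        s"Not storing remote shuffles: $bytesAlreadyStored bytes stored exceeds limit of $maxBytes")
    }
  }
}

rejectIfOverBudget(None, bytesAlreadyStored = 1024L)        // no limit configured: accept
// rejectIfOverBudget(Some(1L), bytesAlreadyStored = 1024L) // limit exceeded: would throw
```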


@@ -37,6 +37,7 @@ trait MigratableResolver {
   /**
    * Write a provided shuffle block as a stream. Used for block migrations.
+   * Up to the implementation to support STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.
    */
   def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
     StreamCallbackWithID


@@ -103,8 +103,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(PUSH_BASED_SHUFFLE_ENABLED, true)
   }
 
-  private def makeSortShuffleManager(): SortShuffleManager = {
-    val newMgr = new SortShuffleManager(new SparkConf(false))
+  private def makeSortShuffleManager(conf: Option[SparkConf] = None): SortShuffleManager = {
+    val newMgr = new SortShuffleManager(conf.getOrElse(new SparkConf(false)))
     sortShuffleManagers += newMgr
     newMgr
   }
@@ -1932,22 +1932,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
   }
 
-  test("test migration of shuffle blocks during decommissioning") {
-    val shuffleManager1 = makeSortShuffleManager()
+  private def testShuffleBlockDecommissioning(maxShuffleSize: Option[Int], willReject: Boolean) = {
+    maxShuffleSize.foreach{ size =>
+      conf.set(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.key, s"${size}b")
+    }
+    val shuffleManager1 = makeSortShuffleManager(Some(conf))
     val bm1 = makeBlockManager(3500, "exec1", shuffleManager = shuffleManager1)
     shuffleManager1.shuffleBlockResolver._blockManager = bm1
 
-    val shuffleManager2 = makeSortShuffleManager()
+    val shuffleManager2 = makeSortShuffleManager(Some(conf))
     val bm2 = makeBlockManager(3500, "exec2", shuffleManager = shuffleManager2)
     shuffleManager2.shuffleBlockResolver._blockManager = bm2
 
     val blockSize = 5
     val shuffleDataBlockContent = Array[Byte](0, 1, 2, 3, 4)
     val shuffleData = ShuffleDataBlockId(0, 0, 0)
+    val shuffleData2 = ShuffleDataBlockId(1, 0, 0)
     Files.write(bm1.diskBlockManager.getFile(shuffleData).toPath(), shuffleDataBlockContent)
+    Files.write(bm2.diskBlockManager.getFile(shuffleData2).toPath(), shuffleDataBlockContent)
     val shuffleIndexBlockContent = Array[Byte](5, 6, 7, 8, 9)
     val shuffleIndex = ShuffleIndexBlockId(0, 0, 0)
+    val shuffleIndex2 = ShuffleIndexBlockId(1, 0, 0)
     Files.write(bm1.diskBlockManager.getFile(shuffleIndex).toPath(), shuffleIndexBlockContent)
+    Files.write(bm2.diskBlockManager.getFile(shuffleIndex2).toPath(), shuffleIndexBlockContent)
 
     mapOutputTracker.registerShuffle(0, 1)
     val decomManager = new BlockManagerDecommissioner(conf, bm1)
@@ -1961,6 +1968,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       decomManager.refreshOffloadingShuffleBlocks()
 
+      if (willReject) {
       eventually(timeout(1.second), interval(10.milliseconds)) {
         assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === bm2.blockManagerId)
       }
@@ -1968,6 +1976,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         === shuffleDataBlockContent)
       assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath())
         === shuffleIndexBlockContent)
+      } else {
+        Thread.sleep(1000)
+        assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === bm1.blockManagerId)
+      }
     } finally {
       mapOutputTracker.unregisterShuffle(0)
       // Avoid thread leak
@@ -1975,6 +1987,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }
 
+  test("test migration of shuffle blocks during decommissioning - no limit") {
+    testShuffleBlockDecommissioning(None, true)
+  }
+
+  test("test migration of shuffle blocks during decommissioning - larger limit") {
+    testShuffleBlockDecommissioning(Some(10000), true)
+  }
+
+  test("[SPARK-34363]test migration of shuffle blocks during decommissioning - small limit") {
+    testShuffleBlockDecommissioning(Some(1), false)
+  }
+
   test("SPARK-32919: Shuffle push merger locations should be bounded with in" +
     " spark.shuffle.push.retainedMergerLocations") {
     assert(master.getShufflePushMergerLocations(10, Set.empty).isEmpty)