[SPARK-34054][CORE] BlockManagerDecommissioner code cleanup

### What changes were proposed in this pull request?

This PR cleans up the code of `BlockManagerDecommissioner`. It includes a few changes:

* Only create the `BlockManagerDecommissioner` instance when shuffle or RDD blocks require migration:
   there's no need to create a `BlockManagerDecommissioner` instance, or to check block migration status in `shutdownThread`, when only `STORAGE_DECOMMISSION_ENABLED=true` but neither block type is enabled for migration.
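
   A condensed sketch of the new gating logic in `CoarseGrainedExecutorBackend` (taken from the diff below):

   ```scala
   // Only decommission the block manager (and hence create a BlockManagerDecommissioner)
   // when at least one block type can actually be migrated.
   val migrationEnabled = env.conf.get(STORAGE_DECOMMISSION_ENABLED) &&
     (env.conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) ||
       env.conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
   if (migrationEnabled) {
     env.blockManager.decommissionBlockManager()
   } else if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
     logError(s"Storage decommissioning attempted but neither " +
       s"${STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key} nor " +
       s"${STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key} is enabled")
   }
   ```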

* Shut down the migration threads more gracefully:

  1. we'd better not log errors when `BlockManagerDecommissioner.stop()` is invoked explicitly, but currently users will see:
    <details>

      <summary>error message</summary>

    ```
    21/01/04 20:11:52 ERROR BlockManagerDecommissioner: Error while waiting for block to migrate
    java.lang.InterruptedException: sleep interrupted
	    at java.lang.Thread.sleep(Native Method)
	    at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:83)
	    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
	    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
	    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	    at java.lang.Thread.run(Thread.java:748)
    ```
    </details>
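
     With this cleanup, an interrupt triggered by an explicit `stop()` is treated as expected and logged at info level; only genuinely unexpected failures are logged as errors. A condensed sketch of the new handling in `ShuffleMigrationRunnable.run()` (see the `BlockManagerDecommissioner` diff below):

     ```scala
     try {
       // ... migrate the next shuffle block to the peer ...
     } catch {
       case _: InterruptedException =>
         // stop() clears keepRunning before interrupting, so this is logged as info.
         logInfo(s"Stop shuffle block migration${if (keepRunning) " unexpectedly" else ""}.")
         keepRunning = false
       case NonFatal(e) =>
         keepRunning = false
         logError("Error occurred during shuffle blocks migration.", e)
     }
     ```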

   2. don't shut down a thread pool like below, since `shutdown()` doesn't block waiting for running tasks to finish:

      ```scala
      executor.shutdown()
      executor.shutdownNow()
      ```
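
      Instead, the cleaned-up `stopMigratingShuffleBlocks()` flags the per-peer runnables to stop and then calls `shutdownNow()` once, which interrupts any blocked migration threads (condensed from the diff below):

      ```scala
      shuffleMigrationPool.foreach { threadPool =>
        // Ask the per-peer runnables to stop, then interrupt anything still blocked.
        migrationPeers.values.foreach(_.keepRunning = false)
        threadPool.shutdownNow()
      }
      ```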

* Avoid initializing `shuffleMigrationPool` when it's unnecessary:
  Currently, it is always initialized even if shuffle block migration is disabled, because `BlockManagerDecommissioner.stop()` -> `stopOffloadingShuffleBlocks()` -> initializes the lazy `shuffleMigrationPool`.
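
  With the cleanup, the pool is created only when shuffle block migration is enabled, so `stop()` no longer instantiates it just to shut it down (condensed from the diff below):

  ```scala
  private val shuffleMigrationPool =
    if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
      Some(ThreadUtils.newDaemonCachedThreadPool("migrate-shuffles",
        conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_THREADS)))
    } else None
  ```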

* Unify the terminology between `offload` and `migrate`:
  replace `offload` with `migrate` throughout, e.g., `refreshOffloadingShuffleBlocks()` -> `refreshMigratableShuffleBlocks()` and `stopOffloadingShuffleBlocks()` -> `stopMigratingShuffleBlocks()`.

* Do not add a shuffle block back to the migration queue once it exceeds the max failure count:
   this avoids unnecessary retry operations.
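
   The retry decision is now centralized in a helper that only re-queues a block while it is still under the failure limit (condensed from the new `allowRetry()` in the diff below):

   ```scala
   private def allowRetry(shuffleBlock: ShuffleBlockInfo, failureNum: Int): Boolean = {
     if (failureNum < maxReplicationFailuresForDecommission) {
       logInfo(s"Add $shuffleBlock back to migration queue for " +
         s"retry ($failureNum / $maxReplicationFailuresForDecommission)")
       // The block still needs a retry, so it is not marked as migrated yet.
       shufflesToMigrate.add((shuffleBlock, failureNum))
     } else {
       logWarning(s"Give up migrating $shuffleBlock since it's been " +
         s"failed for $maxReplicationFailuresForDecommission times")
       false
     }
   }
   ```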

* Do not attempt `decommissionRddCacheBlocks()` when we already know there are no available peers:
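
  The RDD migration loop now checks for peers up front and stops the RDD migration instead of calling `decommissionRddCacheBlocks()` with nobody to receive the blocks (condensed from the diff below):

  ```scala
  while (!stopped && !stoppedRDD) {
    if (bm.getPeers(false).isEmpty) {
      logWarning("No available peers to receive RDD blocks, stop migration.")
      stoppedRDD = true
    } else {
      rddBlocksLeft = decommissionRddCacheBlocks()
      // ... record the migration time and sleep before the next round ...
    }
  }
  ```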

* Clean up logs:
   currently, we use many different descriptions for the same thing, which is not good for the user experience.

* Other cleanups

### Why are the changes needed?

Code cleanup.

### Does this PR introduce _any_ user-facing change?

Yes, users will no longer see misleading logs, e.g., the interruption error logged during an explicit stop.

### How was this patch tested?

Updated a unit test since we changed the condition under which the `BlockManagerDecommissioner` instance is created.
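
The suite now enables the shuffle-block flag together with the storage flag, so a `BlockManagerDecommissioner` is still created when decommissioning is exercised (excerpt from the `BlockManagerDecommissionIntegrationSuite` diff below; the surrounding builder call is elided):

```scala
  .setMaster("local-cluster[2, 1, 1024]")
  .set(config.DECOMMISSION_ENABLED, true)
  .set(config.STORAGE_DECOMMISSION_ENABLED, isEnabled)
  .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, isEnabled) // newly added
```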

The other changes are pure code cleanup and don't change behaviour, so passing the existing tests should be enough.

Closes #31102 from Ngone51/stop-decommission-gracefully.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
5 changed files with 188 additions and 182 deletions

Changes in `CoarseGrainedExecutorBackend`:

```diff
@@ -296,8 +296,15 @@ private[spark] class CoarseGrainedExecutorBackend(
     logInfo(msg)
     try {
       decommissioned = true
-      if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      val migrationEnabled = env.conf.get(STORAGE_DECOMMISSION_ENABLED) &&
+        (env.conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) ||
+          env.conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
+      if (migrationEnabled) {
         env.blockManager.decommissionBlockManager()
+      } else if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+        logError(s"Storage decommissioning attempted but neither " +
+          s"${STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key} or " +
+          s"${STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key} is enabled ")
       }
       if (executor != null) {
         executor.decommission()
@@ -324,7 +331,7 @@ private[spark] class CoarseGrainedExecutorBackend(
         while (true) {
           logInfo("Checking to see if we can shutdown.")
           if (executor == null || executor.numRunningTasks == 0) {
-            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+            if (migrationEnabled) {
               logInfo("No running tasks, checking migrations")
               val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
               // We can only trust allBlocksMigrated boolean value if there were no tasks running
@@ -340,7 +347,7 @@ private[spark] class CoarseGrainedExecutorBackend(
                 exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
               }
             } else {
-              logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks")
+              logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks")
               // If there is a running task it could store blocks, so make sure we wait for a
               // migration loop to complete after the last task is done.
               // Note: this is only advanced if there is a running task, if there
```

Changes in `ShuffleBlockInfo`:

```diff
@@ -25,4 +25,6 @@ import org.apache.spark.annotation.Experimental
  * type safe way.
  */
 @Experimental
-case class ShuffleBlockInfo(shuffleId: Int, mapId: Long)
+case class ShuffleBlockInfo(shuffleId: Int, mapId: Long) {
+  override def toString: String = s"migrate_shuffle_${shuffleId}_$mapId"
+}
```

Changes in `BlockManagerDecommissioner`:

@ -33,7 +33,7 @@ import org.apache.spark.util.ThreadUtils
/** /**
* Class to handle block manager decommissioning retries. * Class to handle block manager decommissioning retries.
* It creates a Thread to retry offloading all RDD cache and Shuffle blocks * It creates a Thread to retry migrating all RDD cache and Shuffle blocks
*/ */
private[storage] class BlockManagerDecommissioner( private[storage] class BlockManagerDecommissioner(
conf: SparkConf, conf: SparkConf,
@ -66,86 +66,104 @@ private[storage] class BlockManagerDecommissioner(
* the chance of migrating all shuffle blocks before the executor is forced to exit. * the chance of migrating all shuffle blocks before the executor is forced to exit.
*/ */
private class ShuffleMigrationRunnable(peer: BlockManagerId) extends Runnable { private class ShuffleMigrationRunnable(peer: BlockManagerId) extends Runnable {
@volatile var running = true @volatile var keepRunning = true
override def run(): Unit = {
var migrating: Option[(ShuffleBlockInfo, Int)] = None
logInfo(s"Starting migration thread for ${peer}")
// Once a block fails to transfer to an executor stop trying to transfer more blocks
try {
while (running && !Thread.interrupted()) {
migrating = Option(shufflesToMigrate.poll())
migrating match {
case None =>
logDebug("Nothing to migrate")
// Nothing to do right now, but maybe a transfer will fail or a new block
// will finish being committed.
val SLEEP_TIME_SECS = 1
Thread.sleep(SLEEP_TIME_SECS * 1000L)
case Some((shuffleBlockInfo, retryCount)) =>
if (retryCount < maxReplicationFailuresForDecommission) {
val blocks = bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
if (blocks.isEmpty) {
logInfo(s"Ignore empty shuffle block $shuffleBlockInfo")
} else {
logInfo(s"Got migration sub-blocks ${blocks}")
logInfo(s"Trying to migrate shuffle ${shuffleBlockInfo} to ${peer} " +
s"($retryCount / $maxReplicationFailuresForDecommission)")
// Migrate the components of the blocks. private def allowRetry(shuffleBlock: ShuffleBlockInfo, failureNum: Int): Boolean = {
try { if (failureNum < maxReplicationFailuresForDecommission) {
blocks.foreach { case (blockId, buffer) => logInfo(s"Add $shuffleBlock back to migration queue for " +
logDebug(s"Migrating sub-block ${blockId}") s"retry ($failureNum / $maxReplicationFailuresForDecommission)")
bm.blockTransferService.uploadBlockSync( // The block needs to retry so we should not mark it as finished
peer.host, shufflesToMigrate.add((shuffleBlock, failureNum))
peer.port, } else {
peer.executorId, logWarning(s"Give up migrating $shuffleBlock since it's been " +
blockId, s"failed for $maxReplicationFailuresForDecommission times")
buffer, false
StorageLevel.DISK_ONLY, }
null) // class tag, we don't need for shuffle }
logDebug(s"Migrated sub block ${blockId}")
} private def nextShuffleBlockToMigrate(): (ShuffleBlockInfo, Int) = {
logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}") while (!Thread.currentThread().isInterrupted) {
} catch { Option(shufflesToMigrate.poll()) match {
case e: IOException => case Some(head) => return head
// If a block got deleted before netty opened the file handle, then trying to // Nothing to do right now, but maybe a transfer will fail or a new block
// load the blocks now will fail. This is most likely to occur if we start // will finish being committed.
// migrating blocks and then the shuffle TTL cleaner kicks in. However this case None => Thread.sleep(1000)
// could also happen with manually managed shuffles or a GC event on the
// driver a no longer referenced RDD with shuffle files.
if (bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo).isEmpty) {
logWarning(s"Skipping block ${shuffleBlockInfo}, block deleted.")
} else if (fallbackStorage.isDefined) {
fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
} else {
throw e
}
}
}
} else {
logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}")
}
numMigratedShuffles.incrementAndGet()
}
} }
// This catch is intentionally outside of the while running block. }
// if we encounter errors migrating to an executor we want to stop. throw new InterruptedException()
} catch { }
case e: Exception =>
migrating match { override def run(): Unit = {
case Some((shuffleMap, retryCount)) => logInfo(s"Starting shuffle block migration thread for $peer")
logError(s"Error during migration, adding ${shuffleMap} back to migration queue", e) // Once a block fails to transfer to an executor stop trying to transfer more blocks
shufflesToMigrate.add((shuffleMap, retryCount + 1)) while (keepRunning) {
running = false try {
case None => val (shuffleBlockInfo, retryCount) = nextShuffleBlockToMigrate()
logError(s"Error while waiting for block to migrate", e) val blocks = bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
// We only migrate a shuffle block when both index file and data file exist.
if (blocks.isEmpty) {
logInfo(s"Ignore deleted shuffle block $shuffleBlockInfo")
} else {
logInfo(s"Got migration sub-blocks $blocks. Trying to migrate $shuffleBlockInfo " +
s"to $peer ($retryCount / $maxReplicationFailuresForDecommission)")
// Migrate the components of the blocks.
try {
blocks.foreach { case (blockId, buffer) =>
logDebug(s"Migrating sub-block ${blockId}")
bm.blockTransferService.uploadBlockSync(
peer.host,
peer.port,
peer.executorId,
blockId,
buffer,
StorageLevel.DISK_ONLY,
null) // class tag, we don't need for shuffle
logDebug(s"Migrated sub-block $blockId")
}
logInfo(s"Migrated $shuffleBlockInfo to $peer")
} catch {
case e: IOException =>
// If a block got deleted before netty opened the file handle, then trying to
// load the blocks now will fail. This is most likely to occur if we start
// migrating blocks and then the shuffle TTL cleaner kicks in. However this
// could also happen with manually managed shuffles or a GC event on the
// driver a no longer referenced RDD with shuffle files.
if (bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo).size < blocks.size) {
logWarning(s"Skipping block $shuffleBlockInfo, block deleted.")
} else if (fallbackStorage.isDefined) {
fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
} else {
logError(s"Error occurred during migrating $shuffleBlockInfo", e)
keepRunning = false
}
case e: Exception =>
logError(s"Error occurred during migrating $shuffleBlockInfo", e)
keepRunning = false
}
} }
if (keepRunning) {
numMigratedShuffles.incrementAndGet()
} else {
logWarning(s"Stop migrating shuffle blocks to $peer")
// Do not mark the block as migrated if it still needs retry
if (!allowRetry(shuffleBlockInfo, retryCount + 1)) {
numMigratedShuffles.incrementAndGet()
}
}
} catch {
case _: InterruptedException =>
logInfo(s"Stop shuffle block migration${if (keepRunning) " unexpectedly"}.")
keepRunning = false
case NonFatal(e) =>
keepRunning = false
logError("Error occurred during shuffle blocks migration.", e)
}
} }
} }
} }
// Shuffles which are either in queue for migrations or migrated // Shuffles which are either in queue for migrations or migrated
protected[storage] val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]() private[storage] val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
// Shuffles which have migrated. This used to know when we are "done", being done can change // Shuffles which have migrated. This used to know when we are "done", being done can change
// if a new shuffle file is created by a running task. // if a new shuffle file is created by a running task.
@ -166,88 +184,89 @@ private[storage] class BlockManagerDecommissioner(
private val migrationPeers = private val migrationPeers =
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]() mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
private lazy val rddBlockMigrationExecutor = private val rddBlockMigrationExecutor =
ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd") if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd"))
} else None
private val rddBlockMigrationRunnable = new Runnable { private val rddBlockMigrationRunnable = new Runnable {
val sleepInterval = conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) val sleepInterval = conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
override def run(): Unit = { override def run(): Unit = {
assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) logInfo("Attempting to migrate all RDD blocks")
while (!stopped && !stoppedRDD && !Thread.interrupted()) { while (!stopped && !stoppedRDD) {
logInfo("Iterating on migrating from the block manager.") // Validate if we have peers to migrate to. Otherwise, give up migration.
// Validate we have peers to migrate to. if (bm.getPeers(false).isEmpty) {
val peers = bm.getPeers(false) logWarning("No available peers to receive RDD blocks, stop migration.")
// If we have no peers give up.
if (peers.isEmpty) {
stopped = true
stoppedRDD = true stoppedRDD = true
} } else {
try { try {
val startTime = System.nanoTime() val startTime = System.nanoTime()
logDebug("Attempting to replicate all cached RDD blocks") logInfo("Attempting to migrate all cached RDD blocks")
rddBlocksLeft = decommissionRddCacheBlocks() rddBlocksLeft = decommissionRddCacheBlocks()
lastRDDMigrationTime = startTime lastRDDMigrationTime = startTime
logInfo("Attempt to replicate all cached blocks done") logInfo(s"Finished current round RDD blocks migration, " +
logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.") s"waiting for ${sleepInterval}ms before the next round migration.")
Thread.sleep(sleepInterval) Thread.sleep(sleepInterval)
} catch { } catch {
case e: InterruptedException => case _: InterruptedException =>
logInfo("Interrupted during RDD migration, stopping") logInfo(s"Stop RDD blocks migration${if (!stopped && !stoppedRDD) " unexpectedly"}.")
stoppedRDD = true stoppedRDD = true
case NonFatal(e) => case NonFatal(e) =>
logError("Error occurred replicating RDD for block manager decommissioning.", logError("Error occurred during RDD blocks migration.", e)
e) stoppedRDD = true
stoppedRDD = true }
} }
} }
} }
} }
private lazy val shuffleBlockMigrationRefreshExecutor = private val shuffleBlockMigrationRefreshExecutor =
ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-shuffle") if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-shuffle"))
} else None
private val shuffleBlockMigrationRefreshRunnable = new Runnable { private val shuffleBlockMigrationRefreshRunnable = new Runnable {
val sleepInterval = conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) val sleepInterval = conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
override def run(): Unit = { override def run(): Unit = {
assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) logInfo("Attempting to migrate all shuffle blocks")
while (!stopped && !stoppedShuffle && !Thread.interrupted()) { while (!stopped && !stoppedShuffle) {
try { try {
logDebug("Attempting to replicate all shuffle blocks")
val startTime = System.nanoTime() val startTime = System.nanoTime()
shuffleBlocksLeft = refreshOffloadingShuffleBlocks() shuffleBlocksLeft = refreshMigratableShuffleBlocks()
lastShuffleMigrationTime = startTime lastShuffleMigrationTime = startTime
logInfo("Done starting workers to migrate shuffle blocks") logInfo(s"Finished current round refreshing migratable shuffle blocks, " +
s"waiting for ${sleepInterval}ms before the next round refreshing.")
Thread.sleep(sleepInterval) Thread.sleep(sleepInterval)
} catch { } catch {
case e: InterruptedException => case _: InterruptedException if stopped =>
logInfo("Interrupted during migration, will not refresh migrations.") logInfo("Stop refreshing migratable shuffle blocks.")
stoppedShuffle = true
case NonFatal(e) => case NonFatal(e) =>
logError("Error occurred while trying to replicate for block manager decommissioning.", logError("Error occurred during shuffle blocks migration.", e)
e)
stoppedShuffle = true stoppedShuffle = true
} }
} }
} }
} }
lazy val shuffleMigrationPool = ThreadUtils.newDaemonCachedThreadPool( private val shuffleMigrationPool =
"migrate-shuffles", if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_THREADS)) Some(ThreadUtils.newDaemonCachedThreadPool("migrate-shuffles",
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_THREADS)))
} else None
/** /**
* Tries to offload all shuffle blocks that are registered with the shuffle service locally. * Tries to migrate all shuffle blocks that are registered with the shuffle service locally.
* Note: this does not delete the shuffle files in-case there is an in-progress fetch * Note: this does not delete the shuffle files in-case there is an in-progress fetch
* but rather shadows them. * but rather shadows them.
* Requires an Indexed based shuffle resolver. * Requires an Indexed based shuffle resolver.
* Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage. * Note: if called in testing please call stopMigratingShuffleBlocks to avoid thread leakage.
* Returns true if we are not done migrating shuffle blocks. * Returns true if we are not done migrating shuffle blocks.
*/ */
private[storage] def refreshOffloadingShuffleBlocks(): Boolean = { private[storage] def refreshMigratableShuffleBlocks(): Boolean = {
// Update the queue of shuffles to be migrated // Update the queue of shuffles to be migrated
logInfo("Offloading shuffle blocks") logInfo("Start refreshing migratable shuffle blocks")
val localShuffles = bm.migratableResolver.getStoredShuffles().toSet val localShuffles = bm.migratableResolver.getStoredShuffles().toSet
val newShufflesToMigrate = (localShuffles.diff(migratingShuffles)).toSeq val newShufflesToMigrate = (localShuffles.diff(migratingShuffles)).toSeq
.sortBy(b => (b.shuffleId, b.mapId)) .sortBy(b => (b.shuffleId, b.mapId))
@ -264,15 +283,14 @@ private[storage] class BlockManagerDecommissioner(
migrationPeers ++= newPeers.map { peer => migrationPeers ++= newPeers.map { peer =>
logDebug(s"Starting thread to migrate shuffle blocks to ${peer}") logDebug(s"Starting thread to migrate shuffle blocks to ${peer}")
val runnable = new ShuffleMigrationRunnable(peer) val runnable = new ShuffleMigrationRunnable(peer)
shuffleMigrationPool.submit(runnable) shuffleMigrationPool.foreach(_.submit(runnable))
(peer, runnable) (peer, runnable)
} }
// A peer may have entered a decommissioning state, don't transfer any new blocks // A peer may have entered a decommissioning state, don't transfer any new blocks
deadPeers.foreach { peer => deadPeers.foreach(migrationPeers.get(_).foreach(_.keepRunning = false))
migrationPeers.get(peer).foreach(_.running = false)
}
// If we don't have anyone to migrate to give up // If we don't have anyone to migrate to give up
if (!migrationPeers.values.exists(_.running)) { if (!migrationPeers.values.exists(_.keepRunning)) {
logWarning("No available peers to receive Shuffle blocks, stop migration.")
stoppedShuffle = true stoppedShuffle = true
} }
// If we found any new shuffles to migrate or otherwise have not migrated everything. // If we found any new shuffles to migrate or otherwise have not migrated everything.
@ -282,16 +300,17 @@ private[storage] class BlockManagerDecommissioner(
/** /**
* Stop migrating shuffle blocks. * Stop migrating shuffle blocks.
*/ */
private[storage] def stopOffloadingShuffleBlocks(): Unit = { private[storage] def stopMigratingShuffleBlocks(): Unit = {
logInfo("Stopping offloading shuffle blocks.") shuffleMigrationPool.foreach { threadPool =>
// Stop as gracefully as possible. logInfo("Stopping migrating shuffle blocks.")
migrationPeers.values.foreach{ _.running = false } // Stop as gracefully as possible.
shuffleMigrationPool.shutdown() migrationPeers.values.foreach(_.keepRunning = false)
shuffleMigrationPool.shutdownNow() threadPool.shutdownNow()
}
} }
/** /**
* Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers * Tries to migrate all cached RDD blocks from this BlockManager to peer BlockManagers
* Visible for testing * Visible for testing
* Returns true if we have not migrated all of our RDD blocks. * Returns true if we have not migrated all of our RDD blocks.
*/ */
@ -328,30 +347,19 @@ private[storage] class BlockManagerDecommissioner(
blockToReplicate.maxReplicas, blockToReplicate.maxReplicas,
maxReplicationFailures = Some(maxReplicationFailuresForDecommission)) maxReplicationFailures = Some(maxReplicationFailuresForDecommission))
if (replicatedSuccessfully) { if (replicatedSuccessfully) {
logInfo(s"Block ${blockToReplicate.blockId} offloaded successfully, Removing block now") logInfo(s"Block ${blockToReplicate.blockId} migrated successfully, Removing block now")
bm.removeBlock(blockToReplicate.blockId) bm.removeBlock(blockToReplicate.blockId)
logInfo(s"Block ${blockToReplicate.blockId} removed") logInfo(s"Block ${blockToReplicate.blockId} removed")
} else { } else {
logWarning(s"Failed to offload block ${blockToReplicate.blockId}") logWarning(s"Failed to migrate block ${blockToReplicate.blockId}")
} }
replicatedSuccessfully replicatedSuccessfully
} }
def start(): Unit = { def start(): Unit = {
logInfo("Starting block migration thread") logInfo("Starting block migration")
if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { rddBlockMigrationExecutor.foreach(_.submit(rddBlockMigrationRunnable))
rddBlockMigrationExecutor.submit(rddBlockMigrationRunnable) shuffleBlockMigrationRefreshExecutor.foreach(_.submit(shuffleBlockMigrationRefreshRunnable))
}
if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
shuffleBlockMigrationRefreshExecutor.submit(shuffleBlockMigrationRefreshRunnable)
}
if (!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) &&
!conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
logError(s"Storage decommissioning attempted but neither " +
s"${config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key} or " +
s"${config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key} is enabled ")
stopped = true
}
} }
def stop(): Unit = { def stop(): Unit = {
@ -361,37 +369,24 @@ private[storage] class BlockManagerDecommissioner(
stopped = true stopped = true
} }
try { try {
rddBlockMigrationExecutor.shutdown() rddBlockMigrationExecutor.foreach(_.shutdownNow())
} catch { } catch {
case e: Exception => case NonFatal(e) =>
logError(s"Error during shutdown", e) logError(s"Error during shutdown RDD block migration thread", e)
} }
try { try {
shuffleBlockMigrationRefreshExecutor.shutdown() shuffleBlockMigrationRefreshExecutor.foreach(_.shutdownNow())
} catch { } catch {
case e: Exception => case NonFatal(e) =>
logError(s"Error during shutdown", e) logError(s"Error during shutdown shuffle block refreshing thread", e)
} }
try { try {
stopOffloadingShuffleBlocks() stopMigratingShuffleBlocks()
} catch { } catch {
case e: Exception => case NonFatal(e) =>
logError(s"Error during shutdown", e) logError(s"Error during shutdown shuffle block migration thread", e)
} }
logInfo("Forcing block migrations threads to stop") logInfo("Stopped block migration")
try {
rddBlockMigrationExecutor.shutdownNow()
} catch {
case e: Exception =>
logError(s"Error during shutdown", e)
}
try {
shuffleBlockMigrationRefreshExecutor.shutdownNow()
} catch {
case e: Exception =>
logError(s"Error during shutdown", e)
}
logInfo("Stopped storage decommissioner")
} }
/* /*

Changes in `BlockManagerDecommissionIntegrationSuite`:

```diff
@@ -48,6 +48,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .setMaster("local-cluster[2, 1, 1024]")
       .set(config.DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_ENABLED, isEnabled)
+      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, isEnabled)
     sc = new SparkContext(conf)
     TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
     val executors = sc.getExecutorIds().toArray
```

Changes in `BlockManagerSuite`:

```diff
@@ -1906,7 +1906,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec2))
   }

-  test("test decommissionRddCacheBlocks should offload all cached blocks") {
+  test("test decommissionRddCacheBlocks should migrate all cached blocks") {
     val store1 = makeBlockManager(1000, "exec1")
     val store2 = makeBlockManager(1000, "exec2")
     val store3 = makeBlockManager(1000, "exec3")
@@ -1924,7 +1924,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       store3.blockManagerId))
   }

-  test("test decommissionRddCacheBlocks should keep the block if it is not able to offload") {
+  test("test decommissionRddCacheBlocks should keep the block if it is not able to migrate") {
     val store1 = makeBlockManager(3500, "exec1")
     val store2 = makeBlockManager(1000, "exec2")
@@ -1940,9 +1940,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val decomManager = new BlockManagerDecommissioner(conf, store1)
     decomManager.decommissionRddCacheBlocks()

-    // Smaller block offloaded to store2
+    // Smaller block migrated to store2
     assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId))
-    // Larger block still present in store1 as it can't be offloaded
+    // Larger block still present in store1 as it can't be migrated
     assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
   }
@@ -1973,7 +1973,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     Files.write(bm2.diskBlockManager.getFile(shuffleIndex2).toPath(), shuffleIndexBlockContent)
     mapOutputTracker.registerShuffle(0, 2, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)

-    val decomManager = new BlockManagerDecommissioner(conf, bm1)
+    val decomManager = new BlockManagerDecommissioner(
+      conf.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true), bm1)
     try {
       mapOutputTracker.registerMapOutput(0, 0, MapStatus(bm1.blockManagerId, Array(blockSize), 0))
       mapOutputTracker.registerMapOutput(0, 1, MapStatus(bm1.blockManagerId, Array(blockSize), 1))
@@ -1984,7 +1985,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       when(env.conf).thenReturn(conf)
       SparkEnv.set(env)

-      decomManager.refreshOffloadingShuffleBlocks()
+      decomManager.refreshMigratableShuffleBlocks()

       if (willReject) {
         eventually(timeout(1.second), interval(10.milliseconds)) {
@@ -2002,7 +2003,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     } finally {
       mapOutputTracker.unregisterShuffle(0)
       // Avoid thread leak
-      decomManager.stopOffloadingShuffleBlocks()
+      decomManager.stopMigratingShuffleBlocks()
     }
   }
@@ -2076,7 +2077,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     when(bm.getPeers(mc.any())).thenReturn(Seq.empty)

     val decomManager = new BlockManagerDecommissioner(conf, bm)
-    decomManager.refreshOffloadingShuffleBlocks()
+    decomManager.refreshMigratableShuffleBlocks()

     assert(sortedBlocks.sameElements(decomManager.shufflesToMigrate.asScala.map(_._1)))
   }
```