[SPARK-32850][CORE][K8S] Simplify the RPC message flow of decommission

### What changes were proposed in this pull request?

This PR cleans up the RPC message flow among the multiple decommission use cases, it includes changes:

* Keep `Worker`'s decommission status be consistent between the case where decommission starts from `Worker` and the case where decommission starts from the `MasterWebUI`: sending `DecommissionWorker` from `Master` to `Worker` in the latter case.

* Change from two-way communication to one-way communication when notifying decommission between driver and executor: it's obviously unnecessary for the executor to acknowledge the decommission status to the driver since the decommission request is from the driver. And it's same in reverse.

* Only send one message instead of two(`DecommissionSelf`/`DecommissionBlockManager`) when decommission the executor: executor and `BlockManager` are in the same JVM.

* Clean up codes around here.

### Why are the changes needed?

Before:

<img width="1948" alt="WeChat56c00cc34d9785a67a544dca036d49da" src="https://user-images.githubusercontent.com/16397174/92850308-dc461c80-f41e-11ea-8ac0-287825f4e0c4.png">

After:
<img width="1968" alt="WeChat05f7afb017e3f0132394c5e54245e49e" src="https://user-images.githubusercontent.com/16397174/93189571-de88dd80-f774-11ea-9300-1943920aa27d.png">

(Note the diagrams only counts those RPC calls that needed to go through the network. Local RPC calls are not counted here.)

After this change, We reduced 6 original RPC calls and added one more RPC call for keeping the consistent decommission status for the Worker. And the RPC flow becomes more clear.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated existing tests.

Closes #29817 from Ngone51/simplify-decommission-rpc.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
yi.wu 2020-10-23 13:58:44 +09:00 committed by HyukjinKwon
parent 87b32f65ef
commit edeecada66
19 changed files with 257 additions and 166 deletions

View file

@ -91,11 +91,13 @@ private[spark] trait ExecutorAllocationClient {
* @param executorsAndDecomInfo identifiers of executors & decom info.
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been decommissioned.
* @param triggeredByExecutor whether the decommission is triggered at executor.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def decommissionExecutors(
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean): Seq[String] = {
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean,
triggeredByExecutor: Boolean): Seq[String] = {
killExecutors(executorsAndDecomInfo.map(_._1),
adjustTargetNumExecutors,
countFailures = false)
@ -109,14 +111,21 @@ private[spark] trait ExecutorAllocationClient {
* @param executorId identifiers of executor to decommission
* @param decommissionInfo information about the decommission (reason, host loss)
* @param adjustTargetNumExecutors if we should adjust the target number of executors.
* @param triggeredByExecutor whether the decommission is triggered at executor.
* (TODO: add a new type like `ExecutorDecommissionInfo` for the
* case where executor is decommissioned at executor first, so we
* don't need this extra parameter.)
* @return whether the request is acknowledged by the cluster manager.
*/
final def decommissionExecutor(executorId: String,
final def decommissionExecutor(
executorId: String,
decommissionInfo: ExecutorDecommissionInfo,
adjustTargetNumExecutors: Boolean): Boolean = {
adjustTargetNumExecutors: Boolean,
triggeredByExecutor: Boolean = false): Boolean = {
val decommissionedExecutors = decommissionExecutors(
Array((executorId, decommissionInfo)),
adjustTargetNumExecutors = adjustTargetNumExecutors)
adjustTargetNumExecutors = adjustTargetNumExecutors,
triggeredByExecutor = triggeredByExecutor)
decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
}

View file

@ -580,7 +580,10 @@ private[spark] class ExecutorAllocationManager(
if (decommissionEnabled) {
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
client.decommissionExecutors(
executorIdsWithoutHostLoss,
adjustTargetNumExecutors = false,
triggeredByExecutor = false)
} else {
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)

View file

@ -61,13 +61,35 @@ private[deploy] object DeployMessages {
}
/**
* @param id the worker id
* @param worker the worker endpoint ref
* An internal message that used by Master itself, in order to handle the
* `DecommissionWorkersOnHosts` request from `MasterWebUI` asynchronously.
* @param ids A collection of Worker ids, which should be decommissioned.
*/
case class WorkerDecommission(
id: String,
worker: RpcEndpointRef)
extends DeployMessage
case class DecommissionWorkers(ids: Seq[String]) extends DeployMessage
/**
* A message that sent from Master to Worker to decommission the Worker.
* It's used for the case where decommission is triggered at MasterWebUI.
*
* Note that decommission a Worker will cause all the executors on that Worker
* to be decommissioned as well.
*/
object DecommissionWorker extends DeployMessage
/**
* A message that sent by the Worker to itself when it receives PWR signal,
* indicating the Worker starts to decommission.
*/
object WorkerSigPWRReceived extends DeployMessage
/**
* A message sent from Worker to Master to tell Master that the Worker has started
* decommissioning. It's used for the case where decommission is triggered at Worker.
*
* @param id the worker id
* @param workerRef the worker endpoint ref
*/
case class WorkerDecommissioning(id: String, workerRef: RpcEndpointRef) extends DeployMessage
case class ExecutorStateChanged(
appId: String,

View file

@ -245,8 +245,7 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
case WorkerDecommission(id, workerRef) =>
logInfo("Recording worker %s decommissioning".format(id))
case WorkerDecommissioning(id, workerRef) =>
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else {
@ -254,6 +253,19 @@ private[deploy] class Master(
idToWorker.get(id).foreach(decommissionWorker)
}
case DecommissionWorkers(ids) =>
// The caller has already checked the state when handling DecommissionWorkersOnHosts,
// so it should not be the STANDBY
assert(state != RecoveryState.STANDBY)
ids.foreach ( id =>
// We use foreach since get gives us an option and we can skip the failures.
idToWorker.get(id).foreach { w =>
decommissionWorker(w)
// Also send a message to the worker node to notify.
w.endpoint.send(DecommissionWorker)
}
)
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
masterAddress, resources) =>
@ -891,10 +903,7 @@ private[deploy] class Master(
logInfo(s"Decommissioning the workers with host:ports ${workersToRemoveHostPorts}")
// The workers are removed async to avoid blocking the receive loop for the entire batch
workersToRemove.foreach(wi => {
logInfo(s"Sending the worker decommission to ${wi.id} and ${wi.endpoint}")
self.send(WorkerDecommission(wi.id, wi.endpoint))
})
self.send(DecommissionWorkers(workersToRemove.map(_.id).toSeq))
// Return the count of workers actually removed
workersToRemove.size

View file

@ -70,7 +70,10 @@ private[deploy] class Worker(
if (conf.get(config.DECOMMISSION_ENABLED)) {
logInfo("Registering SIGPWR handler to trigger decommissioning.")
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
"disabling worker decommission feature.")(decommissionSelf)
"disabling worker decommission feature.") {
self.send(WorkerSigPWRReceived)
true
}
} else {
logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
}
@ -137,7 +140,8 @@ private[deploy] class Worker(
private var registered = false
private var connected = false
private var decommissioned = false
private val workerId = generateWorkerId()
// expose for test
private[spark] val workerId = generateWorkerId()
private val sparkHome =
if (sys.props.contains(IS_TESTING.key)) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
@ -668,8 +672,14 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)
case WorkerDecommission(_, _) =>
case DecommissionWorker =>
decommissionSelf()
case WorkerSigPWRReceived =>
decommissionSelf()
// Tell the Master that we are starting decommissioning
// so it stops trying to launch executor/driver on us
sendToMaster(WorkerDecommissioning(workerId, self))
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@ -768,16 +778,15 @@ private[deploy] class Worker(
}
}
private[deploy] def decommissionSelf(): Boolean = {
if (conf.get(config.DECOMMISSION_ENABLED)) {
logDebug("Decommissioning self")
private[deploy] def decommissionSelf(): Unit = {
if (conf.get(config.DECOMMISSION_ENABLED) && !decommissioned) {
decommissioned = true
sendToMaster(WorkerDecommission(workerId, self))
logInfo(s"Decommission worker $workerId.")
} else if (decommissioned) {
logWarning(s"Worker $workerId already started decommissioning.")
} else {
logWarning("Asked to decommission self, but decommissioning not enabled")
logWarning(s"Receive decommission request, but decommission feature is disabled.")
}
// Return true since can be called as a signal handler
true
}
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {

View file

@ -40,7 +40,7 @@ import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@ -79,12 +79,14 @@ private[spark] class CoarseGrainedExecutorBackend(
*/
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
@volatile private var decommissioned = false
private var decommissioned = false
override def onStart(): Unit = {
logInfo("Registering PWR handler.")
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
"disabling decommission feature.")(decommissionSelf)
if (env.conf.get(DECOMMISSION_ENABLED)) {
logInfo("Registering PWR handler to trigger decommissioning.")
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
"disabling executor decommission feature.") (self.askSync[Boolean](ExecutorSigPWRReceived))
}
logInfo("Connecting to driver: " + driverUrl)
try {
@ -166,17 +168,6 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
if (decommissioned) {
val msg = "Asked to launch a task while decommissioned."
logError(msg)
driver match {
case Some(endpoint) =>
logInfo("Sending DecommissionExecutor to driver.")
endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
case _ =>
logError("No registered driver to send Decommission to.")
}
}
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
@ -213,11 +204,31 @@ private[spark] class CoarseGrainedExecutorBackend(
logInfo(s"Received tokens of ${tokenBytes.length} bytes")
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
case DecommissionSelf =>
logInfo("Received decommission self")
case DecommissionExecutor =>
decommissionSelf()
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case ExecutorSigPWRReceived =>
var driverNotified = false
try {
driver.foreach { driverRef =>
// Tell driver that we are starting decommissioning so it stops trying to schedule us
driverNotified = driverRef.askSync[Boolean](ExecutorDecommissioning(executorId))
if (driverNotified) decommissionSelf()
}
} catch {
case e: Exception =>
if (driverNotified) {
logError("Fail to decommission self (but driver has been notified).", e)
} else {
logError("Fail to tell driver that we are starting decommissioning", e)
}
decommissioned = false
}
context.reply(decommissioned)
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (stopping.get()) {
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
@ -264,17 +275,20 @@ private[spark] class CoarseGrainedExecutorBackend(
System.exit(code)
}
private def decommissionSelf(): Boolean = {
val msg = "Decommissioning self w/sync"
private def decommissionSelf(): Unit = {
if (!env.conf.get(DECOMMISSION_ENABLED)) {
logWarning(s"Receive decommission request, but decommission feature is disabled.")
return
} else if (decommissioned) {
logWarning(s"Executor $executorId already started decommissioning.")
return
}
val msg = s"Decommission executor $executorId."
logInfo(msg)
try {
decommissioned = true
// Tell master we are are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(
executorId, ExecutorDecommissionInfo(msg)))
} else {
logError("No driver to message decommissioning.")
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
env.blockManager.decommissionBlockManager()
}
if (executor != null) {
executor.decommission()
@ -333,12 +347,10 @@ private[spark] class CoarseGrainedExecutorBackend(
shutdownThread.start()
logInfo("Will exit when finished decommissioning")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
decommissioned = false
logError("Unexpected error while decommissioning self", e)
false
}
}
}

View file

@ -95,8 +95,17 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
extends CoarseGrainedClusterMessage
case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
extends CoarseGrainedClusterMessage
// A message that sent from executor to driver to tell driver that the executor has started
// decommissioning. It's used for the case where decommission is triggered at executor (e.g., K8S)
case class ExecutorDecommissioning(executorId: String) extends CoarseGrainedClusterMessage
// A message that sent from driver to executor to decommission that executor.
// It's used for Standalone's cases, where decommission is triggered at MasterWebUI or Worker.
object DecommissionExecutor extends CoarseGrainedClusterMessage
// A message that sent to the executor itself when it receives PWR signal,
// indicating the executor starts to decommission.
object ExecutorSigPWRReceived extends CoarseGrainedClusterMessage
case class RemoveWorker(workerId: String, host: String, message: String)
extends CoarseGrainedClusterMessage
@ -136,7 +145,4 @@ private[spark] object CoarseGrainedClusterMessages {
// The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
// Used to ask an executor to decommission itself. (Can be an internal message)
case object DecommissionSelf extends CoarseGrainedClusterMessage
}

View file

@ -191,11 +191,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
removeExecutor(executorId, reason)
// Do not change this code without running the K8s integration suites
case DecommissionExecutor(executorId, decommissionInfo) =>
logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
decommissionExecutor(executorId, decommissionInfo, adjustTargetNumExecutors = false)
case RemoveWorker(workerId, host, message) =>
removeWorker(workerId, host, message)
@ -274,10 +269,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
context.reply(true)
// Do not change this code without running the K8s integration suites
case DecommissionExecutor(executorId, decommissionInfo) =>
logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
context.reply(decommissionExecutor(executorId, decommissionInfo,
adjustTargetNumExecutors = false))
case ExecutorDecommissioning(executorId) =>
logWarning(s"Received executor $executorId decommissioned message")
context.reply(
decommissionExecutor(
executorId,
ExecutorDecommissionInfo(s"Executor $executorId is decommissioned."),
adjustTargetNumExecutors = false,
triggeredByExecutor = true))
case RetrieveSparkAppConfig(resourceProfileId) =>
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
@ -465,72 +464,50 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @param executorsAndDecomInfo Identifiers of executors & decommission info.
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been decommissioned.
* @param triggeredByExecutor whether the decommission is triggered at executor.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
override def decommissionExecutors(
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean): Seq[String] = {
adjustTargetNumExecutors: Boolean,
triggeredByExecutor: Boolean): Seq[String] = withLock {
// Do not change this code without running the K8s integration suites
val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
CoarseGrainedSchedulerBackend.this.synchronized {
// Only bother decommissioning executors which are alive.
if (isExecutorActive(executorId)) {
executorsPendingDecommission(executorId) = decomInfo.workerHost
true
} else {
false
}
val executorsToDecommission = executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
// Only bother decommissioning executors which are alive.
if (isExecutorActive(executorId)) {
scheduler.executorDecommission(executorId, decomInfo)
executorsPendingDecommission(executorId) = decomInfo.workerHost
Some(executorId)
} else {
None
}
}
logInfo(s"Decommission executors: ${executorsToDecommission.mkString(", ")}")
// If we don't want to replace the executors we are decommissioning
if (adjustTargetNumExecutors) {
adjustExecutors(executorsToDecommission.map(_._1))
adjustExecutors(executorsToDecommission)
}
executorsToDecommission.filter { case (executorId, decomInfo) =>
doDecommission(executorId, decomInfo)
}.map(_._1)
}
// Mark those corresponding BlockManagers as decommissioned first before we sending
// decommission notification to executors. So, it's less likely to lead to the race
// condition where `getPeer` request from the decommissioned executor comes first
// before the BlockManagers are marked as decommissioned.
// Note that marking BlockManager as decommissioned doesn't need depend on
// `spark.storage.decommission.enabled`. Because it's meaningless to save more blocks
// for the BlockManager since the executor will be shutdown soon.
scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
// Do not change this code without running the K8s integration suites
private def doDecommission(executorId: String,
decomInfo: ExecutorDecommissionInfo): Boolean = {
logInfo(s"Asking executor $executorId to decommissioning.")
scheduler.executorDecommission(executorId, decomInfo)
// Send decommission message to the executor (it could have originated on the executor
// but not necessarily).
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(DecommissionSelf)
case None =>
// Ignoring the executor since it is not registered.
logWarning(s"Attempted to decommission unknown executor $executorId.")
return false
if (!triggeredByExecutor) {
executorsToDecommission.foreach { executorId =>
logInfo(s"Notify executor $executorId to decommissioning.")
executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor)
}
}
logInfo(s"Asked executor $executorId to decommission.")
if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
try {
logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
} catch {
case e: Exception =>
logError("Unexpected error during block manager " +
s"decommissioning for executor $executorId: ${e.toString}", e)
return false
}
logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
}
true
executorsToDecommission
}
override def start(): Unit = {
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenManager = createTokenManager()

View file

@ -178,9 +178,12 @@ private[spark] class StandaloneSchedulerBackend(
}
override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
logInfo("Asked to decommission executor")
logInfo(s"Asked to decommission executor $fullId")
val execId = fullId.split("/")(1)
decommissionExecutors(Array((execId, decommissionInfo)), adjustTargetNumExecutors = false)
decommissionExecutors(
Array((execId, decommissionInfo)),
adjustTargetNumExecutors = false,
triggeredByExecutor = false)
logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
}

View file

@ -56,7 +56,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter}
import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
import org.apache.spark.storage.BlockManagerMessages.{DecommissionBlockManager, ReplicateBlock}
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
@ -243,8 +243,9 @@ private[spark] class BlockManager(
private var blockReplicationPolicy: BlockReplicationPolicy = _
// visible for test
// This is volatile since if it's defined we should not accept remote blocks.
@volatile private var decommissioner: Option[BlockManagerDecommissioner] = None
@volatile private[spark] var decommissioner: Option[BlockManagerDecommissioner] = None
// A DownloadFileManager used to track all the files of remote blocks which are above the
// specified memory threshold. Files will be deleted automatically based on weak reference.
@ -1809,7 +1810,9 @@ private[spark] class BlockManager(
blocksToRemove.size
}
def decommissionBlockManager(): Unit = synchronized {
def decommissionBlockManager(): Unit = storageEndpoint.ask(DecommissionBlockManager)
private[spark] def decommissionSelf(): Unit = synchronized {
decommissioner match {
case None =>
logInfo("Starting block manager decommissioning process...")

View file

@ -163,8 +163,14 @@ class BlockManagerMasterEndpoint(
context.reply(true)
case DecommissionBlockManagers(executorIds) =>
val bmIds = executorIds.flatMap(blockManagerIdByExecutor.get)
decommissionBlockManagers(bmIds)
// Mark corresponding BlockManagers as being decommissioning by adding them to
// decommissioningBlockManagerSet, so they won't be used to replicate or migrate blocks.
// Note that BlockManagerStorageEndpoint will be notified about decommissioning when the
// executor is notified(see BlockManager.decommissionSelf), so we don't need to send the
// notification here.
val bms = executorIds.flatMap(blockManagerIdByExecutor.get)
logInfo(s"Mark BlockManagers (${bms.mkString(", ")}) as being decommissioning.")
decommissioningBlockManagerSet ++= bms
context.reply(true)
case GetReplicateInfoForRDDBlocks(blockManagerId) =>
@ -359,21 +365,6 @@ class BlockManagerMasterEndpoint(
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}
/**
* Decommission the given Seq of blockmanagers
* - Adds these block managers to decommissioningBlockManagerSet Set
* - Sends the DecommissionBlockManager message to each of the [[BlockManagerReplicaEndpoint]]
*/
def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = {
val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
val futures = newBlockManagersToDecommission.map { blockManagerId =>
decommissioningBlockManagerSet.add(blockManagerId)
val info = blockManagerInfo(blockManagerId)
info.storageEndpoint.ask[Unit](DecommissionBlockManager)
}
Future.sequence{ futures.toSeq }
}
/**
* Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId
* @param blockManagerId - block manager id for which ReplicateBlock info is needed

View file

@ -62,7 +62,7 @@ class BlockManagerStorageEndpoint(
}
case DecommissionBlockManager =>
context.reply(blockManager.decommissionBlockManager())
context.reply(blockManager.decommissionSelf())
case RemoveBroadcast(broadcastId, _) =>
doAsync[Int]("removing broadcast " + broadcastId, context) {

View file

@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState, WorkerDecommission}
import org.apache.spark.deploy.DeployMessages.{DecommissionWorkers, MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, Master, WorkerInfo}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.internal.{config, Logging}
@ -414,7 +414,7 @@ class DecommissionWorkerSuite
def decommissionWorkerOnMaster(workerInfo: WorkerInfo, reason: String): Unit = {
logInfo(s"Trying to decommission worker ${workerInfo.id} for reason `$reason`")
master.self.send(WorkerDecommission(workerInfo.id, workerInfo.endpoint))
master.self.send(DecommissionWorkers(Seq(workerInfo.id)))
}
def killWorkerAfterTimeout(workerInfo: WorkerInfo, secondsToWait: Int): Unit = {

View file

@ -27,7 +27,7 @@ import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.apache.spark._
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState, WorkerDecommissioning}
import org.apache.spark.deploy.master.{ApplicationInfo, Master}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.internal.{config, Logging}
@ -122,7 +122,11 @@ class AppClientSuite
// Send a decommission self to all the workers
// Note: normally the worker would send this on their own.
workers.foreach(worker => worker.decommissionSelf())
workers.foreach { worker =>
worker.decommissionSelf()
// send the notice to Master to tell the decommission of Workers
master.self.send(WorkerDecommissioning(worker.workerId, worker.self))
}
// Decommissioning is async.
eventually(timeout(1.seconds), interval(10.millis)) {

View file

@ -72,6 +72,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
})
}
var decommissioned = false
var appDesc = DeployTestUtils.createAppDesc()
val drivers = mutable.HashSet[String]()
val driverResources = new mutable.HashMap[String, Map[String, Set[String]]]
@ -96,6 +97,8 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
case None =>
}
driverIdToAppId.remove(driverId)
case DecommissionWorker =>
decommissioned = true
}
}
@ -742,9 +745,9 @@ class MasterSuite extends SparkFunSuite
hostnames: Seq[String]): Unit = {
val conf = new SparkConf()
val master = makeAliveMaster(conf)
val workerRegs = (1 to numWorkers).map{idx =>
val workers = (1 to numWorkers).map { idx =>
val worker = new MockWorker(master.self, conf)
worker.rpcEnv.setupEndpoint("worker", worker)
worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
val workerReg = RegisterWorker(
worker.id,
"localhost",
@ -755,14 +758,14 @@ class MasterSuite extends SparkFunSuite
"http://localhost:8080",
RpcAddress("localhost", 10000))
master.self.send(workerReg)
workerReg
worker
}
eventually(timeout(10.seconds)) {
val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
assert(masterState.workers.length === numWorkers)
assert(masterState.workers.forall(_.state == WorkerState.ALIVE))
assert(masterState.workers.map(_.id).toSet == workerRegs.map(_.id).toSet)
assert(masterState.workers.map(_.id).toSet == workers.map(_.id).toSet)
}
val decomWorkersCount = master.self.askSync[Integer](DecommissionWorkersOnHosts(hostnames))
@ -773,8 +776,11 @@ class MasterSuite extends SparkFunSuite
eventually(timeout(30.seconds)) {
val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
assert(masterState.workers.length === numWorkers)
val workersActuallyDecomed = masterState.workers.count(_.state == WorkerState.DECOMMISSIONED)
assert(workersActuallyDecomed === numWorkersExpectedToDecom)
val workersActuallyDecomed = masterState.workers
.filter(_.state == WorkerState.DECOMMISSIONED).map(_.id)
val decommissionedWorkers = workers.filter(w => workersActuallyDecomed.contains(w.id))
assert(workersActuallyDecomed.length === numWorkersExpectedToDecom)
assert(decommissionedWorkers.forall(_.decommissioned))
}
// Decommissioning a worker again should return the same answer since we want this call to be

View file

@ -31,7 +31,7 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}
class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
override def beforeEach(): Unit = {
val conf = new SparkConf().setAppName("test").setMaster("local")
val conf = new SparkConf().setAppName("test")
.set(config.DECOMMISSION_ENABLED, true)
sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
@ -78,7 +78,10 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
val execs = sched.getExecutorIds()
// Make the executors decommission, finish, exit, and not be replaced.
val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", None))).toArray
sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true)
sched.decommissionExecutors(
execsAndDecomInfo,
adjustTargetNumExecutors = true,
triggeredByExecutor = false)
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
assert(asyncCountResult === 10)
}

View file

@ -40,6 +40,46 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
val TaskEnded = "TASK_ENDED"
val JobEnded = "JOB_ENDED"
Seq(false, true).foreach { isEnabled =>
test(s"SPARK-32850: BlockManager decommission should respect the configuration " +
s"(enabled=${isEnabled})") {
val conf = new SparkConf()
.setAppName("test-blockmanager-decommissioner")
.setMaster("local-cluster[2, 1, 1024]")
.set(config.DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_ENABLED, isEnabled)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 2, 6000)
val executors = sc.getExecutorIds().toArray
val decommissionListener = new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
// ensure Tasks launched at executors before they're marked as decommissioned by driver
Thread.sleep(3000)
sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
.decommissionExecutors(
executors.map { id => (id, ExecutorDecommissionInfo("test")) },
true,
false)
}
}
sc.addSparkListener(decommissionListener)
val decommissionStatus: Seq[Boolean] = sc.parallelize(1 to 100, 2).mapPartitions { _ =>
val startTime = System.currentTimeMillis()
while (SparkEnv.get.blockManager.decommissioner.isEmpty &&
// wait at most 6 seconds for BlockManager to start to decommission (if enabled)
System.currentTimeMillis() - startTime < 6000) {
Thread.sleep(300)
}
val blockManagerDecommissionStatus =
if (SparkEnv.get.blockManager.decommissioner.isEmpty) false else true
Iterator.single(blockManagerDecommissionStatus)
}.collect()
assert(decommissionStatus.forall(_ == isEnabled))
sc.removeSparkListener(decommissionListener)
}
}
testRetry(s"verify that an already running task which is going to cache data succeeds " +
s"on a decommissioned executor after task start") {
runDecomTest(true, false, TaskStarted)

View file

@ -40,9 +40,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
mainClass = "",
expectedLogOnCompletion = Seq(
"Finished waiting, stopping Spark",
"Received decommission executor message",
"Acknowledged decommissioning block manager",
": Executor decommission.",
"Decommission executors",
"Final accumulator value is: 100"),
appArgs = Array.empty[String],
driverPodChecker = doBasicDriverPyPodCheck,
@ -73,9 +71,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
mainClass = "",
expectedLogOnCompletion = Seq(
"Finished waiting, stopping Spark",
"Received decommission executor message",
"Acknowledged decommissioning block manager",
": Executor decommission."),
"Decommission executors"),
appArgs = Array.empty[String],
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
@ -110,9 +106,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
mainClass = "",
expectedLogOnCompletion = Seq(
"Finished waiting, stopping Spark",
"Received decommission executor message",
"Acknowledged decommissioning block manager",
": Executor decommission."),
"Decommission executors"),
appArgs = Array.empty[String],
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,

View file

@ -17,7 +17,7 @@
package org.apache.spark.streaming.scheduler
import org.mockito.ArgumentMatchers.{eq => meq}
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.Mockito.{never, reset, times, verify, when}
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
import org.scalatest.concurrent.Eventually.{eventually, timeout}
@ -101,12 +101,12 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
val decomInfo = ExecutorDecommissionInfo("spark scale down", None)
if (decommissioning) {
verify(allocationClient, times(1)).decommissionExecutor(
meq(expectedExec.get), meq(decomInfo), meq(true))
meq(expectedExec.get), meq(decomInfo), meq(true), any())
verify(allocationClient, never).killExecutor(meq(expectedExec.get))
} else {
verify(allocationClient, times(1)).killExecutor(meq(expectedExec.get))
verify(allocationClient, never).decommissionExecutor(
meq(expectedExec.get), meq(decomInfo), meq(true))
meq(expectedExec.get), meq(decomInfo), meq(true), any())
}
} else {
if (decommissioning) {