[SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support

This PR is based on an existing/previous PR - https://github.com/apache/spark/pull/19045

### What changes were proposed in this pull request?

This change adds a decommissioning state that we can enter when the cloud provider/scheduler lets us know we aren't going to be removed immediately but instead will be removed soon. This concept fits nicely with K8s, and also with spot instances on AWS / preemptible instances, all of which can give us notice that our host is going away. For now we simply stop scheduling new tasks; in the future we could perform some kind of data migration during scale-down, or at least stop accepting new blocks to cache.
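As a minimal sketch of the trigger path added here, a node-termination hook could nudge the Spark JVM on a host into this state by sending SIGPWR, which the new handlers (registered in `Worker` and `CoarseGrainedExecutorBackend` below) translate into decommissioning rather than a shutdown. The pid lookup is illustrative only; the bundled `decom.sh` and `spark-daemon.sh decommission` in this PR do this properly:

```bash
#!/usr/bin/env bash
# Illustrative sketch, not part of this PR: find the Spark JVM on this
# node and send SIGPWR. The new signal handlers turn this into a
# WorkerDecommission / DecommissionExecutor message instead of an exit.
SPARK_PID=$(ps -o pid= -C java | head -n 1 | tr -d ' ')
kill -s SIGPWR "${SPARK_PID}"
```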

There is a design document at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE/edit?usp=sharing

### Why are the changes needed?

With the increasing move to preemptible multi-tenancy, serverless environments, and spot instances, better handling of node scale-down is required.

### Does this PR introduce any user-facing change?

There is no API change; however, an additional configuration flag is added to enable/disable this behaviour.
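For reference, a minimal sketch of opting in from an application, mirroring what the new tests do (the key `spark.worker.decommission.enabled` is defined in the diff below and defaults to false; the app name and master URL are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// A minimal sketch: enable the new decommissioning behaviour.
// The config key and its default (false) come from the Worker config
// object added in this PR.
val conf = new SparkConf()
  .setAppName("decom-demo") // placeholder
  .setMaster("local-cluster[2, 1, 1024]") // placeholder, as in the new suite
  .set("spark.worker.decommission.enabled", "true")
val sc = new SparkContext(conf)
```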

### How was this patch tested?

New integration tests were added to the Spark K8s integration testing suite, and the AppClientSuite was extended to test decommissioning separately from K8s.

Closes #26440 from holdenk/SPARK-20628-keep-track-of-nodes-which-are-going-to-be-shutdown-r4.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Signed-off-by: Holden Karau <hkarau@apple.com>
Committed by Holden Karau on 2020-02-14 12:36:52 -08:00
parent b343757b1b, commit d273a2bb0f
36 changed files with 694 additions and 50 deletions


@ -60,6 +60,15 @@ private[deploy] object DeployMessages {
assert (port > 0)
}
/**
* @param id the worker id
* @param worker the worker endpoint ref
*/
case class WorkerDecommission(
id: String,
worker: RpcEndpointRef)
extends DeployMessage
case class ExecutorStateChanged(
appId: String,
execId: Int,
@ -149,6 +158,8 @@ private[deploy] object DeployMessages {
case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.
// AppClient to Master
case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)


@ -19,9 +19,13 @@ package org.apache.spark.deploy
private[deploy] object ExecutorState extends Enumeration {
val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value
type ExecutorState = Value
def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
// DECOMMISSIONED isn't listed as finished since we don't want to remove the executor from
// the worker (the executor still exists), but we do want to avoid scheduling new tasks on it.
private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED)
def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state)
}


@ -180,6 +180,8 @@ private[spark] class StandaloneAppClient(
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
} else if (state == ExecutorState.DECOMMISSIONED) {
listener.executorDecommissioned(fullId, message.getOrElse(""))
}
case WorkerRemoved(id, host, message) =>


@ -39,5 +39,7 @@ private[spark] trait StandaloneAppClientListener {
def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
def executorDecommissioned(fullId: String, message: String): Unit
def workerRemoved(workerId: String, host: String, message: String): Unit
}


@ -243,6 +243,15 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
case WorkerDecommission(id, workerRef) =>
logInfo("Recording worker %s decommissioning".format(id))
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else {
// We use foreach since get gives us an option and we can skip the failures.
idToWorker.get(id).foreach(decommissionWorker)
}
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
masterAddress, resources) =>
@ -313,7 +322,9 @@ private[deploy] class Master(
// Only retry certain number of times so we don't go into an infinite loop.
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
// We also don't count failures from decommissioned workers since they are "expected."
if (!normalExit
&& oldState != ExecutorState.DECOMMISSIONED
&& appInfo.incrementRetryCount() >= maxExecutorRetries
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
@ -850,6 +861,26 @@ private[deploy] class Master(
true
}
private def decommissionWorker(worker: WorkerInfo): Unit = {
if (worker.state != WorkerState.DECOMMISSIONED) {
logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))
worker.setState(WorkerState.DECOMMISSIONED)
for (exec <- worker.executors.values) {
logInfo("Telling app of decommission executors")
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.DECOMMISSIONED,
Some("worker decommissioned"), None, workerLost = false))
exec.state = ExecutorState.DECOMMISSIONED
exec.application.removeExecutor(exec)
}
// On recovery do not add a decommissioned worker
persistenceEngine.removeWorker(worker)
} else {
logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned".
format(worker.id, worker.host, worker.port))
}
}
private def removeWorker(worker: WorkerInfo, msg: String): Unit = {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
worker.setState(WorkerState.DEAD)


@ -67,6 +67,14 @@ private[deploy] class Worker(
Utils.checkHost(host)
assert (port > 0)
// If worker decommissioning is enabled, register a SIGPWR handler to trigger decommissioning.
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
logInfo("Registering SIGPWR handler to trigger decommissioning.")
SignalUtils.register("PWR")(decommissionSelf)
} else {
logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
}
// A scheduled executor used to send messages at the specified time.
private val forwardMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
@ -128,6 +136,7 @@ private[deploy] class Worker(
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
private var decommissioned = false
private val workerId = generateWorkerId()
private val sparkHome =
if (sys.props.contains(IS_TESTING.key)) {
@ -549,6 +558,8 @@ private[deploy] class Worker(
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else if (decommissioned) {
logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@ -672,6 +683,9 @@ private[deploy] class Worker(
case ApplicationFinished(id) =>
finishedApps += id
maybeCleanupApplication(id)
case DecommissionSelf =>
decommissionSelf()
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@ -771,6 +785,18 @@ private[deploy] class Worker(
}
}
private[deploy] def decommissionSelf(): Boolean = {
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
logDebug("Decommissioning self")
decommissioned = true
sendToMaster(WorkerDecommission(workerId, self))
} else {
logWarning("Asked to decommission self, but decommissioning not enabled")
}
// Return true since this can be called as a signal handler
true
}
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception


@ -43,7 +43,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, ThreadUtils, Utils}
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@ -64,6 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile private var decommissioned = false
@volatile var driver: Option[RpcEndpointRef] = None
// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
@ -80,6 +81,9 @@ private[spark] class CoarseGrainedExecutorBackend(
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
override def onStart(): Unit = {
logInfo("Registering PWR handler.")
SignalUtils.register("PWR")(decommissionSelf)
logInfo("Connecting to driver: " + driverUrl)
try {
_resources = parseOrFindResources(resourcesFileOpt)
@ -160,6 +164,16 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
if (decommissioned) {
logError("Asked to launch a task while decommissioned.")
driver match {
case Some(endpoint) =>
logInfo("Sending DecommissionExecutor to driver.")
endpoint.send(DecommissionExecutor(executorId))
case _ =>
logError("No registered driver to send Decommission to.")
}
}
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
@ -242,6 +256,29 @@ private[spark] class CoarseGrainedExecutorBackend(
System.exit(code)
}
private def decommissionSelf(): Boolean = {
logInfo("Decommissioning self w/sync")
try {
decommissioned = true
// Tell master we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
} else {
logError("No driver to message decommissioning.")
}
if (executor != null) {
executor.decommission()
}
logInfo("Done decommissioning self.")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
false
}
}
}
private[spark] object CoarseGrainedExecutorBackend extends Logging {


@ -216,16 +216,32 @@ private[spark] class Executor(
*/
private var heartbeatFailures = 0
/**
* Flag to prevent launching new tasks while decommissioned. There could be a race condition
* accessing this, but decommissioning is only intended to help, not to be a hard stop.
*/
private var decommissioned = false
heartbeater.start()
metricsPoller.start()
private[executor] def numRunningTasks: Int = runningTasks.size()
/**
* Mark an executor for decommissioning and avoid launching new tasks.
*/
private[spark] def decommission(): Unit = {
decommissioned = true
}
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
if (decommissioned) {
log.error(s"Launching a task while in decommissioned state.")
}
}
def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {


@ -71,4 +71,9 @@ private[spark] object Worker {
ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
.intConf
.createWithDefault(100)
private[spark] val WORKER_DECOMMISSION_ENABLED =
ConfigBuilder("spark.worker.decommission.enabled")
.booleanConf
.createWithDefault(false)
}


@ -361,6 +361,7 @@ abstract class RDD[T: ClassTag](
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
// Block hit.
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
@ -374,6 +375,7 @@ abstract class RDD[T: ClassTag](
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
// Need to compute the block.
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}


@ -58,3 +58,11 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
private[spark]
case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
extends ExecutorLossReason(_message)
/**
* A loss reason that means the executor is marked for decommissioning.
*
* This is used by the task scheduler to remove state associated with the executor, but
* not yet fail any tasks that were running in the executor before the executor is "fully" lost.
*/
private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")


@ -88,6 +88,10 @@ private[spark] class Pool(
schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
}
override def executorDecommission(executorId: String): Unit = {
schedulableQueue.asScala.foreach(_.executorDecommission(executorId))
}
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
var shouldRevive = false
for (schedulable <- schedulableQueue.asScala) {


@ -43,6 +43,7 @@ private[spark] trait Schedulable {
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
def executorDecommission(executorId: String): Unit
def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}


@ -27,6 +27,9 @@ private[spark] trait SchedulerBackend {
def start(): Unit
def stop(): Unit
/**
* Update the current offers and schedule tasks
*/
def reviveOffers(): Unit
def defaultParallelism(): Int


@ -98,6 +98,11 @@ private[spark] trait TaskScheduler {
*/
def applicationId(): String = appId
/**
* Process a decommissioning executor.
*/
def executorDecommission(executorId: String): Unit
/**
* Process a lost executor
*/


@ -734,6 +734,11 @@ private[spark] class TaskSchedulerImpl(
}
}
override def executorDecommission(executorId: String): Unit = {
rootPool.executorDecommission(executorId)
backend.reviveOffers()
}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None


@ -1083,6 +1083,12 @@ private[spark] class TaskSetManager(
levels.toArray
}
def executorDecommission(execId: String): Unit = {
recomputeLocality()
// Future consideration: if an executor is decommissioned it may make sense to add its current
// tasks to the speculative execution queue.
}
def recomputeLocality(): Unit = {
// A zombie TaskSetManager may reach here while executorLost happens
if (isZombie) return


@ -94,6 +94,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
extends CoarseGrainedClusterMessage
case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage
case class RemoveWorker(workerId: String, host: String, message: String)
extends CoarseGrainedClusterMessage


@ -92,6 +92,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors that have been lost, but for which we don't yet know the real exit reason.
private val executorsPendingLossReason = new HashSet[String]
// Executors which are being decommissioned
protected val executorsPendingDecommission = new HashSet[String]
// A map of ResourceProfile id to map of hostname with its possible task number running on it
@GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var rpHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty
@ -185,11 +188,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
removeExecutor(executorId, reason)
case DecommissionExecutor(executorId) =>
logError(s"Received decommission executor message ${executorId}.")
decommissionExecutor(executorId)
case RemoveWorker(workerId, host, message) =>
removeWorker(workerId, host, message)
case LaunchedExecutor(executorId) =>
executorDataMap.get(executorId).foreach { data =>
data.freeCores = data.totalCores
}
makeOffers(executorId)
case e =>
logError(s"Received unexpected message. ${e}")
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@ -257,6 +269,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
removeWorker(workerId, host, message)
context.reply(true)
case DecommissionExecutor(executorId) =>
logError(s"Received decommission executor message ${executorId}.")
decommissionExecutor(executorId)
context.reply(true)
case RetrieveSparkAppConfig(resourceProfileId) =>
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
val reply = SparkAppConfig(
@ -265,6 +282,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
Option(delegationTokens.get()),
rp)
context.reply(reply)
case e =>
logError(s"Received unexpected ask ${e}")
}
// Make fake resource offers on all executors
@ -365,6 +384,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingLossReason -= executorId
executorsPendingDecommission -= executorId
executorsPendingToRemove.remove(executorId).getOrElse(false)
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
@ -389,6 +409,35 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
scheduler.workerRemoved(workerId, host, message)
}
/**
* Mark a given executor as decommissioned and stop making resource offers for it.
*/
private def decommissionExecutor(executorId: String): Boolean = {
val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
// Only bother decommissioning executors which are alive.
if (isExecutorActive(executorId)) {
executorsPendingDecommission += executorId
true
} else {
false
}
}
if (shouldDisable) {
logInfo(s"Starting decommissioning executor $executorId.")
try {
scheduler.executorDecommission(executorId)
} catch {
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
logInfo(s"Finished decommissioning executor $executorId.")
} else {
logInfo(s"Skipping decommissioning of executor $executorId.")
}
shouldDisable
}
/**
* Stop making resource offers for the given executor. The executor is marked as lost with
* the loss reason still pending.
@ -511,8 +560,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
protected def removeWorker(workerId: String, host: String, message: String): Unit = {
driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t =>
logError(t.getMessage, t))(ThreadUtils.sameThread)
driverEndpoint.send(RemoveWorker(workerId, host, message))
}
/**
* Called by subclasses when notified of a decommissioning executor.
*/
private[spark] def decommissionExecutor(executorId: String): Unit = {
if (driverEndpoint != null) {
logInfo("Propegating executor decommission to driver.")
driverEndpoint.send(DecommissionExecutor(executorId))
}
}
def sufficientResourcesRegistered(): Boolean = true
@ -543,7 +601,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def isExecutorActive(id: String): Boolean = synchronized {
executorDataMap.contains(id) &&
!executorsPendingToRemove.contains(id) &&
!executorsPendingLossReason.contains(id)
!executorsPendingLossReason.contains(id) &&
!executorsPendingDecommission.contains(id)
}
override def maxNumConcurrentTasks(): Int = synchronized {


@ -174,6 +174,12 @@ private[spark] class StandaloneSchedulerBackend(
removeExecutor(fullId.split("/")(1), reason)
}
override def executorDecommissioned(fullId: String, message: String) {
logInfo("Asked to decommission executor")
decommissionExecutor(fullId.split("/")(1))
logInfo("Executor %s decommissioned: %s".format(fullId, message))
}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {
logInfo("Worker %s removed: %s".format(workerId, message))
removeWorker(workerId, host, message)


@ -60,7 +60,7 @@ private[spark] object SignalUtils extends Logging {
if (SystemUtils.IS_OS_UNIX) {
try {
val handler = handlers.getOrElseUpdate(signal, {
logInfo("Registered signal handler for " + signal)
logInfo("Registering signal handler for " + signal)
new ActionHandler(new Signal(signal))
})
handler.register(action)


@ -30,7 +30,7 @@ import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, Master}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.Utils
@ -44,13 +44,13 @@ class AppClientSuite
with Eventually
with ScalaFutures {
private val numWorkers = 2
private val conf = new SparkConf()
private val securityManager = new SecurityManager(conf)
private var conf: SparkConf = null
private var masterRpcEnv: RpcEnv = null
private var workerRpcEnvs: Seq[RpcEnv] = null
private var master: Master = null
private var workers: Seq[Worker] = null
private var securityManager: SecurityManager = null
/**
* Start the local cluster.
@ -58,6 +58,8 @@ class AppClientSuite
*/
override def beforeAll(): Unit = {
super.beforeAll()
conf = new SparkConf().set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true")
securityManager = new SecurityManager(conf)
masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
workerRpcEnvs = (0 until numWorkers).map { i =>
RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
@ -111,8 +113,23 @@ class AppClientSuite
assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed")
}
// Save the executor id before decommissioning so we can kill it
val application = getApplications().head
val executors = application.executors
val executorId: String = executors.head._2.fullId
// Ask each worker to decommission itself.
// Note: normally the worker would send this on its own.
workers.foreach(worker => worker.decommissionSelf())
// Decommissioning is async.
eventually(timeout(1.seconds), interval(10.millis)) {
// We only record decommissioning for the executor we've requested
assert(ci.listener.execDecommissionedList.size === 1)
}
// Send request to kill executor, verify request was made
val executorId: String = getApplications().head.executors.head._2.fullId
whenReady(
ci.client.killExecutors(Seq(executorId)),
timeout(10.seconds),
@ -120,6 +137,15 @@ class AppClientSuite
assert(acknowledged)
}
// Verify that requesting executors on the decommissioned workers yields no executors
whenReady(
ci.client.requestTotalExecutors(numExecutorsRequested),
timeout(10.seconds),
interval(10.millis)) { acknowledged =>
assert(acknowledged)
}
assert(getApplications().head.executors.size === 0)
// Issue stop command for Client to disconnect from Master
ci.client.stop()
@ -189,6 +215,7 @@ class AppClientSuite
val deadReasonList = new ConcurrentLinkedQueue[String]()
val execAddedList = new ConcurrentLinkedQueue[String]()
val execRemovedList = new ConcurrentLinkedQueue[String]()
val execDecommissionedList = new ConcurrentLinkedQueue[String]()
def connected(id: String): Unit = {
connectedIdList.add(id)
@ -218,6 +245,10 @@ class AppClientSuite
execRemovedList.add(id)
}
def executorDecommissioned(id: String, message: String): Unit = {
execDecommissionedList.add(id)
}
def workerRemoved(workerId: String, host: String, message: String): Unit = {}
}


@ -167,6 +167,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorDecommission(executorId: String) = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
@ -707,6 +708,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorUpdates: Map[(Int, Int), ExecutorMetrics]): Boolean = true
override def executorDecommission(executorId: String): Unit = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None


@ -89,6 +89,7 @@ private class DummyTaskScheduler extends TaskScheduler {
override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
override def executorDecommission(executorId: String): Unit = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None


@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler
import java.util.concurrent.Semaphore
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.internal.config
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}
class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
override def beforeEach(): Unit = {
val conf = new SparkConf().setAppName("test").setMaster("local")
.set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
}
test("verify task with no decommissioning works as expected") {
val input = sc.parallelize(1 to 10)
input.count()
val sleepyRdd = input.mapPartitions{ x =>
Thread.sleep(100)
x
}
assert(sleepyRdd.count() === 10)
}
test("verify a task with all workers decommissioned succeeds") {
val input = sc.parallelize(1 to 10)
// Do a count to wait for the executors to be registered.
input.count()
val sleepyRdd = input.mapPartitions{ x =>
Thread.sleep(50)
x
}
// Listen for the job
val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
sem.release()
}
})
// Start the task.
val asyncCount = sleepyRdd.countAsync()
// Wait for the job to have started
sem.acquire(1)
// Decommission all the executors, this should not halt the current task.
// decom.sh message passing is tested manually.
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
execs.foreach(execId => sched.decommissionExecutor(execId))
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 2.seconds)
assert(asyncCountResult === 10)
// Try to launch a task after decommissioning; this should fail
val postDecommissioned = input.map(x => x)
val postDecomAsyncCount = postDecommissioned.countAsync()
val thrown = intercept[java.util.concurrent.TimeoutException]{
val result = ThreadUtils.awaitResult(postDecomAsyncCount, 2.seconds)
}
assert(postDecomAsyncCount.isCompleted === false,
"After executor decommissioning, no new task should have launched")
}
}


@ -55,6 +55,9 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
}
}
def workerDecommissioning: Boolean =
sparkConf.get(org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED)
def nodeSelector: Map[String, String] =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)


@ -24,6 +24,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.rpc.RpcEndpointAddress
@ -33,7 +34,7 @@ import org.apache.spark.util.Utils
private[spark] class BasicExecutorFeatureStep(
kubernetesConf: KubernetesExecutorConf,
secMgr: SecurityManager)
extends KubernetesFeatureConfigStep {
extends KubernetesFeatureConfigStep with Logging {
// Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
private val executorContainerImage = kubernetesConf
@ -186,6 +187,21 @@ private[spark] class BasicExecutorFeatureStep(
.endResources()
.build()
}.getOrElse(executorContainer)
val containerWithLifecycle =
if (!kubernetesConf.workerDecommissioning) {
logInfo("Decommissioning not enabled, skipping shutdown script")
containerWithLimitCores
} else {
logInfo("Adding decommission script to lifecycle")
new ContainerBuilder(containerWithLimitCores).withNewLifecycle()
.withNewPreStop()
.withNewExec()
.addToCommand("/opt/decom.sh")
.endExec()
.endPreStop()
.endLifecycle()
.build()
}
val ownerReference = kubernetesConf.driverPod.map { pod =>
new OwnerReferenceBuilder()
.withController(true)
@ -213,6 +229,6 @@ private[spark] class BasicExecutorFeatureStep(
kubernetesConf.get(KUBERNETES_EXECUTOR_SCHEDULER_NAME)
.foreach(executorPod.getSpec.setSchedulerName)
SparkPod(executorPod, containerWithLimitCores)
SparkPod(executorPod, containerWithLifecycle)
}
}


@ -30,7 +30,7 @@ ARG spark_uid=185
RUN set -ex && \
apt-get update && \
ln -s /lib /lib64 && \
apt install -y bash tini libc6 libpam-modules krb5-user libnss3 && \
apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps && \
mkdir -p /opt/spark && \
mkdir -p /opt/spark/examples && \
mkdir -p /opt/spark/work-dir && \
@ -45,6 +45,7 @@ COPY jars /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data
@ -53,6 +54,7 @@ ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh
ENTRYPOINT [ "/opt/entrypoint.sh" ]


@ -0,0 +1,35 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -ex
echo "Asked to decommission"
# Find the pid to signal
date | tee -a ${LOG}
WORKER_PID=$(ps -o pid -C java | tail -n 1 | awk '{ sub(/^[ \t]+/, ""); print }')
echo "Using worker pid $WORKER_PID"
kill -s SIGPWR ${WORKER_PID}
# For now we expect this to timeout, since we don't start exiting the backend.
echo "Waiting for worker pid to exit"
# If the worker does exit stop blocking the cleanup.
timeout 60 tail --pid=${WORKER_PID} -f /dev/null
date
echo "Done"
date
sleep 30


@ -30,9 +30,9 @@ set -e
# If there is no passwd entry for the container UID, attempt to create one
if [ -z "$uidentry" ] ; then
if [ -w /etc/passwd ] ; then
echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd
echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd
else
echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
fi
fi
@ -59,7 +59,7 @@ fi
# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
# It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s.
if [ -n ${HADOOP_HOME} ] && [ -z ${SPARK_DIST_CLASSPATH} ]; then
export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
fi
if ! [ -z ${HADOOP_CONF_DIR+x} ]; then


@ -16,7 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -xo errexit
set -exo errexit
TEST_ROOT_DIR=$(git rev-parse --show-toplevel)
DEPLOY_MODE="minikube"
@ -42,6 +42,9 @@ SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version 2>/dev/nu
| grep -v "WARNING"\
| tail -n 1)
export SCALA_VERSION
echo $SCALA_VERSION
# Parse arguments
while (( "$#" )); do
case $1 in
@ -110,7 +113,8 @@ while (( "$#" )); do
shift
;;
*)
break
echo "Unexpected command line flag $2 $1."
exit 1
;;
esac
shift
@ -164,6 +168,7 @@ properties+=(
-Dspark.kubernetes.test.jvmImage=$JVM_IMAGE_NAME
-Dspark.kubernetes.test.pythonImage=$PYTHON_IMAGE_NAME
-Dspark.kubernetes.test.rImage=$R_IMAGE_NAME
-Dlog4j.logger.org.apache.spark=DEBUG
)
$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pscala-$SCALA_VERSION -Pkubernetes -Pkubernetes-integration-tests ${properties[@]}


@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest
import org.apache.spark.internal.config.Worker
private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
import DecommissionSuite._
import KubernetesSuite.k8sTestTag
test("Test basic decommissioning", k8sTestTag) {
sparkAppConf
.set(Worker.WORKER_DECOMMISSION_ENABLED.key, "true")
.set("spark.kubernetes.pyspark.pythonVersion", "3")
.set("spark.kubernetes.container.image", pyImage)
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_DECOMISSIONING,
mainClass = "",
expectedLogOnCompletion = Seq("decommissioning executor",
"Finished waiting, stopping Spark"),
appArgs = Array.empty[String],
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false,
decommissioningTest = true)
}
}
private[spark] object DecommissionSuite {
val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/"
val PYSPARK_DECOMISSIONING: String = TEST_LOCAL_PYSPARK + "decommissioning.py"
}


@ -42,7 +42,9 @@ import org.apache.spark.internal.config._
class KubernetesSuite extends SparkFunSuite
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
with DepsTestsSuite with RTestsSuite with Logging with Eventually with Matchers {
with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually
with Matchers {
import KubernetesSuite._
@ -254,6 +256,7 @@ class KubernetesSuite extends SparkFunSuite
}
}
// scalastyle:off argcount
protected def runSparkApplicationAndVerifyCompletion(
appResource: String,
mainClass: String,
@ -264,11 +267,78 @@ class KubernetesSuite extends SparkFunSuite
appLocator: String,
isJVM: Boolean,
pyFiles: Option[String] = None,
executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = {
executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
decommissioningTest: Boolean = false): Unit = {
// scalastyle:on argcount
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
appArgs = appArgs)
val execPods = scala.collection.mutable.Map[String, Pod]()
val (patienceInterval, patienceTimeout) = {
executorPatience match {
case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
case _ => (INTERVAL, TIMEOUT)
}
}
def checkPodReady(namespace: String, name: String) = {
val execPod = kubernetesTestComponents.kubernetesClient
.pods()
.inNamespace(namespace)
.withName(name)
.get()
val resourceStatus = execPod.getStatus
val conditions = resourceStatus.getConditions().asScala
val conditionTypes = conditions.map(_.getType())
val readyConditions = conditions.filter{cond => cond.getType() == "Ready"}
val result = readyConditions
.map(cond => cond.getStatus() == "True")
.headOption.getOrElse(false)
result
}
val execWatcher = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "executor")
.watch(new Watcher[Pod] {
logDebug("Beginning watch of executors")
override def onClose(cause: KubernetesClientException): Unit =
logInfo("Ending watch of executors")
override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
val name = resource.getMetadata.getName
val namespace = resource.getMetadata().getNamespace()
action match {
case Action.MODIFIED =>
execPods(name) = resource
case Action.ADDED =>
logDebug(s"Add event received for $name.")
execPods(name) = resource
// If testing decommissioning, simulate it by deleting the
// first executor pod once it is ready.
if (decommissioningTest && execPods.size == 1) {
// Wait for all the containers in the pod to be running
logDebug("Waiting for first pod to become OK prior to deletion")
Eventually.eventually(patienceTimeout, patienceInterval) {
val result = checkPodReady(namespace, name)
result shouldBe (true)
}
// Sleep a small interval to allow execution of job
logDebug("Sleeping before killing pod.")
Thread.sleep(2000)
// Delete the pod to simulate cluster scale down/migration.
val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name)
pod.delete()
logDebug(s"Triggered pod decom/delete: $name deleted")
}
case Action.DELETED | Action.ERROR =>
execPods.remove(name)
}
}
})
logDebug("Starting Spark K8s job")
SparkAppLauncher.launch(
appArguments,
sparkAppConf,
@ -284,40 +354,33 @@ class KubernetesSuite extends SparkFunSuite
.list()
.getItems
.get(0)
driverPodChecker(driverPod)
val execPods = scala.collection.mutable.Map[String, Pod]()
val execWatcher = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "executor")
.watch(new Watcher[Pod] {
logInfo("Beginning watch of executors")
override def onClose(cause: KubernetesClientException): Unit =
logInfo("Ending watch of executors")
override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
val name = resource.getMetadata.getName
action match {
case Action.ADDED | Action.MODIFIED =>
execPods(name) = resource
case Action.DELETED | Action.ERROR =>
execPods.remove(name)
}
}
})
val (patienceInterval, patienceTimeout) = {
executorPatience match {
case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
case _ => (INTERVAL, TIMEOUT)
}
}
// If we're testing decommissioning we delete all the executors, but we should have
// an executor at some point.
Eventually.eventually(patienceTimeout, patienceInterval) {
execPods.values.nonEmpty should be (true)
}
// If decommissioning we need to wait and check the executors were removed
if (decommissioningTest) {
// Sleep a small interval to ensure everything is registered.
Thread.sleep(100)
// Wait for the executors to become ready
Eventually.eventually(patienceTimeout, patienceInterval) {
val anyReadyPods = ! execPods.map{
case (name, resource) =>
(name, resource.getMetadata().getNamespace())
}.filter{
case (name, namespace) => checkPodReady(namespace, name)
}.isEmpty
val podsEmpty = execPods.values.isEmpty
val podsReadyOrDead = anyReadyPods || podsEmpty
podsReadyOrDead shouldBe (true)
}
}
execWatcher.close()
execPods.values.foreach(executorPodChecker(_))
Eventually.eventually(TIMEOUT, patienceInterval) {
Eventually.eventually(patienceTimeout, patienceInterval) {
expectedLogOnCompletion.foreach { e =>
assert(kubernetesTestComponents.kubernetesClient
.pods()
@ -425,5 +488,5 @@ private[spark] object KubernetesSuite {
val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
val INTERVAL = PatienceConfiguration.Interval(Span(1, Seconds))
}


@ -0,0 +1,45 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
import time
from pyspark.sql import SparkSession
if __name__ == "__main__":
"""
Usage: decommissioning
"""
print("Starting decom test")
spark = SparkSession \
.builder \
.appName("PyMemoryTest") \
.getOrCreate()
sc = spark._sc
rdd = sc.parallelize(range(10))
rdd.collect()
print("Waiting to give nodes time to finish.")
time.sleep(5)
rdd.collect()
print("Waiting some more....")
time.sleep(10)
rdd.collect()
print("Finished waiting, stopping Spark.")
spark.stop()
print("Done, exiting Python")
sys.exit(0)


@ -0,0 +1,57 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A shell script to decommission all workers on a single slave
#
# Environment variables
#
# SPARK_WORKER_INSTANCES The number of worker instances that should be
# running on this slave. Default is 1.
# Usage: decommission-slave.sh [--block-until-exit]
# Decommissions all workers on this slave machine
set -ex
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}/sbin/spark-config.sh"
. "${SPARK_HOME}/bin/load-spark-env.sh"
if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
"${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker 1
else
for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
"${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker $(( $i + 1 ))
done
fi
# Check if --block-until-exit is set.
# This is done for systems which block on the decommissioning script and on exit
# shut down the entire system (e.g. K8s).
if [ "$1" == "--block-until-exit" ]; then
shift
# For now we only block on the 0th instance if there are multiple instances.
instance=$1
pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"
wait $pid
fi


@ -215,6 +215,21 @@ case $option in
fi
;;
(decommission)
if [ -f $pid ]; then
TARGET_ID="$(cat "$pid")"
if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
echo "decommissioning $command"
kill -s SIGPWR "$TARGET_ID"
else
echo "no $command to decommission"
fi
else
echo "no $command to decommission"
fi
;;
(status)
if [ -f $pid ]; then