From d273a2bb0fac452a97f5670edd69d3e452e3e57e Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Fri, 14 Feb 2020 12:36:52 -0800
Subject: [PATCH] [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support

This PR is based on an existing/previous PR - https://github.com/apache/spark/pull/19045

### What changes were proposed in this pull request?

This change adds a decommissioning state that we can enter when the cloud provider/scheduler lets us know we aren't going to be removed immediately but instead will be removed soon. This concept fits nicely in K8s and also with spot instances on AWS / preemptible instances, all of which can give us notice that our host is going away. For now we simply stop scheduling jobs; in the future we could perform some kind of migration of data during scale-down, or at least stop accepting new blocks to cache.

There is a design document at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE/edit?usp=sharing

### Why are the changes needed?

With the move toward preemptible multi-tenancy, serverless environments, and spot instances, better handling of node scale-down is required.

### Does this PR introduce any user-facing change?

There is no API change; however, an additional configuration flag is added to enable/disable this behaviour.

### How was this patch tested?

New integration tests in the Spark K8s integration testing, and an extension of AppClientSuite that tests decommissioning separately from K8s.

Closes #26440 from holdenk/SPARK-20628-keep-track-of-nodes-which-are-going-to-be-shutdown-r4.

Lead-authored-by: Holden Karau
Co-authored-by: Holden Karau
Signed-off-by: Holden Karau
---
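Usage sketch (illustrative note, not part of the patch): on a standalone worker the new flag has to be set on the worker JVM, and decommissioning is then triggered by SIGPWR. The flag name and signal come from this patch; the spark-env.sh mechanism and the pid-file path (default spark-daemon.sh naming under /tmp) are assumptions about a typical deployment.

    # conf/spark-env.sh on the worker host: opt in to decommissioning (assumed setup)
    export SPARK_WORKER_OPTS="-Dspark.worker.decommission.enabled=true"

    # When the host is about to be reclaimed, ask the worker to decommission
    # itself by sending SIGPWR to its JVM (Linux; pid-file path assumed).
    kill -s SIGPWR "$(cat /tmp/spark-${USER}-org.apache.spark.deploy.worker.Worker-1.pid)"

With this in place the master stops scheduling new executors on that worker and the driver stops offering its executors new tasks, while tasks that are already running are left to finish.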
 .../apache/spark/deploy/DeployMessage.scala        |  11 ++
 .../apache/spark/deploy/ExecutorState.scala        |   8 +-
 .../deploy/client/StandaloneAppClient.scala        |   2 +
 .../client/StandaloneAppClientListener.scala       |   2 +
 .../apache/spark/deploy/master/Master.scala        |  31 +++++
 .../apache/spark/deploy/worker/Worker.scala        |  26 ++++
 .../CoarseGrainedExecutorBackend.scala             |  39 +++++-
 .../org/apache/spark/executor/Executor.scala       |  16 +++
 .../apache/spark/internal/config/Worker.scala      |   5 +
 .../main/scala/org/apache/spark/rdd/RDD.scala      |   2 +
 .../spark/scheduler/ExecutorLossReason.scala       |   8 ++
 .../org/apache/spark/scheduler/Pool.scala          |   4 +
 .../apache/spark/scheduler/Schedulable.scala       |   1 +
 .../spark/scheduler/SchedulerBackend.scala         |   3 +
 .../spark/scheduler/TaskScheduler.scala            |   5 +
 .../spark/scheduler/TaskSchedulerImpl.scala        |   5 +
 .../spark/scheduler/TaskSetManager.scala           |   6 +
 .../cluster/CoarseGrainedClusterMessage.scala      |   2 +
 .../CoarseGrainedSchedulerBackend.scala            |  66 ++++++++-
 .../cluster/StandaloneSchedulerBackend.scala       |   6 +
 .../org/apache/spark/util/SignalUtils.scala        |   2 +-
 .../spark/deploy/client/AppClientSuite.scala       |  39 +++++-
 .../spark/scheduler/DAGSchedulerSuite.scala        |   2 +
 .../ExternalClusterManagerSuite.scala              |   1 +
 .../scheduler/WorkerDecommissionSuite.scala        |  84 ++++++++++++
 .../spark/deploy/k8s/KubernetesConf.scala          |   3 +
 .../features/BasicExecutorFeatureStep.scala        |  20 ++-
 .../src/main/dockerfiles/spark/Dockerfile          |   4 +-
 .../src/main/dockerfiles/spark/decom.sh            |  35 +++++
 .../src/main/dockerfiles/spark/entrypoint.sh       |   6 +-
 .../dev/dev-run-integration-tests.sh               |   9 +-
 .../integrationtest/DecommissionSuite.scala        |  49 +++++++
 .../k8s/integrationtest/KubernetesSuite.scala      | 125 +++++++++++++----
 .../tests/decommissioning.py                       |  45 +++++++
 sbin/decommission-slave.sh                         |  57 ++++++++
 sbin/spark-daemon.sh                               |  15 +++
 36 files changed, 694 insertions(+), 50 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
 create mode 100755 resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
 create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
 create mode 100644 resource-managers/kubernetes/integration-tests/tests/decommissioning.py
 create mode 100644 sbin/decommission-slave.sh

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index fba371dcfb..18305ad374 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -60,6 +60,15 @@ private[deploy] object DeployMessages {
     assert (port > 0)
   }
 
+  /**
+   * @param id the worker id
+   * @param worker the worker endpoint ref
+   */
+  case class WorkerDecommission(
+      id: String,
+      worker: RpcEndpointRef)
+    extends DeployMessage
+
   case class ExecutorStateChanged(
       appId: String,
       execId: Int,
@@ -149,6 +158,8 @@ private[deploy] object DeployMessages {
 
   case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
 
+  case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.
+
   // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)

diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index 69c98e2893..0751bcf221 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,13 @@ package org.apache.spark.deploy
 
 private[deploy] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
+  // DECOMMISSIONED isn't listed as finished since we don't want to remove the executor from
+  // the worker and the executor still exists - but we do want to avoid scheduling new tasks on it.
+ private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED) + + def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state) } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index 8f17159228..eedf5e969e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -180,6 +180,8 @@ private[spark] class StandaloneAppClient( logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) if (ExecutorState.isFinished(state)) { listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost) + } else if (state == ExecutorState.DECOMMISSIONED) { + listener.executorDecommissioned(fullId, message.getOrElse("")) } case WorkerRemoved(id, host, message) => diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala index d8bc1a883d..2e38a68478 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala @@ -39,5 +39,7 @@ private[spark] trait StandaloneAppClientListener { def executorRemoved( fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit + def executorDecommissioned(fullId: String, message: String): Unit + def workerRemoved(workerId: String, host: String, message: String): Unit } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 8d3795cae7..71df5dfa42 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -243,6 +243,15 @@ private[deploy] class Master( logError("Leadership has been revoked -- master shutting down.") System.exit(0) + case WorkerDecommission(id, workerRef) => + logInfo("Recording worker %s decommissioning".format(id)) + if (state == RecoveryState.STANDBY) { + workerRef.send(MasterInStandby) + } else { + // We use foreach since get gives us an option and we can skip the failures. + idToWorker.get(id).foreach(decommissionWorker) + } + case RegisterWorker( id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress, resources) => @@ -313,7 +322,9 @@ private[deploy] class Master( // Only retry certain number of times so we don't go into an infinite loop. // Important note: this code path is not exercised by tests, so be very careful when // changing this `if` condition. + // We also don't count failures from decommissioned workers since they are "expected." 
if (!normalExit + && oldState != ExecutorState.DECOMMISSIONED && appInfo.incrementRetryCount() >= maxExecutorRetries && maxExecutorRetries >= 0) { // < 0 disables this application-killing path val execs = appInfo.executors.values @@ -850,6 +861,26 @@ private[deploy] class Master( true } + private def decommissionWorker(worker: WorkerInfo): Unit = { + if (worker.state != WorkerState.DECOMMISSIONED) { + logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port)) + worker.setState(WorkerState.DECOMMISSIONED) + for (exec <- worker.executors.values) { + logInfo("Telling app of decommission executors") + exec.application.driver.send(ExecutorUpdated( + exec.id, ExecutorState.DECOMMISSIONED, + Some("worker decommissioned"), None, workerLost = false)) + exec.state = ExecutorState.DECOMMISSIONED + exec.application.removeExecutor(exec) + } + // On recovery do not add a decommissioned executor + persistenceEngine.removeWorker(worker) + } else { + logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned". + format(worker.id, worker.host, worker.port)) + } + } + private def removeWorker(worker: WorkerInfo, msg: String): Unit = { logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) worker.setState(WorkerState.DEAD) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 4be495ac4f..d988bcedb4 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -67,6 +67,14 @@ private[deploy] class Worker( Utils.checkHost(host) assert (port > 0) + // If worker decommissioning is enabled register a handler on PWR to shutdown. + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + logInfo("Registering SIGPWR handler to trigger decommissioning.") + SignalUtils.register("PWR")(decommissionSelf) + } else { + logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.") + } + // A scheduled executor used to send messages at the specified time. private val forwardMessageScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler") @@ -128,6 +136,7 @@ private[deploy] class Worker( private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString private var registered = false private var connected = false + private var decommissioned = false private val workerId = generateWorkerId() private val sparkHome = if (sys.props.contains(IS_TESTING.key)) { @@ -549,6 +558,8 @@ private[deploy] class Worker( case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") + } else if (decommissioned) { + logWarning("Asked to launch an executor while decommissioned. 
Not launching executor.") } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) @@ -672,6 +683,9 @@ private[deploy] class Worker( case ApplicationFinished(id) => finishedApps += id maybeCleanupApplication(id) + + case DecommissionSelf => + decommissionSelf() } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -771,6 +785,18 @@ private[deploy] class Worker( } } + private[deploy] def decommissionSelf(): Boolean = { + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + logDebug("Decommissioning self") + decommissioned = true + sendToMaster(WorkerDecommission(workerId, self)) + } else { + logWarning("Asked to decommission self, but decommissioning not enabled") + } + // Return true since can be called as a signal handler + true + } + private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = { val driverId = driverStateChanged.driverId val exception = driverStateChanged.exception diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 25c5b9812f..faf03a64ae 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -43,7 +43,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, ThreadUtils, Utils} +import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, @@ -64,6 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend( private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null + @volatile private var decommissioned = false @volatile var driver: Option[RpcEndpointRef] = None // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need @@ -80,6 +81,9 @@ private[spark] class CoarseGrainedExecutorBackend( private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] override def onStart(): Unit = { + logInfo("Registering PWR handler.") + SignalUtils.register("PWR")(decommissionSelf) + logInfo("Connecting to driver: " + driverUrl) try { _resources = parseOrFindResources(resourcesFileOpt) @@ -160,6 +164,16 @@ private[spark] class CoarseGrainedExecutorBackend( if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else { + if (decommissioned) { + logError("Asked to launch a task while decommissioned.") + driver match { + case Some(endpoint) => + logInfo("Sending DecommissionExecutor to driver.") + endpoint.send(DecommissionExecutor(executorId)) + case _ => + logError("No registered driver to send Decommission to.") + } + } val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) taskResources(taskDesc.taskId) = taskDesc.resources @@ -242,6 +256,29 @@ private[spark] class CoarseGrainedExecutorBackend( System.exit(code) } + + private def decommissionSelf(): Boolean = { + logInfo("Decommissioning self w/sync") + try { + decommissioned = true + // Tell master we are are 
decommissioned so it stops trying to schedule us + if (driver.nonEmpty) { + driver.get.askSync[Boolean](DecommissionExecutor(executorId)) + } else { + logError("No driver to message decommissioning.") + } + if (executor != null) { + executor.decommission() + } + logInfo("Done decommissioning self.") + // Return true since we are handling a signal + true + } catch { + case e: Exception => + logError(s"Error ${e} during attempt to decommission self") + false + } + } } private[spark] object CoarseGrainedExecutorBackend extends Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 8aeb16fe5d..2bfa1cea4b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -216,16 +216,32 @@ private[spark] class Executor( */ private var heartbeatFailures = 0 + /** + * Flag to prevent launching new tasks while decommissioned. There could be a race condition + * accessing this, but decommissioning is only intended to help not be a hard stop. + */ + private var decommissioned = false + heartbeater.start() metricsPoller.start() private[executor] def numRunningTasks: Int = runningTasks.size() + /** + * Mark an executor for decommissioning and avoid launching new tasks. + */ + private[spark] def decommission(): Unit = { + decommissioned = true + } + def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { val tr = new TaskRunner(context, taskDescription) runningTasks.put(taskDescription.taskId, tr) threadPool.execute(tr) + if (decommissioned) { + log.error(s"Launching a task while in decommissioned state.") + } } def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala index f1eaae29f1..2b175c1e14 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala @@ -71,4 +71,9 @@ private[spark] object Worker { ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize") .intConf .createWithDefault(100) + + private[spark] val WORKER_DECOMMISSION_ENABLED = + ConfigBuilder("spark.worker.decommission.enabled") + .booleanConf + .createWithDefault(false) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 64d2032a12..a26b5791fa 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -361,6 +361,7 @@ abstract class RDD[T: ClassTag]( readCachedBlock = false computeOrReadCheckpoint(partition, context) }) match { + // Block hit. case Left(blockResult) => if (readCachedBlock) { val existingMetrics = context.taskMetrics().inputMetrics @@ -374,6 +375,7 @@ abstract class RDD[T: ClassTag]( } else { new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) } + // Need to compute the block. 
case Right(iter) => new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]]) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 46a35b6a2e..ee31093ec0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -58,3 +58,11 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los private[spark] case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false) extends ExecutorLossReason(_message) + +/** + * A loss reason that means the executor is marked for decommissioning. + * + * This is used by the task scheduler to remove state associated with the executor, but + * not yet fail any tasks that were running in the executor before the executor is "fully" lost. + */ +private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 80805df256..2e2851eb90 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -88,6 +88,10 @@ private[spark] class Pool( schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason)) } + override def executorDecommission(executorId: String): Unit = { + schedulableQueue.asScala.foreach(_.executorDecommission(executorId)) + } + override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { var shouldRevive = false for (schedulable <- schedulableQueue.asScala) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala index b6f88ed0a9..8cc239c81d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala @@ -43,6 +43,7 @@ private[spark] trait Schedulable { def removeSchedulable(schedulable: Schedulable): Unit def getSchedulableByName(name: String): Schedulable def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit + def executorDecommission(executorId: String): Unit def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 9159d2a015..4752353046 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -27,6 +27,9 @@ private[spark] trait SchedulerBackend { def start(): Unit def stop(): Unit + /** + * Update the current offers and schedule tasks + */ def reviveOffers(): Unit def defaultParallelism(): Int diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 15f5d20e9b..e9e638a364 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -98,6 +98,11 @@ private[spark] trait TaskScheduler { */ def applicationId(): String = appId + /** + * Process a decommissioning executor. 
+ */ + def executorDecommission(executorId: String): Unit + /** * Process a lost executor */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index bf92081d13..1b197c4cca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -734,6 +734,11 @@ private[spark] class TaskSchedulerImpl( } } + override def executorDecommission(executorId: String): Unit = { + rootPool.executorDecommission(executorId) + backend.reviveOffers() + } + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = { var failedExecutor: Option[String] = None diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2ce11347ad..18684ee8eb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1083,6 +1083,12 @@ private[spark] class TaskSetManager( levels.toArray } + def executorDecommission(execId: String): Unit = { + recomputeLocality() + // Future consideration: if an executor is decommissioned it may make sense to add the current + // tasks to the spec exec queue. + } + def recomputeLocality(): Unit = { // A zombie TaskSetManager may reach here while executorLost happens if (isZombie) return diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 283390814a..8db0122f17 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -94,6 +94,8 @@ private[spark] object CoarseGrainedClusterMessages { case class RemoveExecutor(executorId: String, reason: ExecutorLossReason) extends CoarseGrainedClusterMessage + case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage + case class RemoveWorker(workerId: String, host: String, message: String) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 63aa04986b..6e1efdaf5b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -92,6 +92,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors that have been lost, but for which we don't yet know the real exit reason. 
private val executorsPendingLossReason = new HashSet[String] + // Executors which are being decommissioned + protected val executorsPendingDecommission = new HashSet[String] + // A map of ResourceProfile id to map of hostname with its possible task number running on it @GuardedBy("CoarseGrainedSchedulerBackend.this") protected var rpHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty @@ -185,11 +188,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor)) removeExecutor(executorId, reason) + case DecommissionExecutor(executorId) => + logError(s"Received decommission executor message ${executorId}.") + decommissionExecutor(executorId) + + case RemoveWorker(workerId, host, message) => + removeWorker(workerId, host, message) + case LaunchedExecutor(executorId) => executorDataMap.get(executorId).foreach { data => data.freeCores = data.totalCores } makeOffers(executorId) + case e => + logError(s"Received unexpected message. ${e}") } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -257,6 +269,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeWorker(workerId, host, message) context.reply(true) + case DecommissionExecutor(executorId) => + logError(s"Received decommission executor message ${executorId}.") + decommissionExecutor(executorId) + context.reply(true) + case RetrieveSparkAppConfig(resourceProfileId) => val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId) val reply = SparkAppConfig( @@ -265,6 +282,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Option(delegationTokens.get()), rp) context.reply(reply) + case e => + logError(s"Received unexpected ask ${e}") } // Make fake resource offers on all executors @@ -365,6 +384,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingLossReason -= executorId + executorsPendingDecommission -= executorId executorsPendingToRemove.remove(executorId).getOrElse(false) } totalCoreCount.addAndGet(-executorInfo.totalCores) @@ -389,6 +409,35 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.workerRemoved(workerId, host, message) } + /** + * Mark a given executor as decommissioned and stop making resource offers for it. + */ + private def decommissionExecutor(executorId: String): Boolean = { + val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. + if (isExecutorActive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + + if (shouldDisable) { + logInfo(s"Starting decommissioning executor $executorId.") + try { + scheduler.executorDecommission(executorId) + } catch { + case e: Exception => + logError(s"Unexpected error during decommissioning ${e.toString}", e) + } + logInfo(s"Finished decommissioning executor $executorId.") + } else { + logInfo(s"Skipping decommissioning of executor $executorId.") + } + shouldDisable + } + /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. 
@@ -511,8 +560,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } protected def removeWorker(workerId: String, host: String, message: String): Unit = { - driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t => - logError(t.getMessage, t))(ThreadUtils.sameThread) + driverEndpoint.send(RemoveWorker(workerId, host, message)) + } + + /** + * Called by subclasses when notified of a decommissioning executor. + */ + private[spark] def decommissionExecutor(executorId: String): Unit = { + if (driverEndpoint != null) { + logInfo("Propegating executor decommission to driver.") + driverEndpoint.send(DecommissionExecutor(executorId)) + } } def sufficientResourcesRegistered(): Boolean = true @@ -543,7 +601,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def isExecutorActive(id: String): Boolean = synchronized { executorDataMap.contains(id) && !executorsPendingToRemove.contains(id) && - !executorsPendingLossReason.contains(id) + !executorsPendingLossReason.contains(id) && + !executorsPendingDecommission.contains(id) + } override def maxNumConcurrentTasks(): Int = synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index d91d78b29f..42c46464d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -174,6 +174,12 @@ private[spark] class StandaloneSchedulerBackend( removeExecutor(fullId.split("/")(1), reason) } + override def executorDecommissioned(fullId: String, message: String) { + logInfo("Asked to decommission executor") + decommissionExecutor(fullId.split("/")(1)) + logInfo("Executor %s decommissioned: %s".format(fullId, message)) + } + override def workerRemoved(workerId: String, host: String, message: String): Unit = { logInfo("Worker %s removed: %s".format(workerId, message)) removeWorker(workerId, host, message) diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala index 5a24965170..230195da2a 100644 --- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -60,7 +60,7 @@ private[spark] object SignalUtils extends Logging { if (SystemUtils.IS_OS_UNIX) { try { val handler = handlers.getOrElseUpdate(signal, { - logInfo("Registered signal handler for " + signal) + logInfo("Registering signal handler for " + signal) new ActionHandler(new Signal(signal)) }) handler.register(action) diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index a1d3077b8f..a3e39d7f53 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, Master} import org.apache.spark.deploy.worker.Worker -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.RpcEnv import 
org.apache.spark.util.Utils @@ -44,13 +44,13 @@ class AppClientSuite with Eventually with ScalaFutures { private val numWorkers = 2 - private val conf = new SparkConf() - private val securityManager = new SecurityManager(conf) + private var conf: SparkConf = null private var masterRpcEnv: RpcEnv = null private var workerRpcEnvs: Seq[RpcEnv] = null private var master: Master = null private var workers: Seq[Worker] = null + private var securityManager: SecurityManager = null /** * Start the local cluster. @@ -58,6 +58,8 @@ class AppClientSuite */ override def beforeAll(): Unit = { super.beforeAll() + conf = new SparkConf().set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + securityManager = new SecurityManager(conf) masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager) workerRpcEnvs = (0 until numWorkers).map { i => RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager) @@ -111,8 +113,23 @@ class AppClientSuite assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed") } + + // Save the executor id before decommissioning so we can kill it + val application = getApplications().head + val executors = application.executors + val executorId: String = executors.head._2.fullId + + // Send a decommission self to all the workers + // Note: normally the worker would send this on their own. + workers.foreach(worker => worker.decommissionSelf()) + + // Decommissioning is async. + eventually(timeout(1.seconds), interval(10.millis)) { + // We only record decommissioning for the executor we've requested + assert(ci.listener.execDecommissionedList.size === 1) + } + // Send request to kill executor, verify request was made - val executorId: String = getApplications().head.executors.head._2.fullId whenReady( ci.client.killExecutors(Seq(executorId)), timeout(10.seconds), @@ -120,6 +137,15 @@ class AppClientSuite assert(acknowledged) } + // Verify that asking for executors on the decommissioned workers fails + whenReady( + ci.client.requestTotalExecutors(numExecutorsRequested), + timeout(10.seconds), + interval(10.millis)) { acknowledged => + assert(acknowledged) + } + assert(getApplications().head.executors.size === 0) + // Issue stop command for Client to disconnect from Master ci.client.stop() @@ -189,6 +215,7 @@ class AppClientSuite val deadReasonList = new ConcurrentLinkedQueue[String]() val execAddedList = new ConcurrentLinkedQueue[String]() val execRemovedList = new ConcurrentLinkedQueue[String]() + val execDecommissionedList = new ConcurrentLinkedQueue[String]() def connected(id: String): Unit = { connectedIdList.add(id) @@ -218,6 +245,10 @@ class AppClientSuite execRemovedList.add(id) } + def executorDecommissioned(id: String, message: String): Unit = { + execDecommissionedList.add(id) + } + def workerRemoved(workerId: String, host: String, message: String): Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 101e60c73e..e40b63fe13 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -167,6 +167,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 + override def executorDecommission(executorId: String) = {} override def executorLost(executorId: 
String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None @@ -707,6 +708,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId, executorUpdates: Map[(Int, Int), ExecutorMetrics]): Boolean = true + override def executorDecommission(executorId: String): Unit = {} override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 4e71ec1ea7..9f593e0039 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -89,6 +89,7 @@ private class DummyTaskScheduler extends TaskScheduler { override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {} override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 + override def executorDecommission(executorId: String): Unit = {} override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala new file mode 100644 index 0000000000..15733b0d93 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.util.concurrent.Semaphore + +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend +import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils} + +class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { + + override def beforeEach(): Unit = { + val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) + + sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) + } + + test("verify task with no decommissioning works as expected") { + val input = sc.parallelize(1 to 10) + input.count() + val sleepyRdd = input.mapPartitions{ x => + Thread.sleep(100) + x + } + assert(sleepyRdd.count() === 10) + } + + test("verify a task with all workers decommissioned succeeds") { + val input = sc.parallelize(1 to 10) + // Do a count to wait for the executors to be registered. + input.count() + val sleepyRdd = input.mapPartitions{ x => + Thread.sleep(50) + x + } + // Listen for the job + val sem = new Semaphore(0) + sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + sem.release() + } + }) + // Start the task. + val asyncCount = sleepyRdd.countAsync() + // Wait for the job to have started + sem.acquire(1) + // Decommission all the executors, this should not halt the current task. + // decom.sh message passing is tested manually. + val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] + val execs = sched.getExecutorIds() + execs.foreach(execId => sched.decommissionExecutor(execId)) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 2.seconds) + assert(asyncCountResult === 10) + // Try and launch task after decommissioning, this should fail + val postDecommissioned = input.map(x => x) + val postDecomAsyncCount = postDecommissioned.countAsync() + val thrown = intercept[java.util.concurrent.TimeoutException]{ + val result = ThreadUtils.awaitResult(postDecomAsyncCount, 2.seconds) + } + assert(postDecomAsyncCount.isCompleted === false, + "After exec decommission new task could not launch") + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 09943b7974..f42f3415ba 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -55,6 +55,9 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) { } } + def workerDecommissioning: Boolean = + sparkConf.get(org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED) + def nodeSelector: Map[String, String] = KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 6a26df2997..f575241de9 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -24,6 +24,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python._ import org.apache.spark.rpc.RpcEndpointAddress @@ -33,7 +34,7 @@ import org.apache.spark.util.Utils private[spark] class BasicExecutorFeatureStep( kubernetesConf: KubernetesExecutorConf, secMgr: SecurityManager) - extends KubernetesFeatureConfigStep { + extends KubernetesFeatureConfigStep with Logging { // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf private val executorContainerImage = kubernetesConf @@ -186,6 +187,21 @@ private[spark] class BasicExecutorFeatureStep( .endResources() .build() }.getOrElse(executorContainer) + val containerWithLifecycle = + if (!kubernetesConf.workerDecommissioning) { + logInfo("Decommissioning not enabled, skipping shutdown script") + containerWithLimitCores + } else { + logInfo("Adding decommission script to lifecycle") + new ContainerBuilder(containerWithLimitCores).withNewLifecycle() + .withNewPreStop() + .withNewExec() + .addToCommand("/opt/decom.sh") + .endExec() + .endPreStop() + .endLifecycle() + .build() + } val ownerReference = kubernetesConf.driverPod.map { pod => new OwnerReferenceBuilder() .withController(true) @@ -213,6 +229,6 @@ private[spark] class BasicExecutorFeatureStep( kubernetesConf.get(KUBERNETES_EXECUTOR_SCHEDULER_NAME) .foreach(executorPod.getSpec.setSchedulerName) - SparkPod(executorPod, containerWithLimitCores) + SparkPod(executorPod, containerWithLifecycle) } } diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index 6ed37fc637..cc65a7da12 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -30,7 +30,7 @@ ARG spark_uid=185 RUN set -ex && \ apt-get update && \ ln -s /lib /lib64 && \ - apt install -y bash tini libc6 libpam-modules krb5-user libnss3 && \ + apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps && \ mkdir -p /opt/spark && \ mkdir -p /opt/spark/examples && \ mkdir -p /opt/spark/work-dir && \ @@ -45,6 +45,7 @@ COPY jars /opt/spark/jars COPY bin /opt/spark/bin COPY sbin /opt/spark/sbin COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/ +COPY kubernetes/dockerfiles/spark/decom.sh /opt/ COPY examples /opt/spark/examples COPY kubernetes/tests /opt/spark/tests COPY data /opt/spark/data @@ -53,6 +54,7 @@ ENV SPARK_HOME /opt/spark WORKDIR /opt/spark/work-dir RUN chmod g+w /opt/spark/work-dir +RUN chmod a+x /opt/decom.sh ENTRYPOINT [ "/opt/entrypoint.sh" ] diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh new file mode 100755 index 0000000000..8a5208d49a --- /dev/null +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more 
+# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +set -ex +echo "Asked to decommission" +# Find the pid to signal +date | tee -a ${LOG} +WORKER_PID=$(ps -o pid -C java | tail -n 1| awk '{ sub(/^[ \t]+/, ""); print }') +echo "Using worker pid $WORKER_PID" +kill -s SIGPWR ${WORKER_PID} +# For now we expect this to timeout, since we don't start exiting the backend. +echo "Waiting for worker pid to exit" +# If the worker does exit stop blocking the cleanup. +timeout 60 tail --pid=${WORKER_PID} -f /dev/null +date +echo "Done" +date +sleep 30 diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 6ee3523c8e..05ab782cae 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -30,9 +30,9 @@ set -e # If there is no passwd entry for the container UID, attempt to create one if [ -z "$uidentry" ] ; then if [ -w /etc/passwd ] ; then - echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd + echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd else - echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID" + echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID" fi fi @@ -59,7 +59,7 @@ fi # If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor. # It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s. if [ -n ${HADOOP_HOME} ] && [ -z ${SPARK_DIST_CLASSPATH} ]; then - export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath) + export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath) fi if ! [ -z ${HADOOP_CONF_DIR+x} ]; then diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh index 607bb24345..292abe91d3 100755 --- a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh +++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh @@ -16,7 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -set -xo errexit +set -exo errexit TEST_ROOT_DIR=$(git rev-parse --show-toplevel) DEPLOY_MODE="minikube" @@ -42,6 +42,9 @@ SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version 2>/dev/nu | grep -v "WARNING"\ | tail -n 1) +export SCALA_VERSION +echo $SCALA_VERSION + # Parse arguments while (( "$#" )); do case $1 in @@ -110,7 +113,8 @@ while (( "$#" )); do shift ;; *) - break + echo "Unexpected command line flag $2 $1." + exit 1 ;; esac shift @@ -164,6 +168,7 @@ properties+=( -Dspark.kubernetes.test.jvmImage=$JVM_IMAGE_NAME -Dspark.kubernetes.test.pythonImage=$PYTHON_IMAGE_NAME -Dspark.kubernetes.test.rImage=$R_IMAGE_NAME + -Dlog4j.logger.org.apache.spark=DEBUG ) $TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pscala-$SCALA_VERSION -Pkubernetes -Pkubernetes-integration-tests ${properties[@]} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala new file mode 100644 index 0000000000..f5eab6e4bb --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import org.apache.spark.internal.config.Worker + +private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => + + import DecommissionSuite._ + import KubernetesSuite.k8sTestTag + + test("Test basic decommissioning", k8sTestTag) { + sparkAppConf + .set(Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + .set("spark.kubernetes.pyspark.pythonVersion", "3") + .set("spark.kubernetes.container.image", pyImage) + + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_DECOMISSIONING, + mainClass = "", + expectedLogOnCompletion = Seq("decommissioning executor", + "Finished waiting, stopping Spark"), + appArgs = Array.empty[String], + driverPodChecker = doBasicDriverPyPodCheck, + executorPodChecker = doBasicExecutorPyPodCheck, + appLocator = appLocator, + isJVM = false, + decommissioningTest = true) + } +} + +private[spark] object DecommissionSuite { + val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/" + val PYSPARK_DECOMISSIONING: String = TEST_LOCAL_PYSPARK + "decommissioning.py" +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 0d4fcccc35..61e1f27b55 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -42,7 +42,9 @@ import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with DepsTestsSuite with RTestsSuite with Logging with Eventually with Matchers { + with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually + with Matchers { + import KubernetesSuite._ @@ -254,6 +256,7 @@ class KubernetesSuite extends SparkFunSuite } } + // scalastyle:off argcount protected def runSparkApplicationAndVerifyCompletion( appResource: String, mainClass: String, @@ -264,11 +267,78 @@ class KubernetesSuite extends SparkFunSuite appLocator: String, isJVM: Boolean, pyFiles: Option[String] = None, - executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = { + executorPatience: Option[(Option[Interval], Option[Timeout])] = None, + decommissioningTest: Boolean = false): Unit = { + + // scalastyle:on argcount val appArguments = SparkAppArguments( mainAppResource = appResource, mainClass = mainClass, appArgs = appArgs) + + val execPods = scala.collection.mutable.Map[String, Pod]() + val (patienceInterval, patienceTimeout) = { + executorPatience match { + case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) + case _ => (INTERVAL, TIMEOUT) + } + } + def checkPodReady(namespace: String, name: String) = { + val execPod = kubernetesTestComponents.kubernetesClient + .pods() + .inNamespace(namespace) + .withName(name) + .get() + val resourceStatus = execPod.getStatus + val conditions = resourceStatus.getConditions().asScala + val conditionTypes = conditions.map(_.getType()) + val readyConditions = conditions.filter{cond => cond.getType() == "Ready"} + val result = readyConditions + .map(cond => 
cond.getStatus() == "True") + .headOption.getOrElse(false) + result + } + val execWatcher = kubernetesTestComponents.kubernetesClient + .pods() + .withLabel("spark-app-locator", appLocator) + .withLabel("spark-role", "executor") + .watch(new Watcher[Pod] { + logDebug("Beginning watch of executors") + override def onClose(cause: KubernetesClientException): Unit = + logInfo("Ending watch of executors") + override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { + val name = resource.getMetadata.getName + val namespace = resource.getMetadata().getNamespace() + action match { + case Action.MODIFIED => + execPods(name) = resource + case Action.ADDED => + logDebug(s"Add event received for $name.") + execPods(name) = resource + // If testing decommissioning start a thread to simulate + // decommissioning. + if (decommissioningTest && execPods.size == 1) { + // Wait for all the containers in the pod to be running + logDebug("Waiting for first pod to become OK prior to deletion") + Eventually.eventually(patienceTimeout, patienceInterval) { + val result = checkPodReady(namespace, name) + result shouldBe (true) + } + // Sleep a small interval to allow execution of job + logDebug("Sleeping before killing pod.") + Thread.sleep(2000) + // Delete the pod to simulate cluster scale down/migration. + val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) + pod.delete() + logDebug(s"Triggered pod decom/delete: $name deleted") + } + case Action.DELETED | Action.ERROR => + execPods.remove(name) + } + } + }) + + logDebug("Starting Spark K8s job") SparkAppLauncher.launch( appArguments, sparkAppConf, @@ -284,40 +354,33 @@ class KubernetesSuite extends SparkFunSuite .list() .getItems .get(0) + driverPodChecker(driverPod) - val execPods = scala.collection.mutable.Map[String, Pod]() - val execWatcher = kubernetesTestComponents.kubernetesClient - .pods() - .withLabel("spark-app-locator", appLocator) - .withLabel("spark-role", "executor") - .watch(new Watcher[Pod] { - logInfo("Beginning watch of executors") - override def onClose(cause: KubernetesClientException): Unit = - logInfo("Ending watch of executors") - override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { - val name = resource.getMetadata.getName - action match { - case Action.ADDED | Action.MODIFIED => - execPods(name) = resource - case Action.DELETED | Action.ERROR => - execPods.remove(name) - } - } - }) - - val (patienceInterval, patienceTimeout) = { - executorPatience match { - case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) - case _ => (INTERVAL, TIMEOUT) - } - } - + // If we're testing decommissioning we delete all the executors, but we should have + // an executor at some point. Eventually.eventually(patienceTimeout, patienceInterval) { execPods.values.nonEmpty should be (true) } + // If decommissioning we need to wait and check the executors were removed + if (decommissioningTest) { + // Sleep a small interval to ensure everything is registered. + Thread.sleep(100) + // Wait for the executors to become ready + Eventually.eventually(patienceTimeout, patienceInterval) { + val anyReadyPods = ! 
execPods.map{ + case (name, resource) => + (name, resource.getMetadata().getNamespace()) + }.filter{ + case (name, namespace) => checkPodReady(namespace, name) + }.isEmpty + val podsEmpty = execPods.values.isEmpty + val podsReadyOrDead = anyReadyPods || podsEmpty + podsReadyOrDead shouldBe (true) + } + } execWatcher.close() execPods.values.foreach(executorPodChecker(_)) - Eventually.eventually(TIMEOUT, patienceInterval) { + Eventually.eventually(patienceTimeout, patienceInterval) { expectedLogOnCompletion.foreach { e => assert(kubernetesTestComponents.kubernetesClient .pods() @@ -425,5 +488,5 @@ private[spark] object KubernetesSuite { val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest" val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest" val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) - val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + val INTERVAL = PatienceConfiguration.Interval(Span(1, Seconds)) } diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py new file mode 100644 index 0000000000..f68f24d497 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +import time + +from pyspark.sql import SparkSession + + +if __name__ == "__main__": + """ + Usage: decommissioning + """ + print("Starting decom test") + spark = SparkSession \ + .builder \ + .appName("PyMemoryTest") \ + .getOrCreate() + sc = spark._sc + rdd = sc.parallelize(range(10)) + rdd.collect() + print("Waiting to give nodes time to finish.") + time.sleep(5) + rdd.collect() + print("Waiting some more....") + time.sleep(10) + rdd.collect() + print("Finished waiting, stopping Spark.") + spark.stop() + print("Done, exiting Python") + sys.exit(0) diff --git a/sbin/decommission-slave.sh b/sbin/decommission-slave.sh new file mode 100644 index 0000000000..4bbf257ff1 --- /dev/null +++ b/sbin/decommission-slave.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A shell script to decommission all workers on a single slave
+#
+# Environment variables
+#
+#   SPARK_WORKER_INSTANCES  The number of worker instances that should be
+#                           running on this slave. Default is 1.
+
+# Usage: decommission-slave.sh [--block-until-exit]
+#   Decommissions all workers on this slave machine
+
+set -ex
+
+if [ -z "${SPARK_HOME}" ]; then
+  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+fi
+
+. "${SPARK_HOME}/sbin/spark-config.sh"
+
+. "${SPARK_HOME}/bin/load-spark-env.sh"
+
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+  "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker 1
+else
+  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+    "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker $(( $i + 1 ))
+  done
+fi
+
+# Check if --block-until-exit is set.
+# This is done for systems which block on the decommissioning script and on exit
+# shut down the entire system (e.g. K8s).
+if [ "$1" == "--block-until-exit" ]; then
+  shift
+  # For now we only block on the 0th instance if there are multiple instances.
+  instance=$1
+  pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"
+  wait $pid
+fi
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index 6de67e039b..81f2fd40a7 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -215,6 +215,21 @@ case $option in
     fi
     ;;
 
+  (decommission)
+
+    if [ -f $pid ]; then
+      TARGET_ID="$(cat "$pid")"
+      if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
+        echo "decommissioning $command"
+        kill -s SIGPWR "$TARGET_ID"
+      else
+        echo "no $command to decommission"
+      fi
+    else
+      echo "no $command to decommission"
+    fi
+    ;;
+
   (status)
 
     if [ -f $pid ]; then
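Usage note (illustrative, not part of the patch): the new sbin entry points are intended to be wired into whatever hook a host uses to announce that it is going away. Assuming a standalone worker started with the stock scripts, a drain hook could simply run:

    # Ask every Spark worker instance on this host to begin decommissioning.
    "${SPARK_HOME}/sbin/decommission-slave.sh"

    # Lower-level equivalent for a single worker instance; this is what the
    # script above calls, and it sends SIGPWR to the worker's JVM.
    "${SPARK_HOME}/sbin/spark-daemon.sh" decommission org.apache.spark.deploy.worker.Worker 1

In the K8s images the analogous trigger is /opt/decom.sh, which this patch registers as the executor container's preStop hook when spark.worker.decommission.enabled is set.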