[SPARK-28487][K8S] More responsive dynamic allocation with K8S

This change makes a few adjustments to the k8s pod allocator so
that it behaves a little better when dynamic allocation is on.

(i) Allow the application to ramp up immediately when there's a
change in the target number of executors. Without this change,
scaling would only trigger when a change happened in the state of
the cluster, e.g. an executor going down, or when the periodic
snapshot was taken (default every 30s).
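
A minimal sketch of how (i) is wired up, taken nearly verbatim from the
ExecutorPodsAllocator change below: a change in the target count now wakes
the snapshot subscribers immediately, unless an allocation round is already
outstanding (tracked by the new hasPendingPods flag).

    def setTotalExpectedExecutors(total: Int): Unit = {
      totalExpectedExecutors.set(total)
      // Wake the allocation loop right away instead of waiting for the next
      // periodic snapshot, unless pod requests are already outstanding.
      if (!hasPendingPods.get()) {
        snapshotsStore.notifySubscribers()
      }
    }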

(ii) Get rid of pending pod requests, both acknowledged (i.e. Spark
knows that a pod is pending resource allocation) and unacknowledged
(i.e. Spark has requested the pod but the API server hasn't created it
yet), when they're not needed anymore. This avoids starting those
executors only to remove them after the idle timeout, wasting
resources in the meantime.
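
In rough terms, the allocation loop now compares how many pods it already
knows about against the target and deletes the excess. A simplified sketch of
that bookkeeping (names shortened from the ExecutorPodsAllocator hunk below;
the app-id and role label filters on the delete call are elided):

    val knownPodCount = runningCount + pendingExecutorIds.size + newlyCreatedExecutors.size
    if (knownPodCount > targetCount) {
      val excess = knownPodCount - targetCount
      // Drop unacknowledged requests first, then acknowledged pending pods.
      val pendingToDelete = pendingExecutorIds.take(excess - newlyCreatedExecutors.size)
      val toDelete = newlyCreatedExecutors.keys.take(excess).toList ++ pendingToDelete
      if (toDelete.nonEmpty) {
        kubernetesClient.pods()
          .withField("status.phase", "Pending")
          .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
          .delete()
        newlyCreatedExecutors --= toDelete
      }
    }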

(iii) Re-work some of the code to avoid unnecessary logging. While not
bad without dynamic allocation, the existing logging was very chatty
when dynamic allocation was on. With the changes, all the useful
information is still there, but only when interesting changes happen.
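
The rework mostly amounts to guarding the per-event debug messages so they
only fire when there is something to report, e.g. in ExecutorMonitor (see the
hunk below):

    if (activatedExecs.nonEmpty) {
      logDebug(s"Activated executors $activatedExecs due to shuffle data needed by new job " +
        s"${event.jobId}.")
    }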

(iv) Gracefully shut down executors when they become idle. Just deleting
the pod causes a lot of ugly logs to show up, so it's better to ask the
executors to exit nicely. That also allows Spark to respect the "don't delete
pods" option when dynamic allocation is on.

Tested on a small k8s cluster running different TPC-DS workloads.

Closes #25236 from vanzin/SPARK-28487.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Marcelo Vanzin 2019-08-13 17:29:54 -07:00
parent 331f2657d9
commit 0343854f54
11 changed files with 323 additions and 142 deletions

View file

@@ -193,8 +193,10 @@ private[spark] class ExecutorMonitor(
}
}
logDebug(s"Activated executors $activatedExecs due to shuffle data needed by new job" +
s"${event.jobId}.")
if (activatedExecs.nonEmpty) {
logDebug(s"Activated executors $activatedExecs due to shuffle data needed by new job" +
s"${event.jobId}.")
}
if (needTimeoutUpdate) {
nextTimeout.set(Long.MinValue)
@@ -243,8 +245,10 @@ private[spark] class ExecutorMonitor(
}
}
logDebug(s"Executors $deactivatedExecs do not have active shuffle data after job " +
s"${event.jobId} finished.")
if (deactivatedExecs.nonEmpty) {
logDebug(s"Executors $deactivatedExecs do not have active shuffle data after job " +
s"${event.jobId} finished.")
}
}
jobToStageIDs.remove(event.jobId).foreach { stages =>
@@ -506,6 +510,8 @@ private[spark] class ExecutorMonitor(
}
}
def nonEmpty: Boolean = ids != null && ids.nonEmpty
override def toString(): String = {
ids.mkString(",") + (if (excess > 0) s" (and $excess more)" else "")
}

View file

@@ -330,6 +330,12 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(true)
val KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD =
ConfigBuilder("spark.kubernetes.dynamicAllocation.deleteGracePeriod")
.doc("How long to wait for executors to shut down gracefully before a forceful kill.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
val KUBERNETES_SUBMIT_GRACE_PERIOD =
ConfigBuilder("spark.kubernetes.appKillPodDeletionGracePeriod")
.doc("Time to wait for graceful deletion of Spark pods when spark-submit" +

View file

@@ -16,7 +16,7 @@
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
@@ -66,15 +66,28 @@ private[spark] class ExecutorPodsAllocator(
// snapshot yet. Mapped to the timestamp when they were created.
private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)
private val hasPendingPods = new AtomicBoolean()
private var lastSnapshot = ExecutorPodsSnapshot(Nil)
def start(applicationId: String): Unit = {
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, _)
}
}
def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
def setTotalExpectedExecutors(total: Int): Unit = {
totalExpectedExecutors.set(total)
if (!hasPendingPods.get()) {
snapshotsStore.notifySubscribers()
}
}
private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
private def onNewSnapshots(
applicationId: String,
snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
// For all executors we've created against the API but have not seen in a snapshot
// yet - check the current time. If the current time has exceeded some threshold,
@@ -82,81 +95,138 @@ private[spark] class ExecutorPodsAllocator(
// handled the creation request), or the API server created the pod but we missed
// both the creation and deletion events. In either case, delete the missing pod
// if possible, and mark such a pod to be rescheduled below.
newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
val currentTime = clock.getTimeMillis()
val currentTime = clock.getTimeMillis()
val timedOut = newlyCreatedExecutors.flatMap { case (execId, timeCreated) =>
if (currentTime - timeCreated > podCreationTimeout) {
logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
" previous allocation attempt tried to create it. The executor may have been" +
" deleted but the application missed the deletion event.")
if (shouldDeleteExecutors) {
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
.delete()
}
}
newlyCreatedExecutors -= execId
Some(execId)
} else {
logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
s" was created ${currentTime - timeCreated} milliseconds ago.")
None
}
}
if (timedOut.nonEmpty) {
logWarning(s"Executors with ids ${timedOut.mkString(",")} were not detected in the" +
s" Kubernetes cluster after $podCreationTimeout ms despite the fact that a previous" +
" allocation attempt tried to create them. The executors may have been deleted but the" +
" application missed the deletion event.")
newlyCreatedExecutors --= timedOut
if (shouldDeleteExecutors) {
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
.delete()
}
}
}
if (snapshots.nonEmpty) {
// Only need to examine the cluster as of the latest snapshot, the "current" state, to see if
// we need to allocate more executors or not.
val latestSnapshot = snapshots.last
val currentRunningExecutors = latestSnapshot.executorPods.values.count {
case PodRunning(_) => true
lastSnapshot = snapshots.last
}
val currentRunningCount = lastSnapshot.executorPods.values.count {
case PodRunning(_) => true
case _ => false
}
val currentPendingExecutors = lastSnapshot.executorPods
.filter {
case (_, PodPending(_)) => true
case _ => false
}
val currentPendingExecutors = latestSnapshot.executorPods.values.count {
case PodPending(_) => true
case _ => false
}
val currentTotalExpectedExecutors = totalExpectedExecutors.get
logDebug(s"Currently have $currentRunningExecutors running executors and" +
s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" +
s" have been requested but are pending appearance in the cluster.")
if (newlyCreatedExecutors.isEmpty
&& currentPendingExecutors == 0
&& currentRunningExecutors < currentTotalExpectedExecutors) {
val numExecutorsToAllocate = math.min(
currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
for ( _ <- 0 until numExecutorsToAllocate) {
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
conf,
newExecutorId.toString,
applicationId,
driverPod)
val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
kubernetesClient)
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
kubernetesClient.pods().create(podWithAttachedContainer)
newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
.map { case (id, _) => id }
if (snapshots.nonEmpty) {
logDebug(s"Pod allocation status: $currentRunningCount running, " +
s"${currentPendingExecutors.size} pending, " +
s"${newlyCreatedExecutors.size} unacknowledged.")
}
val currentTotalExpectedExecutors = totalExpectedExecutors.get
// This variable is used later to print some debug logs. It's updated when cleaning up
// excess pod requests, since currentPendingExecutors is immutable.
var knownPendingCount = currentPendingExecutors.size
// It's possible that we have outstanding pods that are outdated when dynamic allocation
// decides to downscale the application. So check if we can release any pending pods early
// instead of waiting for them to time out. Drop them first from the unacknowledged list,
// then from the pending.
//
// TODO: with dynamic allocation off, handle edge cases if we end up with more running
// executors than expected.
val knownPodCount = currentRunningCount + currentPendingExecutors.size +
newlyCreatedExecutors.size
if (knownPodCount > currentTotalExpectedExecutors) {
val excess = knownPodCount - currentTotalExpectedExecutors
val knownPendingToDelete = currentPendingExecutors.take(excess - newlyCreatedExecutors.size)
val toDelete = newlyCreatedExecutors.keys.take(excess).toList ++ knownPendingToDelete
if (toDelete.nonEmpty) {
logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
.delete()
newlyCreatedExecutors --= toDelete
knownPendingCount -= knownPendingToDelete.size
}
} else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
// TODO handle edge cases if we end up with more running executors than expected.
logDebug("Current number of running executors is equal to the number of requested" +
" executors. Not scaling up further.")
} else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
s" executors to begin running before requesting for more executors. # of executors in" +
s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" +
s" created but we have not observed as being present in the cluster yet:" +
s" ${newlyCreatedExecutors.size}.")
}
}
if (newlyCreatedExecutors.isEmpty
&& currentPendingExecutors.isEmpty
&& currentRunningCount < currentTotalExpectedExecutors) {
val numExecutorsToAllocate = math.min(
currentTotalExpectedExecutors - currentRunningCount, podAllocationSize)
logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
for ( _ <- 0 until numExecutorsToAllocate) {
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
conf,
newExecutorId.toString,
applicationId,
driverPod)
val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
kubernetesClient)
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
kubernetesClient.pods().create(podWithAttachedContainer)
newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
}
}
// Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
// update method when not needed.
hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0)
// The code below just prints debug messages, which are only useful when there's a change
// in the snapshot state. Since the messages are a little spammy, avoid them when we know
// there are no useful updates.
if (!log.isDebugEnabled || snapshots.isEmpty) {
return
}
if (currentRunningCount >= currentTotalExpectedExecutors && !dynamicAllocationEnabled) {
logDebug("Current number of running executors is equal to the number of requested" +
" executors. Not scaling up further.")
} else {
val outstanding = knownPendingCount + newlyCreatedExecutors.size
if (outstanding > 0) {
logDebug(s"Still waiting for $outstanding executors before requesting more.")
}
}
}

View file

@@ -68,9 +68,13 @@ private[spark] class ExecutorPodsLifecycleManager(
s" pod name ${state.pod.getMetadata.getName}")
onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
case succeeded@PodSucceeded(_) =>
logDebug(s"Snapshot reported succeeded executor with id $execId," +
s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
s" unusual unless Spark specifically informed the executor to exit.")
if (schedulerBackend.isExecutorActive(execId.toString)) {
logInfo(s"Snapshot reported succeeded executor with id $execId, " +
"even though the application has not requested for it to be removed.")
} else {
logDebug(s"Snapshot reported succeeded executor with id $execId," +
s" pod name ${state.pod.getMetadata.getName}.")
}
onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
case _ =>
}

View file

@@ -26,6 +26,8 @@ private[spark] trait ExecutorPodsSnapshotsStore {
def stop(): Unit
def notifySubscribers(): Unit
def updatePod(updatedPod: Pod): Unit
def replaceSnapshot(newSnapshot: Seq[Pod]): Unit

View file

@@ -52,8 +52,8 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
private val SNAPSHOT_LOCK = new Object()
private val subscribers = mutable.Buffer.empty[SnapshotsSubscriber]
private val pollingTasks = mutable.Buffer.empty[Future[_]]
private val subscribers = new CopyOnWriteArrayList[SnapshotsSubscriber]()
private val pollingTasks = new CopyOnWriteArrayList[Future[_]]
@GuardedBy("SNAPSHOT_LOCK")
private var currentSnapshot = ExecutorPodsSnapshot()
@@ -66,16 +66,24 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
SNAPSHOT_LOCK.synchronized {
newSubscriber.snapshotsBuffer.add(currentSnapshot)
}
subscribers += newSubscriber
pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
subscribers.add(newSubscriber)
pollingTasks.add(subscribersExecutor.scheduleWithFixedDelay(
() => callSubscriber(newSubscriber),
0L,
processBatchIntervalMillis,
TimeUnit.MILLISECONDS)
TimeUnit.MILLISECONDS))
}
override def notifySubscribers(): Unit = SNAPSHOT_LOCK.synchronized {
subscribers.asScala.foreach { s =>
subscribersExecutor.submit(new Runnable() {
override def run(): Unit = callSubscriber(s)
})
}
}
override def stop(): Unit = {
pollingTasks.foreach(_.cancel(true))
pollingTasks.asScala.foreach(_.cancel(false))
ThreadUtils.shutdown(subscribersExecutor)
}
@@ -90,7 +98,7 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
}
private def addCurrentSnapshotToSubscribers(): Unit = {
subscribers.foreach { subscriber =>
subscribers.asScala.foreach { subscriber =>
subscriber.snapshotsBuffer.add(currentSnapshot)
}
}

View file

@@ -77,8 +77,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
}
val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-executor-requests")
val schedulerExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"kubernetes-executor-maintenance")
val subscribersExecutor = ThreadUtils
.newDaemonThreadPoolScheduledExecutor(
@@ -114,7 +114,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
scheduler.asInstanceOf[TaskSchedulerImpl],
sc,
kubernetesClient,
requestExecutorsService,
schedulerExecutorService,
snapshotsStore,
executorPodsAllocator,
executorPodsLifecycleEventHandler,

View file

@@ -16,9 +16,9 @@
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.ExecutorService
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
import io.fabric8.kubernetes.client.KubernetesClient
@@ -27,8 +27,8 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -36,7 +36,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
kubernetesClient: KubernetesClient,
requestExecutorsService: ExecutorService,
executorService: ScheduledExecutorService,
snapshotsStore: ExecutorPodsSnapshotsStore,
podAllocator: ExecutorPodsAllocator,
lifecycleEventHandler: ExecutorPodsLifecycleManager,
@@ -44,9 +44,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
private implicit val requestExecutorContext =
ExecutionContext.fromExecutorService(requestExecutorsService)
protected override val minRegisteredRatio =
if (conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
0.8
@@ -60,7 +57,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
// Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
removeExecutor(executorId, reason)
if (isExecutorActive(executorId)) {
removeExecutor(executorId, reason)
}
}
/**
@@ -76,9 +75,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def start(): Unit = {
super.start()
if (!Utils.isDynamicAllocationEnabled(conf)) {
podAllocator.setTotalExpectedExecutors(initialExecutors)
}
podAllocator.setTotalExpectedExecutors(initialExecutors)
lifecycleEventHandler.start(this)
podAllocator.start(applicationId())
watchEvents.start(applicationId())
@@ -111,7 +108,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
Utils.tryLogNonFatalError {
ThreadUtils.shutdown(requestExecutorsService)
ThreadUtils.shutdown(executorService)
}
Utils.tryLogNonFatalError {
@@ -119,11 +116,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
}
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
// TODO when we support dynamic allocation, the pod allocator should be told to process the
// current snapshot in order to decrease/increase the number of executors accordingly.
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
podAllocator.setTotalExpectedExecutors(requestedTotal)
true
Future.successful(true)
}
override def sufficientResourcesRegistered(): Boolean = {
@@ -134,14 +129,48 @@ private[spark] class KubernetesClusterSchedulerBackend(
super.getExecutorIds()
}
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
.delete()
// Don't do anything else - let event handling from the Kubernetes API do the Spark changes
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
executorIds.foreach { id =>
removeExecutor(id, ExecutorKilled)
}
// Give some time for the executors to shut themselves down, then forcefully kill any
// remaining ones. This intentionally ignores the configuration about whether pods
// should be deleted; only executors that shut down gracefully (and are then collected
// by the ExecutorPodsLifecycleManager) will respect that configuration.
val killTask = new Runnable() {
override def run(): Unit = Utils.tryLogNonFatalError {
val running = kubernetesClient
.pods()
.withField("status.phase", "Running")
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
if (!running.list().getItems().isEmpty()) {
logInfo(s"Forcefully deleting ${running.list().getItems().size()} pods " +
s"(out of ${executorIds.size}) that are still running after graceful shutdown period.")
running.delete()
}
}
}
executorService.schedule(killTask, conf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD),
TimeUnit.MILLISECONDS)
// Return an immediate success, since we can't confirm or deny that executors have been
// actually shut down without waiting too long and blocking the allocation thread, which
// waits on this future to complete, blocking further allocations / deallocations.
//
// This relies a lot on the guarantees of Spark's RPC system, that a message will be
// delivered to the destination unless there's an issue with the connection, in which
// case the executor will shut itself down (and the driver, separately, will just declare
// it as "lost"). Coupled with the allocation manager keeping track of which executors are
// pending release, returning "true" here means that eventually all the requested executors
// will be removed.
//
// The cleanup timer above is just an optimization to make sure that stuck executors don't
// stick around in the k8s server. Normally it should never delete any pods at all.
Future.successful(true)
}
override def createDriverEndpoint(): DriverEndpoint = {

View file

@@ -34,7 +34,7 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
override def stop(): Unit = {}
def notifySubscribers(): Unit = {
override def notifySubscribers(): Unit = {
subscribers.foreach(_(snapshotsBuffer))
snapshotsBuffer.clear()
}

View file

@@ -92,8 +92,6 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Initially request executors in batches. Do not request another batch if the" +
" first has not finished.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
snapshotsStore.notifySubscribers()
for (nextId <- 1 to podAllocationSize) {
verify(podOperations).create(podWithAttachedContainerForId(nextId))
}
@@ -103,8 +101,6 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Request executors in batches. Allow another batch to be requested if" +
" all pending executors start running.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
snapshotsStore.notifySubscribers()
for (execId <- 1 until podAllocationSize) {
snapshotsStore.updatePod(runningExecutor(execId))
}
@@ -121,8 +117,6 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
test("When a current batch reaches error states immediately, re-request" +
" them on the next batch.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize)
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
snapshotsStore.notifySubscribers()
for (execId <- 1 until podAllocationSize) {
snapshotsStore.updatePod(runningExecutor(execId))
}
@@ -134,25 +128,69 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
test("When an executor is requested but the API does not report it in a reasonable time, retry" +
" requesting that executor.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
snapshotsStore.notifySubscribers()
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
when(podOperations
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(podOperations)
when(podOperations
withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(podOperations)
when(podOperations
.withLabel(SPARK_EXECUTOR_ID_LABEL, "1"))
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
.thenReturn(labeledPods)
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
verify(podOperations).create(podWithAttachedContainerForId(1))
waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
snapshotsStore.notifySubscribers()
verify(labeledPods).delete()
verify(podOperations).create(podWithAttachedContainerForId(2))
}
test("SPARK-28487: scale up and down on target executor count changes") {
when(podOperations
.withField("status.phase", "Pending"))
.thenReturn(podOperations)
when(podOperations
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(podOperations)
when(podOperations
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(podOperations)
when(podOperations
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
.thenReturn(podOperations)
// Target 1 executor, make sure it's requested, even with an empty initial snapshot.
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
verify(podOperations).create(podWithAttachedContainerForId(1))
// Mark executor as running, verify that subsequent allocation cycle is a no-op.
snapshotsStore.updatePod(runningExecutor(1))
snapshotsStore.notifySubscribers()
verify(podOperations, times(1)).create(any())
verify(podOperations, never()).delete()
// Request 3 more executors, make sure all are requested.
podsAllocatorUnderTest.setTotalExpectedExecutors(4)
snapshotsStore.notifySubscribers()
verify(podOperations).create(podWithAttachedContainerForId(2))
verify(podOperations).create(podWithAttachedContainerForId(3))
verify(podOperations).create(podWithAttachedContainerForId(4))
// Mark 2 as running, 3 as pending. Allocation cycle should do nothing.
snapshotsStore.updatePod(runningExecutor(2))
snapshotsStore.updatePod(pendingExecutor(3))
snapshotsStore.notifySubscribers()
verify(podOperations, times(4)).create(any())
verify(podOperations, never()).delete()
// Scale down to 1. Pending executors (both acknowledged and not) should be deleted.
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
snapshotsStore.notifySubscribers()
verify(podOperations, times(4)).create(any())
verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
verify(podOperations).delete()
}
private def executorPodAnswer(): Answer[SparkPod] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)

View file

@@ -16,14 +16,19 @@
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.Arrays
import java.util.concurrent.TimeUnit
import io.fabric8.kubernetes.api.model.{Pod, PodList}
import io.fabric8.kubernetes.client.KubernetesClient
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.{eq => mockitoEq}
import org.mockito.Mockito.{never, verify, when}
import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
import org.mockito.Mockito.{mock, never, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -34,7 +39,7 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SP
class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter {
private val requestExecutorsService = new DeterministicScheduler()
private val schedulerExecutorService = new DeterministicScheduler()
private val sparkConf = new SparkConf(false)
.set("spark.executor.instances", "3")
.set("spark.app.id", TEST_SPARK_APP_ID)
@@ -98,7 +103,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
taskScheduler,
sc,
kubernetesClient,
requestExecutorsService,
schedulerExecutorService,
eventQueue,
podAllocator,
lifecycleEventHandler,
@@ -127,29 +132,42 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
}
test("Remove executor") {
schedulerBackendUnderTest.start()
schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
val backend = spy(schedulerBackendUnderTest)
when(backend.isExecutorActive(any())).thenReturn(false)
when(backend.isExecutorActive(mockitoEq("2"))).thenReturn(true)
backend.start()
backend.doRemoveExecutor("1", ExecutorKilled)
verify(driverEndpointRef, never()).send(RemoveExecutor("1", ExecutorKilled))
backend.doRemoveExecutor("2", ExecutorKilled)
verify(driverEndpointRef, never()).send(RemoveExecutor("1", ExecutorKilled))
}
test("Kill executors") {
schedulerBackendUnderTest.start()
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(podOperations.withField(any(), any())).thenReturn(labeledPods)
when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
val podList = mock(classOf[PodList])
when(labeledPods.list()).thenReturn(podList)
when(podList.getItems()).thenReturn(Arrays.asList[Pod]())
schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled))
verify(labeledPods, never()).delete()
schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
TimeUnit.MILLISECONDS)
verify(labeledPods, never()).delete()
when(podList.getItems()).thenReturn(Arrays.asList(mock(classOf[Pod])))
schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
verify(labeledPods, never()).delete()
requestExecutorsService.runNextPendingCommand()
schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
TimeUnit.MILLISECONDS)
verify(labeledPods).delete()
}
test("Request total executors") {
schedulerBackendUnderTest.start()
schedulerBackendUnderTest.doRequestTotalExecutors(5)
verify(podAllocator).setTotalExpectedExecutors(3)
verify(podAllocator, never()).setTotalExpectedExecutors(5)
requestExecutorsService.runNextPendingCommand()
verify(podAllocator).setTotalExpectedExecutors(5)
}
}