[SPARK-36052][K8S] Introducing a limit for pending PODs

### What changes were proposed in this pull request?

Introducing a limit for pending PODs (newly created/requested executors included).
This limit is global for all the resource profiles. So first we count all the newly created and pending PODs (minus the ones already requested to be deleted), then we share the remaining pending-POD slots among the resource profiles.
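
As a rough, standalone sketch of that sharing step (mirroring the splitSlots helper added by this commit; the example values are invented), the remaining slots are split as evenly as possible, with the first slots % consumers profiles receiving one extra slot:

// Sketch of the slot-sharing arithmetic: divide `slots` among `consumers`
// as evenly as possible; the first (slots % consumers.size) consumers get
// one extra slot each.
def splitSlots[T](consumers: Seq[T], slots: Int): Seq[(T, Int)] = {
  val d = slots / consumers.size
  val r = slots % consumers.size
  consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - r).map((_, d))
}

// Example: 4 remaining pending-POD slots shared by 3 resource profiles.
splitSlots(Seq("rp-0", "rp-1", "rp-2"), 4)
// => Seq(("rp-0", 2), ("rp-1", 1), ("rp-2", 1))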

### Why are the changes needed?

Without this PR dynamic allocation could request too many PODs, the K8S scheduler could be overloaded, and the scheduling of PODs would be affected by the load.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

With new unit tests.

Closes #33492 from attilapiros/SPARK-36052.

Authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 1dced492fb)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
attilapiros 2021-08-10 20:16:21 -07:00 committed by Dongjoon Hyun
parent 41e5144b53
commit eb09be9e68
3 changed files with 148 additions and 26 deletions

Config.scala

@@ -543,6 +543,18 @@ private[spark] object Config extends Logging {
     .checkValue(delay => delay > 0, "delay must be a positive time value")
     .createWithDefaultString("30s")
 
+  val KUBERNETES_MAX_PENDING_PODS =
+    ConfigBuilder("spark.kubernetes.allocation.maxPendingPods")
+      .doc("Maximum number of pending PODs allowed during executor allocation for this " +
+        "application. Those newly requested executors which are unknown by Kubernetes yet are " +
+        "also counted into this limit as they will change into pending PODs by time. " +
+        "This limit is independent from the resource profiles as it limits the sum of all " +
+        "allocation for all the used resource profiles.")
+      .version("3.2.0")
+      .intConf
+      .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
+      .createWithDefault(Int.MaxValue)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation."
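
For context (not part of the diff), the new key is set like any other Spark configuration; a minimal sketch with an invented limit of 30:

import org.apache.spark.SparkConf

// Hypothetical example: cap pending + newly requested executor pods at 30 for
// the whole application (the default, Int.MaxValue, effectively disables the cap).
val sparkConf = new SparkConf()
  .set("spark.kubernetes.allocation.maxPendingPods", "30")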

ExecutorPodsAllocator.scala

@@ -57,6 +57,8 @@ private[spark] class ExecutorPodsAllocator(
 
   private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
 
+  private val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
+
   private val podCreationTimeout = math.max(
     podAllocationDelay * 5,
     conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -217,9 +219,15 @@ private[spark] class ExecutorPodsAllocator(
       }
     }
 
-    var totalPendingCount = 0
-    // The order we request executors for each ResourceProfile is not guaranteed.
-    totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, targetNum) =>
+    // sum of all the pending pods unknown by the scheduler (total for all the resources)
+    var totalPendingCount = 0
+    // total not running pods (including scheduler known & unknown, pending & newly requested ones)
+    var totalNotRunningPodCount = 0
+    val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId
+      .asScala
+      .toSeq
+      .sortBy(_._1)
+      .flatMap { case (rpId, targetNum) =>
       val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty)
 
       val currentRunningCount = podsForRpId.values.count {
@@ -235,7 +243,7 @@ private[spark] class ExecutorPodsAllocator(
       }
 
       // This variable is used later to print some debug logs. It's updated when cleaning up
       // excess pod requests, since currentPendingExecutorsForRpId is immutable.
-      var knownPendingCount = currentPendingExecutorsForRpId.size
+      var pendingCountForRpId = currentPendingExecutorsForRpId.size
 
       val newlyCreatedExecutorsForRpId =
         newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
@@ -248,12 +256,12 @@ private[spark] class ExecutorPodsAllocator(
       }
 
       if (podsForRpId.nonEmpty) {
-        logDebug(s"ResourceProfile Id: $rpId " +
+        logDebug(s"ResourceProfile Id: $rpId (" +
           s"pod allocation status: $currentRunningCount running, " +
           s"${currentPendingExecutorsForRpId.size} unknown pending, " +
           s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " +
           s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " +
-          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created.")
+          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created)")
       }
 
       // It's possible that we have outstanding pods that are outdated when dynamic allocation
@@ -264,21 +272,22 @@ private[spark] class ExecutorPodsAllocator(
       //
       // TODO: with dynamic allocation off, handle edge cases if we end up with more running
       // executors than expected.
-      val knownPodCount = currentRunningCount +
-        currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size +
-        newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size
+      var notRunningPodCountForRpId =
+        currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size +
+        newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size
+      val podCountForRpId = currentRunningCount + notRunningPodCountForRpId
 
-      if (knownPodCount > targetNum) {
-        val excess = knownPodCount - targetNum
+      if (podCountForRpId > targetNum) {
+        val excess = podCountForRpId - targetNum
         val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
           .filter { case (_, (_, createTime)) =>
             currentTime - createTime > executorIdleTimeout
           }.keys.take(excess).toList
-        val knownPendingToDelete = currentPendingExecutorsForRpId
+        val pendingToDelete = currentPendingExecutorsForRpId
           .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
           .take(excess - newlyCreatedToDelete.size)
           .map { case (id, _) => id }
-        val toDelete = newlyCreatedToDelete ++ knownPendingToDelete
+        val toDelete = newlyCreatedToDelete ++ pendingToDelete
 
         if (toDelete.nonEmpty) {
           logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
@@ -293,32 +302,49 @@ private[spark] class ExecutorPodsAllocator(
             .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
             .delete()
           newlyCreatedExecutors --= newlyCreatedToDelete
-          knownPendingCount -= knownPendingToDelete.size
+          pendingCountForRpId -= pendingToDelete.size
+          notRunningPodCountForRpId -= toDelete.size
         }
       }
 
-      if (newlyCreatedExecutorsForRpId.isEmpty
-          && knownPodCount < targetNum) {
-        requestNewExecutors(targetNum, knownPodCount, applicationId, rpId, k8sKnownPVCNames)
-      }
-      totalPendingCount += knownPendingCount
+      totalPendingCount += pendingCountForRpId
+      totalNotRunningPodCount += notRunningPodCountForRpId
 
       // The code below just prints debug messages, which are only useful when there's a change
       // in the snapshot state. Since the messages are a little spammy, avoid them when we know
       // there are no useful updates.
       if (log.isDebugEnabled && snapshots.nonEmpty) {
-        val outstanding = knownPendingCount + newlyCreatedExecutorsForRpId.size
+        val outstanding = pendingCountForRpId + newlyCreatedExecutorsForRpId.size
         if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
           logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " +
             "equal to the number of requested executors. Not scaling up further.")
         } else {
-          if (outstanding > 0) {
-            logDebug(s"Still waiting for $outstanding executors for ResourceProfile " +
-              s"Id $rpId before requesting more.")
+          if (newlyCreatedExecutorsForRpId.nonEmpty) {
+            logDebug(s"Still waiting for ${newlyCreatedExecutorsForRpId.size} executors for " +
+              s"ResourceProfile Id $rpId before requesting more.")
           }
         }
       }
+      if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) {
+        Some(rpId, podCountForRpId, targetNum)
+      } else {
+        // for this resource profile we do not request more PODs
+        None
+      }
+    }
+
+    val remainingSlotFromPendingPods = maxPendingPods - totalNotRunningPodCount
+    if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0) {
+      ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, remainingSlotFromPendingPods)
+        .foreach { case ((rpId, podCountForRpId, targetNum), sharedSlotFromPendingPods) =>
+          val numMissingPodsForRpId = targetNum - podCountForRpId
+          val numExecutorsToAllocate =
+            math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods)
+          logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
+            s"ResourceProfile Id: $rpId, target: $targetNum, known: $podCountForRpId, " +
+            s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.")
+          requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames)
+        }
+    }
 
     deletedExecutorIds = _deletedExecutorIds
@@ -347,14 +373,10 @@ private[spark] class ExecutorPodsAllocator(
   }
 
   private def requestNewExecutors(
-      expected: Int,
-      running: Int,
+      numExecutorsToAllocate: Int,
       applicationId: String,
       resourceProfileId: Int,
       pvcsInUse: Seq[String]): Unit = {
-    val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
-    logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
-      s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
     // Check reusable PVCs for this executor allocation batch
     val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
     for ( _ <- 0 until numExecutorsToAllocate) {
@@ -440,3 +462,14 @@ private[spark] class ExecutorPodsAllocator(
       }
     }
   }
+
+private[spark] object ExecutorPodsAllocator {
+
+  // A utility function to split the available slots among the specified consumers
+  def splitSlots[T](consumers: Seq[T], slots: Int): Seq[(T, Int)] = {
+    val d = slots / consumers.size
+    val r = slots % consumers.size
+    consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - r).map((_, d))
+  }
+}
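
To make the per-profile clamp concrete (a worked example with invented numbers; podAllocationSize is the existing spark.kubernetes.allocation.batch.size): the request size is the smallest of the missing pod count, the batch size, and this profile's share of the remaining pending-POD slots.

// Hypothetical values: the profile is missing 8 pods, the batch size is 5,
// and splitSlots granted this profile 3 of the remaining pending-pod slots,
// so only 3 executors are requested in this allocation round.
val numMissingPodsForRpId = 10 - 2   // target = 10, podCountForRpId = 2
val podAllocationSize = 5            // spark.kubernetes.allocation.batch.size
val sharedSlotFromPendingPods = 3    // this profile's share of the remaining slots
val numExecutorsToAllocate =
  math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods)
assert(numExecutorsToAllocate == 3)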

ExecutorPodsAllocatorSuite.scala

@@ -117,6 +117,83 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
   }
 
+  test("SPARK-36052: test splitSlots") {
+    val seq1 = Seq("a")
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 0) === Seq(("a", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 1) === Seq(("a", 1)))
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 2) === Seq(("a", 2)))
+
+    val seq2 = Seq("a", "b", "c")
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 0) === Seq(("a", 0), ("b", 0), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 1) === Seq(("a", 1), ("b", 0), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 2) === Seq(("a", 1), ("b", 1), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 3) === Seq(("a", 1), ("b", 1), ("c", 1)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 4) === Seq(("a", 2), ("b", 1), ("c", 1)))
+  }
+
+  test("SPARK-36052: pending pod limit with multiple resource profiles") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(4).memory("2g")
+    treq.cpus(2)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build()
+
+    val confWithLowMaxPendingPods = conf.clone.set(KUBERNETES_MAX_PENDING_PODS.key, "3")
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPods, secMgr,
+      executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
+
+    // Mark executors 2 and 3 as pending and leave 1 as newly created: this does not free up
+    // any pending pod slot, so no new pod is requested
+    snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(3, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations, times(3)).create(any())
+    verify(podOperations, never()).delete()
+
+    // Downscale the default profile by one executor to free up one slot
+    // for pending pods
+    waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(4, rp.id))
+    verify(podOperations, times(1)).delete()
+
+    // Make one pod running; this way we have one more free slot for pending pods
+    snapshotsStore.updatePod(runningExecutor(3, rp.id))
+    snapshotsStore.updatePod(pendingExecutor(4, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(5, rp.id))
+    verify(podOperations, times(1)).delete()
+  }
+
   test("Initially request executors in batches. Do not request another batch if the" +
     " first has not finished.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))