From eb09be9e68737bf2f29ca5391874f4c5fa0de3e2 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Tue, 10 Aug 2021 20:16:21 -0700 Subject: [PATCH] [SPARK-36052][K8S] Introducing a limit for pending PODs Introducing a limit for pending PODs (newly created/requested executors included). This limit is global for all the resource profiles. So first we have to count all the newly created and pending PODs (decreased by the ones which requested to be deleted) then we can share the remaining pending POD slots among the resource profiles. Without this PR dynamic allocation could request too many PODs and the K8S scheduler could be overloaded and scheduling of PODs will be affected by the load. No. With new unit tests. Closes #33492 from attilapiros/SPARK-36052. Authored-by: attilapiros Signed-off-by: Dongjoon Hyun (cherry picked from commit 1dced492fb286a7ada73d886fe264f5df0e2b3da) Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/deploy/k8s/Config.scala | 12 +++ .../cluster/k8s/ExecutorPodsAllocator.scala | 85 +++++++++++++------ .../k8s/ExecutorPodsAllocatorSuite.scala | 77 +++++++++++++++++ 3 files changed, 148 insertions(+), 26 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 9fceca783a..4d352f721f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -543,6 +543,18 @@ private[spark] object Config extends Logging { .checkValue(delay => delay > 0, "delay must be a positive time value") .createWithDefaultString("30s") + val KUBERNETES_MAX_PENDING_PODS = + ConfigBuilder("spark.kubernetes.allocation.maxPendingPods") + .doc("Maximum number of pending PODs allowed during executor allocation for this " + + "application. Those newly requested executors which are unknown by Kubernetes yet are " + + "also counted into this limit as they will change into pending PODs by time. " + + "This limit is independent from the resource profiles as it limits the sum of all " + + "allocation for all the used resource profiles.") + .version("3.2.0") + .intConf + .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer") + .createWithDefault(Int.MaxValue) + val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation." diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index d6dc13e22e..cee53600c2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -57,6 +57,8 @@ private[spark] class ExecutorPodsAllocator( private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + private val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS) + private val podCreationTimeout = math.max( podAllocationDelay * 5, conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT)) @@ -217,9 +219,15 @@ private[spark] class ExecutorPodsAllocator( } } + // sum of all the pending pods unknown by the scheduler (total for all the resources) var totalPendingCount = 0 - // The order we request executors for each ResourceProfile is not guaranteed. - totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, targetNum) => + // total not running pods (including scheduler known & unknown, pending & newly requested ones) + var totalNotRunningPodCount = 0 + val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId + .asScala + .toSeq + .sortBy(_._1) + .flatMap { case (rpId, targetNum) => val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty) val currentRunningCount = podsForRpId.values.count { @@ -235,7 +243,7 @@ private[spark] class ExecutorPodsAllocator( } // This variable is used later to print some debug logs. It's updated when cleaning up // excess pod requests, since currentPendingExecutorsForRpId is immutable. - var knownPendingCount = currentPendingExecutorsForRpId.size + var pendingCountForRpId = currentPendingExecutorsForRpId.size val newlyCreatedExecutorsForRpId = newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) => @@ -248,12 +256,12 @@ private[spark] class ExecutorPodsAllocator( } if (podsForRpId.nonEmpty) { - logDebug(s"ResourceProfile Id: $rpId " + + logDebug(s"ResourceProfile Id: $rpId (" + s"pod allocation status: $currentRunningCount running, " + s"${currentPendingExecutorsForRpId.size} unknown pending, " + s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " + s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " + - s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created.") + s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created)") } // It's possible that we have outstanding pods that are outdated when dynamic allocation @@ -264,21 +272,22 @@ private[spark] class ExecutorPodsAllocator( // // TODO: with dynamic allocation off, handle edge cases if we end up with more running // executors than expected. - val knownPodCount = currentRunningCount + + var notRunningPodCountForRpId = currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size + newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size + val podCountForRpId = currentRunningCount + notRunningPodCountForRpId - if (knownPodCount > targetNum) { - val excess = knownPodCount - targetNum + if (podCountForRpId > targetNum) { + val excess = podCountForRpId - targetNum val newlyCreatedToDelete = newlyCreatedExecutorsForRpId .filter { case (_, (_, createTime)) => currentTime - createTime > executorIdleTimeout }.keys.take(excess).toList - val knownPendingToDelete = currentPendingExecutorsForRpId + val pendingToDelete = currentPendingExecutorsForRpId .filter(x => isExecutorIdleTimedOut(x._2, currentTime)) .take(excess - newlyCreatedToDelete.size) .map { case (id, _) => id } - val toDelete = newlyCreatedToDelete ++ knownPendingToDelete + val toDelete = newlyCreatedToDelete ++ pendingToDelete if (toDelete.nonEmpty) { logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).") @@ -293,32 +302,49 @@ private[spark] class ExecutorPodsAllocator( .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*) .delete() newlyCreatedExecutors --= newlyCreatedToDelete - knownPendingCount -= knownPendingToDelete.size + pendingCountForRpId -= pendingToDelete.size + notRunningPodCountForRpId -= toDelete.size } } } - - if (newlyCreatedExecutorsForRpId.isEmpty - && knownPodCount < targetNum) { - requestNewExecutors(targetNum, knownPodCount, applicationId, rpId, k8sKnownPVCNames) - } - totalPendingCount += knownPendingCount + totalPendingCount += pendingCountForRpId + totalNotRunningPodCount += notRunningPodCountForRpId // The code below just prints debug messages, which are only useful when there's a change // in the snapshot state. Since the messages are a little spammy, avoid them when we know // there are no useful updates. if (log.isDebugEnabled && snapshots.nonEmpty) { - val outstanding = knownPendingCount + newlyCreatedExecutorsForRpId.size + val outstanding = pendingCountForRpId + newlyCreatedExecutorsForRpId.size if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) { logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " + "equal to the number of requested executors. Not scaling up further.") } else { - if (outstanding > 0) { - logDebug(s"Still waiting for $outstanding executors for ResourceProfile " + - s"Id $rpId before requesting more.") + if (newlyCreatedExecutorsForRpId.nonEmpty) { + logDebug(s"Still waiting for ${newlyCreatedExecutorsForRpId.size} executors for " + + s"ResourceProfile Id $rpId before requesting more.") } } } + if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) { + Some(rpId, podCountForRpId, targetNum) + } else { + // for this resource profile we do not request more PODs + None + } + } + + val remainingSlotFromPendingPods = maxPendingPods - totalNotRunningPodCount + if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0) { + ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, remainingSlotFromPendingPods) + .foreach { case ((rpId, podCountForRpId, targetNum), sharedSlotFromPendingPods) => + val numMissingPodsForRpId = targetNum - podCountForRpId + val numExecutorsToAllocate = + math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods) + logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " + + s"ResourceProfile Id: $rpId, target: $targetNum, known: $podCountForRpId, " + + s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.") + requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames) + } } deletedExecutorIds = _deletedExecutorIds @@ -347,14 +373,10 @@ private[spark] class ExecutorPodsAllocator( } private def requestNewExecutors( - expected: Int, - running: Int, + numExecutorsToAllocate: Int, applicationId: String, resourceProfileId: Int, pvcsInUse: Seq[String]): Unit = { - val numExecutorsToAllocate = math.min(expected - running, podAllocationSize) - logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " + - s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.") // Check reusable PVCs for this executor allocation batch val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse) for ( _ <- 0 until numExecutorsToAllocate) { @@ -440,3 +462,14 @@ private[spark] class ExecutorPodsAllocator( } } } + +private[spark] object ExecutorPodsAllocator { + + // A utility function to split the available slots among the specified consumers + def splitSlots[T](consumers: Seq[T], slots: Int): Seq[(T, Int)] = { + val d = slots / consumers.size + val r = slots % consumers.size + consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - r).map((_, d)) + } + +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index a54f1a105b..5b33da6d2b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -117,6 +117,83 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) } + test("SPARK-36052: test splitSlots") { + val seq1 = Seq("a") + assert(ExecutorPodsAllocator.splitSlots(seq1, 0) === Seq(("a", 0))) + assert(ExecutorPodsAllocator.splitSlots(seq1, 1) === Seq(("a", 1))) + assert(ExecutorPodsAllocator.splitSlots(seq1, 2) === Seq(("a", 2))) + + val seq2 = Seq("a", "b", "c") + assert(ExecutorPodsAllocator.splitSlots(seq2, 0) === Seq(("a", 0), ("b", 0), ("c", 0))) + assert(ExecutorPodsAllocator.splitSlots(seq2, 1) === Seq(("a", 1), ("b", 0), ("c", 0))) + assert(ExecutorPodsAllocator.splitSlots(seq2, 2) === Seq(("a", 1), ("b", 1), ("c", 0))) + assert(ExecutorPodsAllocator.splitSlots(seq2, 3) === Seq(("a", 1), ("b", 1), ("c", 1))) + assert(ExecutorPodsAllocator.splitSlots(seq2, 4) === Seq(("a", 2), ("b", 1), ("c", 1))) + } + + test("SPARK-36052: pending pod limit with multiple resource profiles") { + when(podOperations + .withField("status.phase", "Pending")) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) + .thenReturn(podOperations) + when(podOperations + .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) + .thenReturn(podOperations) + + val startTime = Instant.now.toEpochMilli + waitForExecutorPodsClock.setTime(startTime) + + val rpb = new ResourceProfileBuilder() + val ereq = new ExecutorResourceRequests() + val treq = new TaskResourceRequests() + ereq.cores(4).memory("2g") + treq.cpus(2) + rpb.require(ereq).require(treq) + val rp = rpb.build() + + val confWithLowMaxPendingPods = conf.clone.set(KUBERNETES_MAX_PENDING_PODS.key, "3") + podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPods, secMgr, + executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) + + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) + verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(3, rp.id)) + + // Mark executor 2 and 3 as pending, leave 1 as newly created but this does not free up + // any pending pod slot so no new pod is requested + snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id)) + snapshotsStore.updatePod(pendingExecutor(3, rp.id)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) + verify(podOperations, times(3)).create(any()) + verify(podOperations, never()).delete() + + // Downscaling for defaultProfile resource ID with 1 executor to make one free slot + // for pendings pods + waitForExecutorPodsClock.advance(executorIdleTimeout * 2) + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) + verify(podOperations).create(podWithAttachedContainerForId(4, rp.id)) + verify(podOperations, times(1)).delete() + + // Make one pod running this way we have one more free slot for pending pods + snapshotsStore.updatePod(runningExecutor(3, rp.id)) + snapshotsStore.updatePod(pendingExecutor(4, rp.id)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) + verify(podOperations).create(podWithAttachedContainerForId(5, rp.id)) + verify(podOperations, times(1)).delete() + } + test("Initially request executors in batches. Do not request another batch if the" + " first has not finished.") { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))