diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 417c545992..2af05f1b74 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -594,6 +594,18 @@ private[spark] object Config extends Logging {
     .checkValue(delay => delay > 0, "delay must be a positive time value")
     .createWithDefaultString("30s")
 
+  val KUBERNETES_MAX_PENDING_PODS =
+    ConfigBuilder("spark.kubernetes.allocation.maxPendingPods")
+      .doc("Maximum number of pending PODs allowed during executor allocation for this " +
+        "application. Newly requested executors that are not yet known by Kubernetes are " +
+        "also counted into this limit, as they will turn into pending PODs over time. " +
+        "This limit is independent of resource profiles, as it caps the sum of all " +
+        "allocations across all the used resource profiles.")
+      .version("3.3.0")
+      .intConf
+      .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
+      .createWithDefault(Int.MaxValue)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index d6dc13e22e..cee53600c2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -57,6 +57,8 @@ private[spark] class ExecutorPodsAllocator(
 
   private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
 
+  private val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
+
   private val podCreationTimeout = math.max(
     podAllocationDelay * 5,
     conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -217,9 +219,15 @@ private[spark] class ExecutorPodsAllocator(
       }
     }
 
+    // sum of all the pending pods unknown by the scheduler (total for all the resources)
    var totalPendingCount = 0
-    // The order we request executors for each ResourceProfile is not guaranteed.
-    totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, targetNum) =>
+    // total not running pods (including scheduler known & unknown, pending & newly requested ones)
+    var totalNotRunningPodCount = 0
+    val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId
+      .asScala
+      .toSeq
+      .sortBy(_._1)
+      .flatMap { case (rpId, targetNum) =>
       val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty)
 
       val currentRunningCount = podsForRpId.values.count {
@@ -235,7 +243,7 @@ private[spark] class ExecutorPodsAllocator(
       }
       // This variable is used later to print some debug logs. It's updated when cleaning up
       // excess pod requests, since currentPendingExecutorsForRpId is immutable.
-      var knownPendingCount = currentPendingExecutorsForRpId.size
+      var pendingCountForRpId = currentPendingExecutorsForRpId.size
 
       val newlyCreatedExecutorsForRpId =
         newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
@@ -248,12 +256,12 @@ private[spark] class ExecutorPodsAllocator(
       }
 
       if (podsForRpId.nonEmpty) {
-        logDebug(s"ResourceProfile Id: $rpId " +
+        logDebug(s"ResourceProfile Id: $rpId (" +
           s"pod allocation status: $currentRunningCount running, " +
           s"${currentPendingExecutorsForRpId.size} unknown pending, " +
           s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " +
           s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " +
-          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created.")
+          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created)")
       }
 
       // It's possible that we have outstanding pods that are outdated when dynamic allocation
@@ -264,21 +272,22 @@ private[spark] class ExecutorPodsAllocator(
       //
       // TODO: with dynamic allocation off, handle edge cases if we end up with more running
       // executors than expected.
-      val knownPodCount = currentRunningCount +
+      var notRunningPodCountForRpId =
         currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size +
         newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size
+      val podCountForRpId = currentRunningCount + notRunningPodCountForRpId
 
-      if (knownPodCount > targetNum) {
-        val excess = knownPodCount - targetNum
+      if (podCountForRpId > targetNum) {
+        val excess = podCountForRpId - targetNum
         val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
           .filter { case (_, (_, createTime)) =>
             currentTime - createTime > executorIdleTimeout
           }.keys.take(excess).toList
-        val knownPendingToDelete = currentPendingExecutorsForRpId
+        val pendingToDelete = currentPendingExecutorsForRpId
           .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
           .take(excess - newlyCreatedToDelete.size)
           .map { case (id, _) => id }
-        val toDelete = newlyCreatedToDelete ++ knownPendingToDelete
+        val toDelete = newlyCreatedToDelete ++ pendingToDelete
 
         if (toDelete.nonEmpty) {
           logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
@@ -293,32 +302,49 @@ private[spark] class ExecutorPodsAllocator(
               .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
               .delete()
             newlyCreatedExecutors --= newlyCreatedToDelete
-            knownPendingCount -= knownPendingToDelete.size
+            pendingCountForRpId -= pendingToDelete.size
+            notRunningPodCountForRpId -= toDelete.size
           }
         }
       }
-
-      if (newlyCreatedExecutorsForRpId.isEmpty
-        && knownPodCount < targetNum) {
-        requestNewExecutors(targetNum, knownPodCount, applicationId, rpId, k8sKnownPVCNames)
-      }
-      totalPendingCount += knownPendingCount
+      totalPendingCount += pendingCountForRpId
+      totalNotRunningPodCount += notRunningPodCountForRpId
 
       // The code below just prints debug messages, which are only useful when there's a change
       // in the snapshot state. Since the messages are a little spammy, avoid them when we know
       // there are no useful updates.
       if (log.isDebugEnabled && snapshots.nonEmpty) {
-        val outstanding = knownPendingCount + newlyCreatedExecutorsForRpId.size
+        val outstanding = pendingCountForRpId + newlyCreatedExecutorsForRpId.size
         if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
           logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " +
             "equal to the number of requested executors. Not scaling up further.")
         } else {
-          if (outstanding > 0) {
-            logDebug(s"Still waiting for $outstanding executors for ResourceProfile " +
-              s"Id $rpId before requesting more.")
+          if (newlyCreatedExecutorsForRpId.nonEmpty) {
+            logDebug(s"Still waiting for ${newlyCreatedExecutorsForRpId.size} executors for " +
+              s"ResourceProfile Id $rpId before requesting more.")
           }
         }
       }
+      if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) {
+        Some(rpId, podCountForRpId, targetNum)
+      } else {
+        // for this resource profile we do not request more PODs
+        None
+      }
+    }
+
+    val remainingSlotFromPendingPods = maxPendingPods - totalNotRunningPodCount
+    if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0) {
+      ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, remainingSlotFromPendingPods)
+        .foreach { case ((rpId, podCountForRpId, targetNum), sharedSlotFromPendingPods) =>
+          val numMissingPodsForRpId = targetNum - podCountForRpId
+          val numExecutorsToAllocate =
+            math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods)
+          logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
+            s"ResourceProfile Id: $rpId, target: $targetNum, known: $podCountForRpId, " +
+            s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.")
+          requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames)
+        }
     }
 
     deletedExecutorIds = _deletedExecutorIds
@@ -347,14 +373,10 @@ private[spark] class ExecutorPodsAllocator(
   }
 
   private def requestNewExecutors(
-      expected: Int,
-      running: Int,
+      numExecutorsToAllocate: Int,
       applicationId: String,
       resourceProfileId: Int,
       pvcsInUse: Seq[String]): Unit = {
-    val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
-    logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
-      s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
     // Check reusable PVCs for this executor allocation batch
     val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
     for ( _ <- 0 until numExecutorsToAllocate) {
@@ -440,3 +462,14 @@ private[spark] class ExecutorPodsAllocator(
     }
   }
 }
+
+private[spark] object ExecutorPodsAllocator {
+
+  // A utility function to split the available slots among the specified consumers
+  def splitSlots[T](consumers: Seq[T], slots: Int): Seq[(T, Int)] = {
+    val d = slots / consumers.size
+    val r = slots % consumers.size
+    consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - r).map((_, d))
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index a54f1a105b..5b33da6d2b 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -117,6 +117,83 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
   }
 
+  test("SPARK-36052: test splitSlots") {
+    val seq1 = Seq("a")
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 0) === Seq(("a", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 1) === Seq(("a", 1)))
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 2) === Seq(("a", 2)))
+
+    val seq2 = Seq("a", "b", "c")
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 0) === Seq(("a", 0), ("b", 0), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 1) === Seq(("a", 1), ("b", 0), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 2) === Seq(("a", 1), ("b", 1), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 3) === Seq(("a", 1), ("b", 1), ("c", 1)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 4) === Seq(("a", 2), ("b", 1), ("c", 1)))
+  }
+
+  test("SPARK-36052: pending pod limit with multiple resource profiles") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(4).memory("2g")
+    treq.cpus(2)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build()
+
+    val confWithLowMaxPendingPods = conf.clone.set(KUBERNETES_MAX_PENDING_PODS.key, "3")
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPods, secMgr,
+      executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
+
+    // Mark executors 2 and 3 as pending and leave 1 as newly created; this does not free up
+    // any pending pod slot, so no new pod is requested
+    snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(3, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations, times(3)).create(any())
+    verify(podOperations, never()).delete()
+
+    // Downscale the defaultProfile by 1 executor to make one free slot
+    // for pending pods
+    waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(4, rp.id))
+    verify(podOperations, times(1)).delete()
+
+    // Make one pod running; this way we have one more free slot for pending pods
+    snapshotsStore.updatePod(runningExecutor(3, rp.id))
+    snapshotsStore.updatePod(pendingExecutor(4, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(5, rp.id))
+    verify(podOperations, times(1)).delete()
+  }
+
   test("Initially request executors in batches. Do not request another batch if the" +
     " first has not finished.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))