From e1909c96fbfc3d3f7808f6ddcadec88cc4d11fb9 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 9 Oct 2020 02:50:38 -0700 Subject: [PATCH] [SPARK-33099][K8S] Respect executor idle timeout conf in ExecutorPodsAllocator ### What changes were proposed in this pull request? This PR aims to protect the executor pod request or pending pod during executor idle timeout. ### Why are the changes needed? In case of dynamic allocation, Apache Spark K8s `ExecutorPodsAllocator` cancels the pod requests or pending pods too eagerly. Like the following example, `ExecutorPodsAllocator` received the new total executor adjust request rapidly in two minutes. Sometimes, it's called 3 times in a single second. It repeats `request` and `delete` on that request or pending pod frequently. This PR is reusing `spark.dynamicAllocation.executorIdleTimeout (default: 60s)` to keep the pod request or pending pod. ``` 20/10/08 05:58:08 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3 20/10/08 05:58:08 INFO ExecutorPodsAllocator: Going to request 3 executors from Kubernetes. 20/10/08 05:58:09 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3 20/10/08 05:58:43 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 1 20/10/08 05:58:47 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 0 20/10/08 05:59:26 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3 20/10/08 05:59:30 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 2 20/10/08 05:59:31 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3 20/10/08 05:59:44 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 2 20/10/08 05:59:44 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 0 20/10/08 05:59:45 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3 20/10/08 05:59:50 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 2 20/10/08 05:59:50 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 1 20/10/08 05:59:50 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 0 20/10/08 05:59:54 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3 20/10/08 05:59:54 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes. ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the newly added test case. Closes #29981 from dongjoon-hyun/SPARK-K8S-INITIAL. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../cluster/k8s/ExecutorPodsAllocator.scala | 30 +++++++++-- .../k8s/ExecutorLifecycleTestUtils.scala | 5 ++ .../k8s/ExecutorPodsAllocatorSuite.scala | 53 ++++++++++++++++++- ...ecutorPodsPollingSnapshotSourceSuite.scala | 8 +-- .../k8s/ExecutorPodsSnapshotSuite.scala | 5 +- ...ExecutorPodsWatchSnapshotSourceSuite.scala | 10 ++-- 6 files changed, 97 insertions(+), 14 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 774ef34f69..5e09de37f2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.scheduler.cluster.k8s +import java.time.Instant +import java.time.format.DateTimeParseException import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong} import scala.collection.mutable @@ -30,6 +32,7 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.KubernetesConf import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT import org.apache.spark.util.{Clock, Utils} private[spark] class ExecutorPodsAllocator( @@ -50,6 +53,8 @@ private[spark] class ExecutorPodsAllocator( private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000) + private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000 + private val namespace = conf.get(KUBERNETES_NAMESPACE) private val kubernetesDriverPodName = conf @@ -87,6 +92,7 @@ private[spark] class ExecutorPodsAllocator( } def setTotalExpectedExecutors(total: Int): Unit = { + logDebug(s"Set totalExpectedExecutors to $total") totalExpectedExecutors.set(total) if (!hasPendingPods.get()) { snapshotsStore.notifySubscribers() @@ -149,7 +155,6 @@ private[spark] class ExecutorPodsAllocator( case (_, PodPending(_)) => true case _ => false } - .map { case (id, _) => id } // Make a local, non-volatile copy of the reference since it's used multiple times. This // is the only method that modifies the list, so this is safe. @@ -173,7 +178,8 @@ private[spark] class ExecutorPodsAllocator( // It's possible that we have outstanding pods that are outdated when dynamic allocation // decides to downscale the application. So check if we can release any pending pods early // instead of waiting for them to time out. Drop them first from the unacknowledged list, - // then from the pending. + // then from the pending. However, in order to prevent too frequent frunctuation, newly + // requested pods are protected during executorIdleTimeout period. // // TODO: with dynamic allocation off, handle edge cases if we end up with more running // executors than expected. @@ -181,8 +187,13 @@ private[spark] class ExecutorPodsAllocator( newlyCreatedExecutors.size if (knownPodCount > currentTotalExpectedExecutors) { val excess = knownPodCount - currentTotalExpectedExecutors - val knownPendingToDelete = currentPendingExecutors.take(excess - newlyCreatedExecutors.size) - val toDelete = newlyCreatedExecutors.keys.take(excess).toList ++ knownPendingToDelete + val knownPendingToDelete = currentPendingExecutors + .filter(x => isExecutorIdleTimedOut(x._2, currentTime)) + .map { case (id, _) => id } + .take(excess - newlyCreatedExecutors.size) + val toDelete = newlyCreatedExecutors + .filter(x => currentTime - x._2 > executorIdleTimeout) + .keys.take(excess).toList ++ knownPendingToDelete if (toDelete.nonEmpty) { logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).") @@ -268,4 +279,15 @@ private[spark] class ExecutorPodsAllocator( } } } + + private def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = { + try { + val startTime = Instant.parse(state.pod.getStatus.getStartTime).toEpochMilli() + currentTime - startTime > executorIdleTimeout + } catch { + case _: Exception => + logDebug(s"Cannot get startTime of pod ${state.pod}") + true + } + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala index 2e883623a4..0377e54f3c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.scheduler.cluster.k8s +import java.time.Instant + import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder} import org.apache.spark.deploy.k8s.Constants._ @@ -29,6 +31,7 @@ object ExecutorLifecycleTestUtils { new PodBuilder(podWithAttachedContainerForId(executorId)) .editOrNewStatus() .withPhase("failed") + .withStartTime(Instant.now.toString) .addNewContainerStatus() .withName("spark-executor") .withImage("k8s-spark") @@ -59,6 +62,7 @@ object ExecutorLifecycleTestUtils { new PodBuilder(podWithAttachedContainerForId(executorId)) .editOrNewStatus() .withPhase("pending") + .withStartTime(Instant.now.toString) .endStatus() .build() } @@ -67,6 +71,7 @@ object ExecutorLifecycleTestUtils { new PodBuilder(podWithAttachedContainerForId(executorId)) .editOrNewStatus() .withPhase("running") + .withStartTime(Instant.now.toString) .endStatus() .build() } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index e4b36e4659..c1c33b2a0f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.scheduler.cluster.k8s +import java.time.Instant + import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder} import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource @@ -31,6 +33,7 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSp import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ import org.apache.spark.util.ManualClock @@ -47,11 +50,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { .endMetadata() .build() - private val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, driverPodName) + private val conf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) + .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s") private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000L) + private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000 private val secMgr = new SecurityManager(conf) private var waitForExecutorPodsClock: ManualClock = _ @@ -159,6 +165,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) .thenReturn(podOperations) + val startTime = Instant.now.toEpochMilli + waitForExecutorPodsClock.setTime(startTime) + // Target 1 executor, make sure it's requested, even with an empty initial snapshot. podsAllocatorUnderTest.setTotalExpectedExecutors(1) verify(podOperations).create(podWithAttachedContainerForId(1)) @@ -184,6 +193,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { verify(podOperations, never()).delete() // Scale down to 1. Pending executors (both acknowledged and not) should be deleted. + waitForExecutorPodsClock.advance(executorIdleTimeout * 2) podsAllocatorUnderTest.setTotalExpectedExecutors(1) snapshotsStore.notifySubscribers() verify(podOperations, times(4)).create(any()) @@ -202,6 +212,47 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(!podsAllocatorUnderTest.isDeleted("4")) } + test("SPARK-33099: Respect executor idle timeout configuration") { + when(podOperations + .withField("status.phase", "Pending")) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) + .thenReturn(podOperations) + when(podOperations + .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) + .thenReturn(podOperations) + + val startTime = Instant.now.toEpochMilli + waitForExecutorPodsClock.setTime(startTime) + + podsAllocatorUnderTest.setTotalExpectedExecutors(5) + verify(podOperations).create(podWithAttachedContainerForId(1)) + verify(podOperations).create(podWithAttachedContainerForId(2)) + verify(podOperations).create(podWithAttachedContainerForId(3)) + verify(podOperations).create(podWithAttachedContainerForId(4)) + verify(podOperations).create(podWithAttachedContainerForId(5)) + verify(podOperations, times(5)).create(any()) + + snapshotsStore.updatePod(pendingExecutor(1)) + snapshotsStore.updatePod(pendingExecutor(2)) + + // Newly created executors (both acknowledged and not) are protected by executorIdleTimeout + podsAllocatorUnderTest.setTotalExpectedExecutors(0) + snapshotsStore.notifySubscribers() + verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5") + verify(podOperations, never()).delete() + + // Newly created executors (both acknowledged and not) are cleaned up. + waitForExecutorPodsClock.advance(executorIdleTimeout * 2) + snapshotsStore.notifySubscribers() + verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5") + verify(podOperations).delete() + } + private def executorPodAnswer(): Answer[KubernetesExecutorSpec] = (invocation: InvocationOnMock) => { val k8sConf: KubernetesExecutorConf = invocation.getArgument(0) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala index 63e43bd40c..a8e825678d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala @@ -77,13 +77,15 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn } test("Items returned by the API should be pushed to the event queue") { + val exec1 = runningExecutor(1) + val exec2 = runningExecutor(2) when(activeExecutorPods.list()) .thenReturn(new PodListBuilder() .addToItems( - runningExecutor(1), - runningExecutor(2)) + exec1, + exec2) .build()) pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS) - verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1), runningExecutor(2))) + verify(eventQueue).replaceSnapshot(Seq(exec1, exec2)) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala index 70e19c904e..6ca1733bcd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala @@ -50,11 +50,12 @@ class ExecutorPodsSnapshotSuite extends SparkFunSuite { Map( 0L -> PodPending(originalPods(0)), 1L -> PodSucceeded(succeededExecutor(1)))) - val snapshotWithNewPod = snapshotWithUpdatedPod.withUpdate(pendingExecutor(2)) + val pendingExec = pendingExecutor(2) + val snapshotWithNewPod = snapshotWithUpdatedPod.withUpdate(pendingExec) assert(snapshotWithNewPod.executorPods === Map( 0L -> PodPending(originalPods(0)), 1L -> PodSucceeded(succeededExecutor(1)), - 2L -> PodPending(pendingExecutor(2)))) + 2L -> PodPending(pendingExec))) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala index ac1968b4ff..e35fc83019 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala @@ -67,9 +67,11 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA } test("Watch events should be pushed to the snapshots store as snapshot updates.") { - watch.getValue.eventReceived(Action.ADDED, runningExecutor(1)) - watch.getValue.eventReceived(Action.MODIFIED, runningExecutor(2)) - verify(eventQueue).updatePod(runningExecutor(1)) - verify(eventQueue).updatePod(runningExecutor(2)) + val exec1 = runningExecutor(1) + val exec2 = runningExecutor(2) + watch.getValue.eventReceived(Action.ADDED, exec1) + watch.getValue.eventReceived(Action.MODIFIED, exec2) + verify(eventQueue).updatePod(exec1) + verify(eventQueue).updatePod(exec2) } }