[SPARK-33099][K8S] Respect executor idle timeout conf in ExecutorPodsAllocator

### What changes were proposed in this pull request?

This PR aims to protect newly requested executor pods and pending pods from deletion until the executor idle timeout has elapsed.
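
For context, `spark.dynamicAllocation.executorIdleTimeout` is the standard dynamic-allocation setting being reused here. A minimal, illustrative way to set it (the values below are examples, not part of this patch):

```scala
import org.apache.spark.SparkConf

// Illustrative configuration only; 60s is already the default idle timeout.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
```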

### Why are the changes needed?

In case of dynamic allocation, the Apache Spark K8s `ExecutorPodsAllocator` cancels pod requests and pending pods too eagerly. In the following example, `ExecutorPodsAllocator` received new total-executor adjustment requests rapidly within two minutes, sometimes three times in a single second, so it repeatedly issued `request` and `delete` calls for the same pod request or pending pod. This PR reuses `spark.dynamicAllocation.executorIdleTimeout` (default: 60s) to keep a new pod request or pending pod alive for at least that period.

```
20/10/08 05:58:08 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3
20/10/08 05:58:08 INFO ExecutorPodsAllocator: Going to request 3 executors from Kubernetes.
20/10/08 05:58:09 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3
20/10/08 05:58:43 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 1
20/10/08 05:58:47 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 0
20/10/08 05:59:26 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3
20/10/08 05:59:30 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 2
20/10/08 05:59:31 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3
20/10/08 05:59:44 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 2
20/10/08 05:59:44 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 0
20/10/08 05:59:45 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3
20/10/08 05:59:50 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 2
20/10/08 05:59:50 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 1
20/10/08 05:59:50 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 0
20/10/08 05:59:54 INFO ExecutorPodsAllocator: Set totalExpectedExecutors to 3
20/10/08 05:59:54 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes.
```
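
Conceptually, the fix filters downscale candidates by age before taking the excess. Below is a minimal sketch of that selection with simplified types and illustrative names (the real allocator tracks fabric8 pod state, not bare timestamps):

```scala
object ProtectedDownscale {
  /**
   * Pick which newly created executors may be cancelled on downscale.
   * `newlyCreated` maps executor id -> creation time in ms (a simplified stand-in).
   */
  def deletable(
      newlyCreated: Map[Long, Long],
      excess: Int,
      idleTimeoutMs: Long,
      nowMs: Long): List[Long] = {
    newlyCreated
      .filter { case (_, createdMs) => nowMs - createdMs > idleTimeoutMs } // younger pods stay protected
      .keys.take(excess).toList
  }
}
```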

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the newly added test case.

Closes #29981 from dongjoon-hyun/SPARK-K8S-INITIAL.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Commit e1909c96fb (parent 1234c66fa6), committed 2020-10-09 02:50:38 -07:00; 6 changed files with 97 additions and 14 deletions.

ExecutorPodsAllocator.scala

@@ -16,6 +16,8 @@
*/
package org.apache.spark.scheduler.cluster.k8s
import java.time.Instant
import java.time.format.DateTimeParseException
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
import scala.collection.mutable
@@ -30,6 +32,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
import org.apache.spark.util.{Clock, Utils}
private[spark] class ExecutorPodsAllocator(
@@ -50,6 +53,8 @@ private[spark] class ExecutorPodsAllocator(
private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000)
private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000
private val namespace = conf.get(KUBERNETES_NAMESPACE)
private val kubernetesDriverPodName = conf
@@ -87,6 +92,7 @@ private[spark] class ExecutorPodsAllocator(
}
def setTotalExpectedExecutors(total: Int): Unit = {
logDebug(s"Set totalExpectedExecutors to $total")
totalExpectedExecutors.set(total)
if (!hasPendingPods.get()) {
snapshotsStore.notifySubscribers()
@@ -149,7 +155,6 @@
case (_, PodPending(_)) => true
case _ => false
}
.map { case (id, _) => id }
// Make a local, non-volatile copy of the reference since it's used multiple times. This
// is the only method that modifies the list, so this is safe.
@@ -173,7 +178,8 @@
// It's possible that we have outstanding pods that are outdated when dynamic allocation
// decides to downscale the application. So check if we can release any pending pods early
// instead of waiting for them to time out. Drop them first from the unacknowledged list,
// then from the pending.
// then from the pending. However, in order to prevent too frequent fluctuation, newly
// requested pods are protected during the executorIdleTimeout period.
//
// TODO: with dynamic allocation off, handle edge cases if we end up with more running
// executors than expected.
@@ -181,8 +187,13 @@
newlyCreatedExecutors.size
if (knownPodCount > currentTotalExpectedExecutors) {
val excess = knownPodCount - currentTotalExpectedExecutors
val knownPendingToDelete = currentPendingExecutors.take(excess - newlyCreatedExecutors.size)
val toDelete = newlyCreatedExecutors.keys.take(excess).toList ++ knownPendingToDelete
val knownPendingToDelete = currentPendingExecutors
.filter(x => isExecutorIdleTimedOut(x._2, currentTime))
.map { case (id, _) => id }
.take(excess - newlyCreatedExecutors.size)
val toDelete = newlyCreatedExecutors
.filter(x => currentTime - x._2 > executorIdleTimeout)
.keys.take(excess).toList ++ knownPendingToDelete
if (toDelete.nonEmpty) {
logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
@@ -268,4 +279,15 @@ private[spark] class ExecutorPodsAllocator(
}
}
}
private def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = {
try {
val startTime = Instant.parse(state.pod.getStatus.getStartTime).toEpochMilli()
currentTime - startTime > executorIdleTimeout
} catch {
case _: Exception =>
logDebug(s"Cannot get startTime of pod ${state.pod}")
true
}
}
}
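
The `isExecutorIdleTimedOut` helper parses the pod's `status.startTime`, an RFC3339 timestamp reported by Kubernetes. A self-contained sketch of the same check, taking a raw timestamp string instead of a fabric8 `Pod`:

```scala
import java.time.Instant

object IdleTimeoutCheck {
  // Matches the patch's conservative fallback: an unreadable or missing
  // startTime means the pod is treated as already timed out (deletable).
  def isTimedOut(startTime: String, currentTimeMs: Long, idleTimeoutMs: Long): Boolean =
    try {
      currentTimeMs - Instant.parse(startTime).toEpochMilli > idleTimeoutMs
    } catch {
      case _: Exception => true
    }
}
```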

ExecutorLifecycleTestUtils.scala

@@ -16,6 +16,8 @@
*/
package org.apache.spark.scheduler.cluster.k8s
import java.time.Instant
import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
import org.apache.spark.deploy.k8s.Constants._
@@ -29,6 +31,7 @@ object ExecutorLifecycleTestUtils {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewStatus()
.withPhase("failed")
.withStartTime(Instant.now.toString)
.addNewContainerStatus()
.withName("spark-executor")
.withImage("k8s-spark")
@@ -59,6 +62,7 @@
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewStatus()
.withPhase("pending")
.withStartTime(Instant.now.toString)
.endStatus()
.build()
}
@@ -67,6 +71,7 @@
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewStatus()
.withPhase("running")
.withStartTime(Instant.now.toString)
.endStatus()
.build()
}

ExecutorPodsAllocatorSuite.scala

@@ -16,6 +16,8 @@
*/
package org.apache.spark.scheduler.cluster.k8s
import java.time.Instant
import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
@@ -31,6 +33,7 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
import org.apache.spark.util.ManualClock
@@ -47,11 +50,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
.endMetadata()
.build()
private val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
private val conf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s")
private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000L)
private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000
private val secMgr = new SecurityManager(conf)
private var waitForExecutorPodsClock: ManualClock = _
@@ -159,6 +165,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
.thenReturn(podOperations)
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
// Target 1 executor, make sure it's requested, even with an empty initial snapshot.
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
verify(podOperations).create(podWithAttachedContainerForId(1))
@@ -184,6 +193,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
verify(podOperations, never()).delete()
// Scale down to 1. Pending executors (both acknowledged and not) should be deleted.
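// Advance past executorIdleTimeout first so the pending pods are no longer protected.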
waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
snapshotsStore.notifySubscribers()
verify(podOperations, times(4)).create(any())
@@ -202,6 +212,47 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(!podsAllocatorUnderTest.isDeleted("4"))
}
test("SPARK-33099: Respect executor idle timeout configuration") {
when(podOperations
.withField("status.phase", "Pending"))
.thenReturn(podOperations)
when(podOperations
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(podOperations)
when(podOperations
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(podOperations)
when(podOperations
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
.thenReturn(podOperations)
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
podsAllocatorUnderTest.setTotalExpectedExecutors(5)
verify(podOperations).create(podWithAttachedContainerForId(1))
verify(podOperations).create(podWithAttachedContainerForId(2))
verify(podOperations).create(podWithAttachedContainerForId(3))
verify(podOperations).create(podWithAttachedContainerForId(4))
verify(podOperations).create(podWithAttachedContainerForId(5))
verify(podOperations, times(5)).create(any())
snapshotsStore.updatePod(pendingExecutor(1))
snapshotsStore.updatePod(pendingExecutor(2))
// Newly created executors (both acknowledged and not) are protected by executorIdleTimeout
podsAllocatorUnderTest.setTotalExpectedExecutors(0)
snapshotsStore.notifySubscribers()
verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5")
verify(podOperations, never()).delete()
// Newly created executors (both acknowledged and not) are cleaned up.
waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
snapshotsStore.notifySubscribers()
verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5")
verify(podOperations).delete()
}
private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
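
The suite drives time with Spark's `ManualClock`, which only moves when told to; that lets the test observe both sides of the protection window. A minimal sketch of the pattern (the timeout matches the 10s this suite configures):

```scala
import org.apache.spark.util.ManualClock

object ClockSketch extends App {
  val clock = new ManualClock()
  clock.setTime(System.currentTimeMillis()) // "now" when the pods are requested
  // ... downscale here: pods are younger than the timeout, so nothing is deleted ...
  clock.advance(10000L * 2)                 // jump past the 10s idle timeout
  // ... notify subscribers again: excess pods are now eligible for deletion ...
  println(clock.getTimeMillis())
}
```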

ExecutorPodsPollingSnapshotSourceSuite.scala

@@ -77,13 +77,15 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
}
test("Items returned by the API should be pushed to the event queue") {
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
when(activeExecutorPods.list())
.thenReturn(new PodListBuilder()
.addToItems(
runningExecutor(1),
runningExecutor(2))
exec1,
exec2)
.build())
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1), runningExecutor(2)))
verify(eventQueue).replaceSnapshot(Seq(exec1, exec2))
}
}

ExecutorPodsSnapshotSuite.scala

@@ -50,11 +50,12 @@ class ExecutorPodsSnapshotSuite extends SparkFunSuite {
Map(
0L -> PodPending(originalPods(0)),
1L -> PodSucceeded(succeededExecutor(1))))
val snapshotWithNewPod = snapshotWithUpdatedPod.withUpdate(pendingExecutor(2))
val pendingExec = pendingExecutor(2)
val snapshotWithNewPod = snapshotWithUpdatedPod.withUpdate(pendingExec)
assert(snapshotWithNewPod.executorPods ===
Map(
0L -> PodPending(originalPods(0)),
1L -> PodSucceeded(succeededExecutor(1)),
2L -> PodPending(pendingExecutor(2))))
2L -> PodPending(pendingExec)))
}
}

ExecutorPodsWatchSnapshotSourceSuite.scala

@@ -67,9 +67,11 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
}
test("Watch events should be pushed to the snapshots store as snapshot updates.") {
watch.getValue.eventReceived(Action.ADDED, runningExecutor(1))
watch.getValue.eventReceived(Action.MODIFIED, runningExecutor(2))
verify(eventQueue).updatePod(runningExecutor(1))
verify(eventQueue).updatePod(runningExecutor(2))
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
watch.getValue.eventReceived(Action.ADDED, exec1)
watch.getValue.eventReceived(Action.MODIFIED, exec2)
verify(eventQueue).updatePod(exec1)
verify(eventQueue).updatePod(exec2)
}
}