[SPARK-29905][K8S] Improve pod lifecycle manager behavior with dynamic allocation

This issue mainly shows up when dynamic allocation is enabled: because
executors go through many state changes (they are requested, start
running, and are later stopped), the lifecycle manager class could end
up logging information about the same executor multiple times, since
the different events cause the same executor update to be present in
multiple pod snapshots. On top of that, it could end up making multiple
redundant calls into the API server for the same pod.

Another issue appeared when the config was set to not delete executor
pods: with dynamic allocation, that means pods keep accumulating in the
API server, and every time the polling source does a full sync, all
executors are processed, even the finished ones that Spark technically
does not care about anymore.

The change modifies the lifecycle monitor so that it:

- logs each executor update a single time, even if the same update
  shows up in multiple snapshots, by checking whether the state
  change was already processed.
- marks finished-but-not-deleted-in-k8s executors with a label
  so that they can be easily filtered out.
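
For illustration, the marking and filtering look roughly like this with
the fabric8 client (a simplified sketch, not the literal code from the
diff below; `client` and `pod` stand in for the driver's
KubernetesClient and a finished executor pod, and the label constants
are the ones added in Constants.scala):

    import io.fabric8.kubernetes.api.model.PodBuilder

    // Patch the finished pod once, adding the "inactive" marker label.
    val inactivated = new PodBuilder(pod)
      .editMetadata()
        .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
      .endMetadata()
      .build()
    client.pods()
      .withName(pod.getMetadata.getName)
      .patch(inactivated)

    // Later, the polling source can skip marked pods entirely:
    val activePods = client.pods()
      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
      .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
      .list()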

This reduces the amount of logging done by the lifecycle manager,
which is a minor thing in general since the logs are at debug level.
But it also reduces the amount of data that needs to be fetched
from the API server under certain configurations, and overall
reduces interaction with the API server when dynamic allocation is on.

There's also a change in the snapshot store to ensure that the
same subscriber is not called concurrently. That is kind of a bug,
since it means subscribers could be processing snapshots out of order,
or even that they could block multiple threads (e.g. the allocator
callback was synchronized). I actually ran into the "concurrent calls"
situation in the lifecycle manager during testing, and while it did not
seem to cause problems, it did make for some head scratching while
looking at the logs. It seemed safer to fix that.
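
The guarantee is implemented, in essence, with a non-blocking tryLock
plus a pending-notification counter. A condensed sketch of that pattern
(the real implementation is the SnapshotsSubscriber class in the diff
further down; `SerializedSubscriber`, `signal` and `callback` are
made-up names for illustration):

    import java.util.concurrent.ExecutorService
    import java.util.concurrent.atomic.AtomicInteger
    import java.util.concurrent.locks.ReentrantLock
    import scala.util.control.NonFatal

    class SerializedSubscriber(callback: () => Unit, executor: ExecutorService) {
      private val lock = new ReentrantLock()
      private val pending = new AtomicInteger()

      // May be called from any thread, any number of times.
      def signal(): Unit = {
        pending.incrementAndGet()
        process()
      }

      private def process(): Unit = {
        if (lock.tryLock()) {            // never blocks; at most one caller wins
          if (pending.get() > 0) {
            try {
              callback()                 // runs on one thread at a time
            } catch {
              case NonFatal(e) => e.printStackTrace()
            } finally {
              lock.unlock()
            }
            if (pending.decrementAndGet() > 0) {
              // A signal raced with this run; reprocess outside the lock so the
              // late update is not lost. Worst case is a useless task that just
              // fails to acquire the lock and exits.
              executor.submit(new Runnable { override def run(): Unit = process() })
            }
          } else {
            lock.unlock()
          }
        }
      }
    }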

Unit tests were updated to check for the changes. Also tested in a real
cluster with dynamic allocation on.

Closes #26535 from vanzin/SPARK-29905.

Lead-authored-by: Marcelo Vanzin <vanzin@apache.org>
Co-authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
commit b8ccd75524 (parent 2921be7a4e)
Marcelo Vanzin authored on 2020-04-16 14:15:10 -07:00; committed by Dongjoon Hyun
7 changed files with 190 additions and 81 deletions

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

@@ -24,6 +24,7 @@ private[spark] object Constants {
   val SPARK_ROLE_LABEL = "spark-role"
   val SPARK_POD_DRIVER_ROLE = "driver"
   val SPARK_POD_EXECUTOR_ROLE = "executor"
+  val SPARK_EXECUTOR_INACTIVE_LABEL = "spark-exec-inactive"

   // Credentials secrets
   val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

@@ -94,7 +94,7 @@ private[spark] class ExecutorPodsAllocator(
   private def onNewSnapshots(
       applicationId: String,
-      snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
+      snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
     newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
     // For all executors we've created against the API but have not seen in a snapshot
     // yet - check the current time. If the current time has exceeded some threshold,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala

@@ -17,13 +17,14 @@
 package org.apache.spark.scheduler.cluster.k8s

 import com.google.common.cache.Cache
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._
 import scala.collection.mutable

 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorExited

@@ -41,11 +42,14 @@ private[spark] class ExecutorPodsLifecycleManager(

   import ExecutorPodsLifecycleManager._

-  private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
-
   private lazy val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)

+  // Keep track of which pods are inactive to avoid contacting the API server multiple times.
+  // This set is cleaned up when a snapshot containing the updated pod is processed.
+  private val inactivatedPods = mutable.HashSet.empty[Long]
+
   def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
     snapshotsStore.addSubscriber(eventProcessingInterval) {
       onNewSnapshots(schedulerBackend, _)
     }

@@ -58,56 +62,78 @@ private[spark] class ExecutorPodsLifecycleManager(
     snapshots.foreach { snapshot =>
       snapshot.executorPods.foreach { case (execId, state) =>
         state match {
+          case _state if isPodInactive(_state.pod) =>
+            inactivatedPods -= execId
+
           case deleted@PodDeleted(_) =>
-            logDebug(s"Snapshot reported deleted executor with id $execId," +
-              s" pod name ${state.pod.getMetadata.getName}")
-            removeExecutorFromSpark(schedulerBackend, deleted, execId)
-            execIdsRemovedInThisRound += execId
-          case failed@PodFailed(_) =>
-            logDebug(s"Snapshot reported failed executor with id $execId," +
-              s" pod name ${state.pod.getMetadata.getName}")
-            onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
-          case succeeded@PodSucceeded(_) =>
-            if (schedulerBackend.isExecutorActive(execId.toString)) {
-              logInfo(s"Snapshot reported succeeded executor with id $execId, " +
-                "even though the application has not requested for it to be removed.")
-            } else {
-              logDebug(s"Snapshot reported succeeded executor with id $execId," +
-                s" pod name ${state.pod.getMetadata.getName}.")
+            if (removeExecutorFromSpark(schedulerBackend, deleted, execId)) {
+              execIdsRemovedInThisRound += execId
+              logDebug(s"Snapshot reported deleted executor with id $execId," +
+                s" pod name ${state.pod.getMetadata.getName}")
             }
-            onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
+            inactivatedPods -= execId
+
+          case failed@PodFailed(_) =>
+            val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+            if (onFinalNonDeletedState(failed, execId, schedulerBackend, deleteFromK8s)) {
+              execIdsRemovedInThisRound += execId
+              logDebug(s"Snapshot reported failed executor with id $execId," +
+                s" pod name ${state.pod.getMetadata.getName}")
+            }
+
+          case succeeded@PodSucceeded(_) =>
+            val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+            if (onFinalNonDeletedState(succeeded, execId, schedulerBackend, deleteFromK8s)) {
+              execIdsRemovedInThisRound += execId
+              if (schedulerBackend.isExecutorActive(execId.toString)) {
+                logInfo(s"Snapshot reported succeeded executor with id $execId, " +
+                  "even though the application has not requested for it to be removed.")
+              } else {
+                logDebug(s"Snapshot reported succeeded executor with id $execId," +
+                  s" pod name ${state.pod.getMetadata.getName}.")
+              }
+            }
+
           case _ =>
         }
       }
     }

+    // Clean up any pods from the inactive list that don't match any pods from the last snapshot.
+    // This makes sure that we don't keep growing that set indefinitely, in case we end up missing
+    // an update for some pod.
+    if (inactivatedPods.nonEmpty && snapshots.nonEmpty) {
+      inactivatedPods.retain(snapshots.last.executorPods.contains(_))
+    }
+
     // Reconcile the case where Spark claims to know about an executor but the corresponding pod
     // is missing from the cluster. This would occur if we miss a deletion event and the pod
-    // transitions immediately from running io absent. We only need to check against the latest
+    // transitions immediately from running to absent. We only need to check against the latest
    // snapshot for this, and we don't do this for executors in the deleted executors cache or
     // that we just removed in this round.
-    if (snapshots.nonEmpty) {
-      val latestSnapshot = snapshots.last
-      (schedulerBackend.getExecutorIds().map(_.toLong).toSet
-        -- latestSnapshot.executorPods.keySet
-        -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
-        if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
-          val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
-            s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
-            s" executor may have been deleted but the driver missed the deletion event."
-          logDebug(exitReasonMessage)
-          val exitReason = ExecutorExited(
-            UNKNOWN_EXIT_CODE,
-            exitCausedByApp = false,
-            exitReasonMessage)
-          schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason)
-          execIdsRemovedInThisRound += missingExecutorId
-        }
+    val lostExecutors = if (snapshots.nonEmpty) {
+      schedulerBackend.getExecutorIds().map(_.toLong).toSet --
+        snapshots.last.executorPods.keySet -- execIdsRemovedInThisRound
+    } else {
+      Nil
+    }
+
+    lostExecutors.foreach { lostId =>
+      if (removedExecutorsCache.getIfPresent(lostId) == null) {
+        val exitReasonMessage = s"The executor with ID $lostId was not found in the" +
+          s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
+          s" executor may have been deleted but the driver missed the deletion event."
+        logDebug(exitReasonMessage)
+        val exitReason = ExecutorExited(
+          UNKNOWN_EXIT_CODE,
+          exitCausedByApp = false,
+          exitReasonMessage)
+        schedulerBackend.doRemoveExecutor(lostId.toString, exitReason)
       }
     }

-    if (execIdsRemovedInThisRound.nonEmpty) {
-      logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
+    if (lostExecutors.nonEmpty) {
+      logDebug(s"Removed executors with ids ${lostExecutors.mkString(",")}" +
         s" from Spark that were either found to be deleted or non-existent in the cluster.")
     }
   }

@@ -116,35 +142,57 @@ private[spark] class ExecutorPodsLifecycleManager(
       podState: FinalPodState,
       execId: Long,
       schedulerBackend: KubernetesClusterSchedulerBackend,
-      execIdsRemovedInRound: mutable.Set[Long]): Unit = {
-    removeExecutorFromSpark(schedulerBackend, podState, execId)
-    if (shouldDeleteExecutors) {
-      removeExecutorFromK8s(podState.pod)
+      deleteFromK8s: Boolean): Boolean = {
+    val deleted = removeExecutorFromSpark(schedulerBackend, podState, execId)
+    if (deleteFromK8s) {
+      removeExecutorFromK8s(execId, podState.pod)
     }
-    execIdsRemovedInRound += execId
+    deleted
   }

-  private def removeExecutorFromK8s(updatedPod: Pod): Unit = {
-    // If deletion failed on a previous try, we can try again if resync informs us the pod
-    // is still around.
-    // Delete as best attempt - duplicate deletes will throw an exception but the end state
-    // of getting rid of the pod is what matters.
+  private def removeExecutorFromK8s(execId: Long, updatedPod: Pod): Unit = {
     Utils.tryLogNonFatalError {
-      kubernetesClient
-        .pods()
-        .withName(updatedPod.getMetadata.getName)
-        .delete()
+      if (shouldDeleteExecutors) {
+        // If deletion failed on a previous try, we can try again if resync informs us the pod
+        // is still around.
+        // Delete as best attempt - duplicate deletes will throw an exception but the end state
+        // of getting rid of the pod is what matters.
+        kubernetesClient
+          .pods()
+          .withName(updatedPod.getMetadata.getName)
+          .delete()
+      } else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) {
+        // If the config is set to keep the executor around, mark the pod as "inactive" so it
+        // can be ignored in future updates from the API server.
+        logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as inactive since " +
+          "deletion is disabled.")
+        val inactivatedPod = new PodBuilder(updatedPod)
+          .editMetadata()
+            .addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava)
+          .endMetadata()
+          .build()
+        kubernetesClient
+          .pods()
+          .withName(updatedPod.getMetadata.getName)
+          .patch(inactivatedPod)
+        inactivatedPods += execId
+      }
     }
   }

   private def removeExecutorFromSpark(
       schedulerBackend: KubernetesClusterSchedulerBackend,
       podState: FinalPodState,
-      execId: Long): Unit = {
+      execId: Long): Boolean = {
     if (removedExecutorsCache.getIfPresent(execId) == null) {
       removedExecutorsCache.put(execId, execId)
       val exitReason = findExitReason(podState, execId)
       schedulerBackend.doRemoveExecutor(execId.toString, exitReason)
+      true
+    } else {
+      false
     }
   }

@@ -181,6 +229,10 @@ private[spark] class ExecutorPodsLifecycleManager(
       terminatedContainer.getState.getTerminated.getExitCode.toInt
     }.getOrElse(UNKNOWN_EXIT_CODE)
   }
+
+  private def isPodInactive(pod: Pod): Boolean = {
+    pod.getMetadata.getLabels.get(SPARK_EXECUTOR_INACTIVE_LABEL) == "true"
+  }
 }

 private object ExecutorPodsLifecycleManager {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala

@@ -59,6 +59,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
         .pods()
         .withLabel(SPARK_APP_ID_LABEL, applicationId)
         .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+        .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
         .list()
         .getItems
         .asScala)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala

@@ -16,14 +16,19 @@
  */
 package org.apache.spark.scheduler.cluster.k8s

+import java.util.ArrayList
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal

 import io.fabric8.kubernetes.api.model.Pod
-import javax.annotation.concurrent.GuardedBy
-import scala.collection.JavaConverters._
-import scala.collection.mutable

-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils

@@ -46,9 +51,11 @@ import org.apache.spark.util.{ThreadUtils, Utils}
  * subscriber's buffer. Subscribers receive blocks of snapshots produced by the producers in
  * time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different
  * time intervals.
+ * <br>
+ * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
-  extends ExecutorPodsSnapshotsStore {
+  extends ExecutorPodsSnapshotsStore with Logging {

   private val SNAPSHOT_LOCK = new Object()

@@ -61,14 +68,13 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
   override def addSubscriber(
       processBatchIntervalMillis: Long)
       (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
-    val newSubscriber = SnapshotsSubscriber(
-      new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
+    val newSubscriber = new SnapshotsSubscriber(onNewSnapshots)
     SNAPSHOT_LOCK.synchronized {
-      newSubscriber.snapshotsBuffer.add(currentSnapshot)
+      newSubscriber.addCurrentSnapshot()
     }
     subscribers.add(newSubscriber)
     pollingTasks.add(subscribersExecutor.scheduleWithFixedDelay(
-      () => callSubscriber(newSubscriber),
+      () => newSubscriber.processSnapshots(),
       0L,
       processBatchIntervalMillis,
       TimeUnit.MILLISECONDS))

@@ -77,7 +83,7 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
   override def notifySubscribers(): Unit = SNAPSHOT_LOCK.synchronized {
     subscribers.asScala.foreach { s =>
       subscribersExecutor.submit(new Runnable() {
-        override def run(): Unit = callSubscriber(s)
+        override def run(): Unit = s.processSnapshots()
       })
     }
   }

@@ -98,20 +104,57 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
   }

   private def addCurrentSnapshotToSubscribers(): Unit = {
-    subscribers.asScala.foreach { subscriber =>
-      subscriber.snapshotsBuffer.add(currentSnapshot)
-    }
+    subscribers.asScala.foreach(_.addCurrentSnapshot())
   }

-  private def callSubscriber(subscriber: SnapshotsSubscriber): Unit = {
-    Utils.tryLogNonFatalError {
-      val currentSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot].asJava
-      subscriber.snapshotsBuffer.drainTo(currentSnapshots)
-      subscriber.onNewSnapshots(currentSnapshots.asScala)
+  private class SnapshotsSubscriber(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit) {
+
+    private val snapshotsBuffer = new LinkedBlockingQueue[ExecutorPodsSnapshot]()
+    private val lock = new ReentrantLock()
+    private val notificationCount = new AtomicInteger()
+
+    def addCurrentSnapshot(): Unit = {
+      snapshotsBuffer.add(currentSnapshot)
     }
+
+    def processSnapshots(): Unit = {
+      notificationCount.incrementAndGet()
+      processSnapshotsInternal()
+    }
+
+    private def processSnapshotsInternal(): Unit = {
+      if (lock.tryLock()) {
+        // Check whether there are pending notifications before calling the subscriber. This
+        // is needed to avoid calling the subscriber spuriously when the race described in the
+        // comment below happens.
+        if (notificationCount.get() > 0) {
+          try {
+            val snapshots = new ArrayList[ExecutorPodsSnapshot]()
+            snapshotsBuffer.drainTo(snapshots)
+            onNewSnapshots(snapshots.asScala)
+          } catch {
+            case NonFatal(e) => logWarning("Exception when notifying snapshot subscriber.", e)
+          } finally {
+            lock.unlock()
+          }
+
+          if (notificationCount.decrementAndGet() > 0) {
+            // There was another concurrent request for this subscriber. Schedule a task to
+            // immediately process snapshots again, so that the subscriber can pick up any
+            // changes that may have happened between the time it started looking at snapshots
+            // above, and the time the concurrent request arrived.
+            //
+            // This has to be done outside of the lock, otherwise we might miss a notification
+            // arriving after the above check, but before we've released the lock. Flip side is
+            // that we may schedule a useless task that will just fail to grab the lock.
+            subscribersExecutor.submit(new Runnable() {
+              override def run(): Unit = processSnapshotsInternal()
+            })
+          }
+        } else {
+          lock.unlock()
+        }
+      }
+    }
   }

-
-  private case class SnapshotsSubscriber(
-      snapshotsBuffer: BlockingQueue[ExecutorPodsSnapshot],
-      onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit)
 }

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala

@@ -20,7 +20,7 @@ import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
-import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{mock, never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock

@@ -30,6 +30,7 @@ import scala.collection.mutable

 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.scheduler.ExecutorExited

@@ -80,6 +81,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter {
   test("Don't remove executors twice from Spark but remove from K8s repeatedly.") {
     val failedPod = failedExecutorWithoutDeletion(1)
     snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
     snapshotsStore.updatePod(failedPod)
     snapshotsStore.notifySubscribers()
     val msg = exitReasonMessage(1, failedPod)

@@ -108,7 +110,13 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter {
     val msg = exitReasonMessage(1, failedPod)
     val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
-    verify(podOperations, never()).delete()
+    verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete()
+
+    val podCaptor = ArgumentCaptor.forClass(classOf[Pod])
+    verify(namedExecutorPods(failedPod.getMetadata.getName)).patch(podCaptor.capture())
+
+    val pod = podCaptor.getValue()
+    assert(pod.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true")
   }

   private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala

@@ -49,6 +49,9 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
   @Mock
   private var executorRoleLabeledPods: LABELED_PODS = _

+  @Mock
+  private var activeExecutorPods: LABELED_PODS = _
+
   @Mock
   private var eventQueue: ExecutorPodsSnapshotsStore = _

@@ -69,10 +72,12 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
       .thenReturn(appIdLabeledPods)
     when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
       .thenReturn(executorRoleLabeledPods)
+    when(executorRoleLabeledPods.withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true"))
+      .thenReturn(activeExecutorPods)
   }

   test("Items returned by the API should be pushed to the event queue") {
-    when(executorRoleLabeledPods.list())
+    when(activeExecutorPods.list())
       .thenReturn(new PodListBuilder()
         .addToItems(
           runningExecutor(1),

@@ -80,6 +85,5 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
       .build())
     pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
     verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1), runningExecutor(2)))
   }
 }