diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index a3c74ff7b2..759c205b52 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -24,6 +24,7 @@ private[spark] object Constants { val SPARK_ROLE_LABEL = "spark-role" val SPARK_POD_DRIVER_ROLE = "driver" val SPARK_POD_EXECUTOR_ROLE = "executor" + val SPARK_EXECUTOR_INACTIVE_LABEL = "spark-exec-inactive" // Credentials secrets val DRIVER_CREDENTIALS_SECRETS_BASE_DIR = diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index b394f35b15..b6ea1faeda 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -94,7 +94,7 @@ private[spark] class ExecutorPodsAllocator( private def onNewSnapshots( applicationId: String, - snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized { + snapshots: Seq[ExecutorPodsSnapshot]): Unit = { newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys) // For all executors we've created against the API but have not seen in a snapshot // yet - check the current time. If the current time has exceeded some threshold, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index d6b75824d0..5d91e52cb2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -17,13 +17,14 @@ package org.apache.spark.scheduler.cluster.k8s import com.google.common.cache.Cache -import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.ExecutorExited @@ -41,11 +42,14 @@ private[spark] class ExecutorPodsLifecycleManager( import ExecutorPodsLifecycleManager._ - private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL) - private lazy val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS) + // Keep track of which pods are inactive to avoid contacting the API server multiple times. + // This set is cleaned up when a snapshot containing the updated pod is processed. 
+ private val inactivatedPods = mutable.HashSet.empty[Long] + def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { + val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL) snapshotsStore.addSubscriber(eventProcessingInterval) { onNewSnapshots(schedulerBackend, _) } @@ -58,56 +62,78 @@ private[spark] class ExecutorPodsLifecycleManager( snapshots.foreach { snapshot => snapshot.executorPods.foreach { case (execId, state) => state match { + case _state if isPodInactive(_state.pod) => + inactivatedPods -= execId + case deleted@PodDeleted(_) => - logDebug(s"Snapshot reported deleted executor with id $execId," + - s" pod name ${state.pod.getMetadata.getName}") - removeExecutorFromSpark(schedulerBackend, deleted, execId) - execIdsRemovedInThisRound += execId - case failed@PodFailed(_) => - logDebug(s"Snapshot reported failed executor with id $execId," + - s" pod name ${state.pod.getMetadata.getName}") - onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound) - case succeeded@PodSucceeded(_) => - if (schedulerBackend.isExecutorActive(execId.toString)) { - logInfo(s"Snapshot reported succeeded executor with id $execId, " + - "even though the application has not requested for it to be removed.") - } else { - logDebug(s"Snapshot reported succeeded executor with id $execId," + - s" pod name ${state.pod.getMetadata.getName}.") + if (removeExecutorFromSpark(schedulerBackend, deleted, execId)) { + execIdsRemovedInThisRound += execId + logDebug(s"Snapshot reported deleted executor with id $execId," + + s" pod name ${state.pod.getMetadata.getName}") } - onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound) + inactivatedPods -= execId + + case failed@PodFailed(_) => + val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId) + if (onFinalNonDeletedState(failed, execId, schedulerBackend, deleteFromK8s)) { + execIdsRemovedInThisRound += execId + logDebug(s"Snapshot reported failed executor with id $execId," + + s" pod name ${state.pod.getMetadata.getName}") + } + + case succeeded@PodSucceeded(_) => + val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId) + if (onFinalNonDeletedState(succeeded, execId, schedulerBackend, deleteFromK8s)) { + execIdsRemovedInThisRound += execId + if (schedulerBackend.isExecutorActive(execId.toString)) { + logInfo(s"Snapshot reported succeeded executor with id $execId, " + + "even though the application has not requested for it to be removed.") + } else { + logDebug(s"Snapshot reported succeeded executor with id $execId," + + s" pod name ${state.pod.getMetadata.getName}.") + } + } + case _ => } } } + // Clean up any pods from the inactive list that don't match any pods from the last snapshot. + // This makes sure that we don't keep growing that set indefinitely, in case we end up missing + // an update for some pod. + if (inactivatedPods.nonEmpty && snapshots.nonEmpty) { + inactivatedPods.retain(snapshots.last.executorPods.contains(_)) + } + // Reconcile the case where Spark claims to know about an executor but the corresponding pod // is missing from the cluster. This would occur if we miss a deletion event and the pod - // transitions immediately from running io absent. We only need to check against the latest + // transitions immediately from running to absent. We only need to check against the latest // snapshot for this, and we don't do this for executors in the deleted executors cache or // that we just removed in this round. 
- if (snapshots.nonEmpty) { - val latestSnapshot = snapshots.last - (schedulerBackend.getExecutorIds().map(_.toLong).toSet - -- latestSnapshot.executorPods.keySet - -- execIdsRemovedInThisRound).foreach { missingExecutorId => - if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) { - val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" + - s" cluster but we didn't get a reason why. Marking the executor as failed. The" + - s" executor may have been deleted but the driver missed the deletion event." - logDebug(exitReasonMessage) - val exitReason = ExecutorExited( - UNKNOWN_EXIT_CODE, - exitCausedByApp = false, - exitReasonMessage) - schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason) - execIdsRemovedInThisRound += missingExecutorId - } + val lostExecutors = if (snapshots.nonEmpty) { + schedulerBackend.getExecutorIds().map(_.toLong).toSet -- + snapshots.last.executorPods.keySet -- execIdsRemovedInThisRound + } else { + Nil + } + + lostExecutors.foreach { lostId => + if (removedExecutorsCache.getIfPresent(lostId) == null) { + val exitReasonMessage = s"The executor with ID $lostId was not found in the" + + s" cluster but we didn't get a reason why. Marking the executor as failed. The" + + s" executor may have been deleted but the driver missed the deletion event." + logDebug(exitReasonMessage) + val exitReason = ExecutorExited( + UNKNOWN_EXIT_CODE, + exitCausedByApp = false, + exitReasonMessage) + schedulerBackend.doRemoveExecutor(lostId.toString, exitReason) } } - if (execIdsRemovedInThisRound.nonEmpty) { - logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" + + if (lostExecutors.nonEmpty) { + logDebug(s"Removed executors with ids ${lostExecutors.mkString(",")}" + s" from Spark that were either found to be deleted or non-existent in the cluster.") } } @@ -116,35 +142,57 @@ private[spark] class ExecutorPodsLifecycleManager( podState: FinalPodState, execId: Long, schedulerBackend: KubernetesClusterSchedulerBackend, - execIdsRemovedInRound: mutable.Set[Long]): Unit = { - removeExecutorFromSpark(schedulerBackend, podState, execId) - if (shouldDeleteExecutors) { - removeExecutorFromK8s(podState.pod) + deleteFromK8s: Boolean): Boolean = { + val deleted = removeExecutorFromSpark(schedulerBackend, podState, execId) + if (deleteFromK8s) { + removeExecutorFromK8s(execId, podState.pod) } - execIdsRemovedInRound += execId + deleted } - private def removeExecutorFromK8s(updatedPod: Pod): Unit = { - // If deletion failed on a previous try, we can try again if resync informs us the pod - // is still around. - // Delete as best attempt - duplicate deletes will throw an exception but the end state - // of getting rid of the pod is what matters. + private def removeExecutorFromK8s(execId: Long, updatedPod: Pod): Unit = { Utils.tryLogNonFatalError { - kubernetesClient - .pods() - .withName(updatedPod.getMetadata.getName) - .delete() + if (shouldDeleteExecutors) { + // If deletion failed on a previous try, we can try again if resync informs us the pod + // is still around. + // Delete as best attempt - duplicate deletes will throw an exception but the end state + // of getting rid of the pod is what matters. + kubernetesClient + .pods() + .withName(updatedPod.getMetadata.getName) + .delete() + } else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) { + // If the config is set to keep the executor around, mark the pod as "inactive" so it + // can be ignored in future updates from the API server. 
+ logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as inactive since " + + "deletion is disabled.") + val inactivatedPod = new PodBuilder(updatedPod) + .editMetadata() + .addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava) + .endMetadata() + .build() + + kubernetesClient + .pods() + .withName(updatedPod.getMetadata.getName) + .patch(inactivatedPod) + + inactivatedPods += execId + } } } private def removeExecutorFromSpark( schedulerBackend: KubernetesClusterSchedulerBackend, podState: FinalPodState, - execId: Long): Unit = { + execId: Long): Boolean = { if (removedExecutorsCache.getIfPresent(execId) == null) { removedExecutorsCache.put(execId, execId) val exitReason = findExitReason(podState, execId) schedulerBackend.doRemoveExecutor(execId.toString, exitReason) + true + } else { + false } } @@ -181,6 +229,10 @@ private[spark] class ExecutorPodsLifecycleManager( terminatedContainer.getState.getTerminated.getExitCode.toInt }.getOrElse(UNKNOWN_EXIT_CODE) } + + private def isPodInactive(pod: Pod): Boolean = { + pod.getMetadata.getLabels.get(SPARK_EXECUTOR_INACTIVE_LABEL) == "true" + } } private object ExecutorPodsLifecycleManager { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 96a5059df6..fd8f6979c9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -59,6 +59,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource( .pods() .withLabel(SPARK_APP_ID_LABEL, applicationId) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true") .list() .getItems .asScala) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala index 8aa20bfbc8..d68dc3ebef 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala @@ -16,14 +16,19 @@ */ package org.apache.spark.scheduler.cluster.k8s +import java.util.ArrayList import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantLock +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import io.fabric8.kubernetes.api.model.Pod -import javax.annotation.concurrent.GuardedBy -import scala.collection.JavaConverters._ -import scala.collection.mutable -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.internal.Logging +import org.apache.spark.util.ThreadUtils /** * Controls the propagation of the Spark application's executor pods state to subscribers that @@ -46,9 +51,11 @@ import org.apache.spark.util.{ThreadUtils, Utils} * subscriber's buffer. Subscribers receive blocks of snapshots produced by the producers in * time-windowed chunks. 
Each subscriber can choose to receive their snapshot chunks at different * time intervals. + *
+ * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
-  extends ExecutorPodsSnapshotsStore {
+  extends ExecutorPodsSnapshotsStore with Logging {
 
   private val SNAPSHOT_LOCK = new Object()
 
@@ -61,14 +68,13 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
   override def addSubscriber(
       processBatchIntervalMillis: Long)
       (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
-    val newSubscriber = SnapshotsSubscriber(
-      new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
+    val newSubscriber = new SnapshotsSubscriber(onNewSnapshots)
     SNAPSHOT_LOCK.synchronized {
-      newSubscriber.snapshotsBuffer.add(currentSnapshot)
+      newSubscriber.addCurrentSnapshot()
     }
     subscribers.add(newSubscriber)
     pollingTasks.add(subscribersExecutor.scheduleWithFixedDelay(
-      () => callSubscriber(newSubscriber),
+      () => newSubscriber.processSnapshots(),
       0L,
       processBatchIntervalMillis,
       TimeUnit.MILLISECONDS))
@@ -77,7 +83,7 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
   override def notifySubscribers(): Unit = SNAPSHOT_LOCK.synchronized {
     subscribers.asScala.foreach { s =>
       subscribersExecutor.submit(new Runnable() {
-        override def run(): Unit = callSubscriber(s)
+        override def run(): Unit = s.processSnapshots()
       })
     }
   }
@@ -98,20 +104,57 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
   }
 
   private def addCurrentSnapshotToSubscribers(): Unit = {
-    subscribers.asScala.foreach { subscriber =>
-      subscriber.snapshotsBuffer.add(currentSnapshot)
-    }
+    subscribers.asScala.foreach(_.addCurrentSnapshot())
   }
 
-  private def callSubscriber(subscriber: SnapshotsSubscriber): Unit = {
-    Utils.tryLogNonFatalError {
-      val currentSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot].asJava
-      subscriber.snapshotsBuffer.drainTo(currentSnapshots)
-      subscriber.onNewSnapshots(currentSnapshots.asScala)
+  private class SnapshotsSubscriber(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit) {
+
+    private val snapshotsBuffer = new LinkedBlockingQueue[ExecutorPodsSnapshot]()
+    private val lock = new ReentrantLock()
+    private val notificationCount = new AtomicInteger()
+
+    def addCurrentSnapshot(): Unit = {
+      snapshotsBuffer.add(currentSnapshot)
+    }
+
+    def processSnapshots(): Unit = {
+      notificationCount.incrementAndGet()
+      processSnapshotsInternal()
+    }
+
+    private def processSnapshotsInternal(): Unit = {
+      if (lock.tryLock()) {
+        // Check whether there are pending notifications before calling the subscriber. This
+        // is needed to avoid calling the subscriber spuriously when the race described in the
+        // comment below happens.
+        if (notificationCount.get() > 0) {
+          try {
+            val snapshots = new ArrayList[ExecutorPodsSnapshot]()
+            snapshotsBuffer.drainTo(snapshots)
+            onNewSnapshots(snapshots.asScala)
+          } catch {
+            case NonFatal(e) => logWarning("Exception when notifying snapshot subscriber.", e)
+          } finally {
+            lock.unlock()
+          }
+
+          if (notificationCount.decrementAndGet() > 0) {
+            // There was another concurrent request for this subscriber. Schedule a task to
+            // immediately process snapshots again, so that the subscriber can pick up any
+            // changes that may have happened between the time it started looking at snapshots
+            // above, and the time the concurrent request arrived.
+ // + // This has to be done outside of the lock, otherwise we might miss a notification + // arriving after the above check, but before we've released the lock. Flip side is + // that we may schedule a useless task that will just fail to grab the lock. + subscribersExecutor.submit(new Runnable() { + override def run(): Unit = processSnapshotsInternal() + }) + } + } else { + lock.unlock() + } + } } } - - private case class SnapshotsSubscriber( - snapshotsBuffer: BlockingQueue[ExecutorPodsSnapshot], - onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala index 9920f4d3ea..fb6f3ace7a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala @@ -20,7 +20,7 @@ import com.google.common.cache.CacheBuilder import io.fabric8.kubernetes.api.model.{DoneablePod, Pod} import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource -import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mock, never, times, verify, when} import org.mockito.invocation.InvocationOnMock @@ -30,6 +30,7 @@ import scala.collection.mutable import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.Config +import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.scheduler.ExecutorExited @@ -80,6 +81,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte test("Don't remove executors twice from Spark but remove from K8s repeatedly.") { val failedPod = failedExecutorWithoutDeletion(1) snapshotsStore.updatePod(failedPod) + snapshotsStore.notifySubscribers() snapshotsStore.updatePod(failedPod) snapshotsStore.notifySubscribers() val msg = exitReasonMessage(1, failedPod) @@ -108,7 +110,13 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte val msg = exitReasonMessage(1, failedPod) val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg) verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason) - verify(podOperations, never()).delete() + verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete() + + val podCaptor = ArgumentCaptor.forClass(classOf[Pod]) + verify(namedExecutorPods(failedPod.getMetadata.getName)).patch(podCaptor.capture()) + + val pod = podCaptor.getValue() + assert(pod.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true") } private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala index 1b26d6af29..63e43bd40c 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala @@ -49,6 +49,9 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn @Mock private var executorRoleLabeledPods: LABELED_PODS = _ + @Mock + private var activeExecutorPods: LABELED_PODS = _ + @Mock private var eventQueue: ExecutorPodsSnapshotsStore = _ @@ -69,10 +72,12 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn .thenReturn(appIdLabeledPods) when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) .thenReturn(executorRoleLabeledPods) + when(executorRoleLabeledPods.withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")) + .thenReturn(activeExecutorPods) } test("Items returned by the API should be pushed to the event queue") { - when(executorRoleLabeledPods.list()) + when(activeExecutorPods.list()) .thenReturn(new PodListBuilder() .addToItems( runningExecutor(1), @@ -80,6 +85,5 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn .build()) pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS) verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1), runningExecutor(2))) - } }
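
For reference, the core of the new behavior: when KUBERNETES_DELETE_EXECUTORS (spark.kubernetes.executor.deleteOnTermination) is false, the lifecycle manager now patches finished executor pods with the spark-exec-inactive=true label instead of deleting them, and the polling snapshot source skips those pods with withoutLabel. The standalone Scala sketch below shows that same label round trip using only the fabric8 calls exercised by the patch; the object name, method names, and direct client usage are illustrative and not part of the change.

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

object InactiveExecutorSketch {
  // Label keys/values as defined in Constants.scala; "true" matches what the
  // lifecycle manager writes and what the polling source filters on.
  val SPARK_APP_ID_LABEL = "spark-app-selector"
  val SPARK_ROLE_LABEL = "spark-role"
  val SPARK_POD_EXECUTOR_ROLE = "executor"
  val SPARK_EXECUTOR_INACTIVE_LABEL = "spark-exec-inactive"

  // Mirrors removeExecutorFromK8s' "keep but deactivate" branch: add the label via a patch.
  def markInactive(client: KubernetesClient, pod: Pod): Unit = {
    val inactivated = new PodBuilder(pod)
      .editMetadata()
        .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
      .endMetadata()
      .build()
    client.pods()
      .withName(pod.getMetadata.getName)
      .patch(inactivated)
  }

  // Mirrors the polling source's query: inactive pods never reach the snapshot store.
  def listActiveExecutorPods(client: KubernetesClient, appId: String): Seq[Pod] = {
    client.pods()
      .withLabel(SPARK_APP_ID_LABEL, appId)
      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
      .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
      .list()
      .getItems
      .asScala
  }
}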
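
The reconciliation block in onNewSnapshots reduces to set arithmetic over executor IDs; a tiny worked example with hypothetical IDs makes the three-way difference easier to follow.

// Hypothetical values standing in for the real collections in onNewSnapshots.
val knownToSpark = Set(1L, 2L, 3L, 4L)    // schedulerBackend.getExecutorIds()
val inLatestSnapshot = Set(1L, 2L)        // snapshots.last.executorPods.keySet
val removedThisRound = Set(3L)            // execIdsRemovedInThisRound

val lostExecutors = knownToSpark -- inLatestSnapshot -- removedThisRound
assert(lostExecutors == Set(4L))          // only executor 4 gets ExecutorExited(UNKNOWN_EXIT_CODE, ...)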
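
The rewritten SnapshotsSubscriber delivers the guarantee added to the class scaladoc (the callback never runs on two threads at once, and a notification that races with an in-flight callback is not lost) by pairing ReentrantLock.tryLock with an AtomicInteger of pending notifications. A generic sketch of that pattern, detached from Spark; the class and method names here are invented for illustration.

import java.util.concurrent.{ExecutorService, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

// Items are buffered; notifyProcessor() may be called from any thread, but the callback
// is only ever invoked by the single thread that wins tryLock().
class SerializedBatchProcessor[T](executor: ExecutorService)(callback: Seq[T] => Unit) {

  private val buffer = new LinkedBlockingQueue[T]()
  private val lock = new ReentrantLock()
  private val notificationCount = new AtomicInteger()

  def add(item: T): Unit = buffer.add(item)

  def notifyProcessor(): Unit = {
    notificationCount.incrementAndGet()
    process()
  }

  private def process(): Unit = {
    if (lock.tryLock()) {
      // Only invoke the callback if a notification is pending; otherwise a losing racer
      // already handed its work to the thread that got here first.
      if (notificationCount.get() > 0) {
        try {
          val batch = new java.util.ArrayList[T]()
          buffer.drainTo(batch)
          callback(batch.asScala)
        } catch {
          case NonFatal(e) => e.printStackTrace()
        } finally {
          lock.unlock()
        }
        if (notificationCount.decrementAndGet() > 0) {
          // A notification arrived while the callback was running; re-run outside the lock
          // so the late arrival's items are picked up promptly.
          executor.submit(new Runnable {
            override def run(): Unit = process()
          })
        }
      } else {
        lock.unlock()
      }
    }
  }
}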