diff --git a/LICENSE b/LICENSE
index 820f14dbde..cc1f580207 100644
--- a/LICENSE
+++ b/LICENSE
@@ -237,6 +237,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
+ (BSD 3 Clause) jmock (org.jmock:jmock-junit4:2.8.4 - http://jmock.org/)
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
(BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://wwww.antlr.org/)
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 165a15c73e..0f08a2b0ad 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -19,13 +19,12 @@ package org.apache.spark.util
import java.util.concurrent._
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal
-import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-
import org.apache.spark.SparkException
private[spark] object ThreadUtils {
@@ -103,6 +102,22 @@ private[spark] object ThreadUtils {
executor
}
+  /**
+   * Wrapper over ScheduledThreadPoolExecutor that creates daemon threads with the given name
+   * prefix.
+   */
+ def newDaemonThreadPoolScheduledExecutor(threadNamePrefix: String, numThreads: Int)
+ : ScheduledExecutorService = {
+ val threadFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(s"$threadNamePrefix-%d")
+ .build()
+ val executor = new ScheduledThreadPoolExecutor(numThreads, threadFactory)
+    // By default, a cancelled task is not automatically removed from the work queue until its
+    // delay elapses. We have to enable removal-on-cancel manually.
+ executor.setRemoveOnCancelPolicy(true)
+ executor
+ }
+
/**
* Run a piece of code in a new thread and return the result. Exception in the new thread is
* thrown in the caller thread with an adjusted stack trace that removes references to this
@@ -229,4 +244,14 @@ private[spark] object ThreadUtils {
}
}
// scalastyle:on awaitready
+
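+  /**
+   * Shut down the given executor, waiting up to `gracePeriod` for running tasks to finish
+   * before forcing `shutdownNow()`.
+   */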
+ def shutdown(
+ executor: ExecutorService,
+ gracePeriod: Duration = FiniteDuration(30, TimeUnit.SECONDS)): Unit = {
+ executor.shutdown()
+ executor.awaitTermination(gracePeriod.toMillis, TimeUnit.MILLISECONDS)
+    if (!executor.isTerminated) {
+ executor.shutdownNow()
+ }
+ }
}
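A minimal sketch of how the two new helpers compose, assuming Spark-internal access to `ThreadUtils` (the pool name and intervals are illustrative, not part of this change):

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.util.ThreadUtils

// Illustrative: a two-thread daemon scheduler; threads are named "example-0", "example-1".
val scheduler = ThreadUtils.newDaemonThreadPoolScheduledExecutor("example", 2)
val tick = scheduler.scheduleWithFixedDelay(
  new Runnable { override def run(): Unit = println("tick") },
  0L, 1L, TimeUnit.SECONDS)

// Because the factory sets setRemoveOnCancelPolicy(true), cancelling frees the queue slot
// immediately instead of holding it until the task's delay elapses.
tick.cancel(true)

// Waits up to the default 30s grace period for termination, then forces shutdownNow().
ThreadUtils.shutdown(scheduler)
```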
diff --git a/licenses/LICENSE-jmock.txt b/licenses/LICENSE-jmock.txt
new file mode 100644
index 0000000000..ed7964fe3d
--- /dev/null
+++ b/licenses/LICENSE-jmock.txt
@@ -0,0 +1,28 @@
+Copyright (c) 2000-2017, jMock.org
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+Redistributions of source code must retain the above copyright notice,
+this list of conditions and the following disclaimer. Redistributions
+in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution.
+
+Neither the name of jMock nor the names of its contributors may be
+used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/pom.xml b/pom.xml
index 23bbd3b097..4b4e6c13ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -760,6 +760,12 @@
         <version>1.10.19</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.jmock</groupId>
+        <artifactId>jmock-junit4</artifactId>
+        <scope>test</scope>
+        <version>2.8.4</version>
+      </dependency>
       <dependency>
         <groupId>org.scalacheck</groupId>
         <artifactId>scalacheck_${scala.binary.version}</artifactId>
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index a62f271273..a6dd47a6b7 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -77,6 +77,12 @@
     </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp</artifactId>
+      <version>3.8.1</version>
+    </dependency>
+
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
@@ -84,9 +90,9 @@
     <dependency>
-      <groupId>com.squareup.okhttp3</groupId>
-      <artifactId>okhttp</artifactId>
-      <version>3.8.1</version>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock-junit4</artifactId>
+      <scope>test</scope>
     </dependency>
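The new jmock-junit4 dependency is test-only, and the tests that use it are not part of this hunk. As a hedged sketch of the jMock JUnit 4 rule style from Scala (all class, trait, and method names below are hypothetical):

```scala
import org.jmock.Expectations
import org.jmock.integration.junit4.JUnitRuleMockery
import org.junit.{Rule, Test}

class ExampleMockSuite {
  private val context = new JUnitRuleMockery()

  // JUnit 4.11+ allows @Rule on a public method; the rule verifies expectations after each test.
  @Rule def mockery: JUnitRuleMockery = context

  trait Clock { def millis(): Long }

  @Test def usesMockedClock(): Unit = {
    val clock = context.mock(classOf[Clock])
    context.checking(new Expectations {
      oneOf(clock).millis()
      will(Expectations.returnValue(42L))
    })
    assert(clock.millis() == 42L)
  }
}
```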
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 590deaa72e..bf33179ae3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -176,6 +176,24 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")
+ val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
+ ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
+ .doc("Interval between polls against the Kubernetes API server to inspect the " +
+ "state of executors.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .checkValue(interval => interval > 0, s"API server polling interval must be a" +
+ " positive time value.")
+ .createWithDefaultString("30s")
+
+ val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
+ ConfigBuilder("spark.kubernetes.executor.eventProcessingInterval")
+ .doc("Interval between successive inspection of executor events sent from the" +
+ " Kubernetes API.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .checkValue(interval => interval > 0, s"Event processing interval must be a positive" +
+ " time value.")
+ .createWithDefaultString("1s")
+
val MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
.doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
@@ -193,7 +211,6 @@ private[spark] object Config extends Logging {
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")
-
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala
new file mode 100644
index 0000000000..83daddf714
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+sealed trait ExecutorPodState {
+ def pod: Pod
+}
+
+case class PodRunning(pod: Pod) extends ExecutorPodState
+
+case class PodPending(pod: Pod) extends ExecutorPodState
+
+sealed trait FinalPodState extends ExecutorPodState
+
+case class PodSucceeded(pod: Pod) extends FinalPodState
+
+case class PodFailed(pod: Pod) extends FinalPodState
+
+case class PodDeleted(pod: Pod) extends FinalPodState
+
+case class PodUnknown(pod: Pod) extends ExecutorPodState
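A sketch of consuming the sealed hierarchy; `FinalPodState` groups the three terminal cases so downstream code can match on it directly:

```scala
def describe(state: ExecutorPodState): String = state match {
  case PodPending(_) => "waiting to be scheduled"
  case PodRunning(_) => "running"
  case f: FinalPodState => s"terminated (${f.getClass.getSimpleName})"
  case PodUnknown(p) => s"unknown phase for pod ${p.getMetadata.getName}"
}
```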
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
new file mode 100644
index 0000000000..5a143ad360
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, Utils}
+
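+/**
+ * Requests executor pods from Kubernetes in batches when no previously requested pods are
+ * still pending and the number of running executors is below the expected total. Also deletes
+ * pods that were requested but never appeared in any snapshot within the creation timeout.
+ */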
+private[spark] class ExecutorPodsAllocator(
+ conf: SparkConf,
+ executorBuilder: KubernetesExecutorBuilder,
+ kubernetesClient: KubernetesClient,
+ snapshotsStore: ExecutorPodsSnapshotsStore,
+ clock: Clock) extends Logging {
+
+ private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+ private val totalExpectedExecutors = new AtomicInteger(0)
+
+ private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+ private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+ private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000)
+
+ private val kubernetesDriverPodName = conf
+ .get(KUBERNETES_DRIVER_POD_NAME)
+ .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+ private val driverPod = kubernetesClient.pods()
+ .withName(kubernetesDriverPodName)
+ .get()
+
+ // Executor IDs that have been requested from Kubernetes but have not been detected in any
+ // snapshot yet. Mapped to the timestamp when they were created.
+ private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
+
+ def start(applicationId: String): Unit = {
+ snapshotsStore.addSubscriber(podAllocationDelay) {
+ onNewSnapshots(applicationId, _)
+ }
+ }
+
+ def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
+
+ private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+ newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
+ // For all executors we've created against the API but have not seen in a snapshot
+ // yet - check the current time. If the current time has exceeded some threshold,
+ // assume that the pod was either never created (the API server never properly
+ // handled the creation request), or the API server created the pod but we missed
+ // both the creation and deletion events. In either case, delete the missing pod
+ // if possible, and mark such a pod to be rescheduled below.
+ newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
+ val currentTime = clock.getTimeMillis()
+ if (currentTime - timeCreated > podCreationTimeout) {
+ logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
+ s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
+ " previous allocation attempt tried to create it. The executor may have been" +
+ " deleted but the application missed the deletion event.")
+ Utils.tryLogNonFatalError {
+ kubernetesClient
+ .pods()
+ .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
+ .delete()
+ }
+ newlyCreatedExecutors -= execId
+ } else {
+ logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
+ s" was created ${currentTime - timeCreated} milliseconds ago.")
+ }
+ }
+
+ if (snapshots.nonEmpty) {
+      // We only need to examine the latest snapshot, i.e. the "current" cluster state, to
+      // decide whether we need to allocate more executors.
+ val latestSnapshot = snapshots.last
+ val currentRunningExecutors = latestSnapshot.executorPods.values.count {
+ case PodRunning(_) => true
+ case _ => false
+ }
+ val currentPendingExecutors = latestSnapshot.executorPods.values.count {
+ case PodPending(_) => true
+ case _ => false
+ }
+ val currentTotalExpectedExecutors = totalExpectedExecutors.get
+ logDebug(s"Currently have $currentRunningExecutors running executors and" +
+ s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" +
+ s" have been requested but are pending appearance in the cluster.")
+ if (newlyCreatedExecutors.isEmpty
+ && currentPendingExecutors == 0
+ && currentRunningExecutors < currentTotalExpectedExecutors) {
+ val numExecutorsToAllocate = math.min(
+ currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
+ logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
+        for (_ <- 0 until numExecutorsToAllocate) {
+ val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
+ val executorConf = KubernetesConf.createExecutorConf(
+ conf,
+ newExecutorId.toString,
+ applicationId,
+ driverPod)
+ val executorPod = executorBuilder.buildFromFeatures(executorConf)
+ val podWithAttachedContainer = new PodBuilder(executorPod.pod)
+ .editOrNewSpec()
+ .addToContainers(executorPod.container)
+ .endSpec()
+ .build()
+ kubernetesClient.pods().create(podWithAttachedContainer)
+ newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
+ logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+ }
+ } else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
+ // TODO handle edge cases if we end up with more running executors than expected.
+ logDebug("Current number of running executors is equal to the number of requested" +
+ " executors. Not scaling up further.")
+ } else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
+ logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
+ s" executors to begin running before requesting for more executors. # of executors in" +
+ s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" +
+ s" created but we have not observed as being present in the cluster yet:" +
+ s" ${newlyCreatedExecutors.size}.")
+ }
+ }
+ }
+}
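The scale-up decision above boils down to a small pure function; a sketch for illustration (not code in this patch):

```scala
// Request a batch only when nothing is in flight, capped at the allocation batch size.
def executorsToAllocate(
    running: Int,
    pending: Int,
    newlyCreated: Int,
    expected: Int,
    batchSize: Int): Int = {
  if (newlyCreated == 0 && pending == 0 && running < expected) {
    math.min(expected - running, batchSize)
  } else {
    0
  }
}

// e.g. executorsToAllocate(running = 3, pending = 0, newlyCreated = 0,
//   expected = 10, batchSize = 5) == 5
```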
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
new file mode 100644
index 0000000000..b28d939903
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.Cache
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
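+/**
+ * Watches executor pod snapshots for terminal states and reconciles them with Spark's view,
+ * removing such executors from the scheduler backend and, when appropriate, deleting their
+ * pods from Kubernetes.
+ */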
+private[spark] class ExecutorPodsLifecycleManager(
+ conf: SparkConf,
+ executorBuilder: KubernetesExecutorBuilder,
+ kubernetesClient: KubernetesClient,
+ snapshotsStore: ExecutorPodsSnapshotsStore,
+    // Use best-effort tracking of which executors have already been removed. Removing an
+    // executor more than once is not generally job-breaking, but we ideally attempt to avoid
+    // it. Cache entries expire so that this data structure doesn't grow beyond bounds.
+ removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) extends Logging {
+
+ import ExecutorPodsLifecycleManager._
+
+ private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
+
+ def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+ snapshotsStore.addSubscriber(eventProcessingInterval) {
+ onNewSnapshots(schedulerBackend, _)
+ }
+ }
+
+ private def onNewSnapshots(
+ schedulerBackend: KubernetesClusterSchedulerBackend,
+ snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+ val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
+ snapshots.foreach { snapshot =>
+ snapshot.executorPods.foreach { case (execId, state) =>
+ state match {
+ case deleted@PodDeleted(_) =>
+ logDebug(s"Snapshot reported deleted executor with id $execId," +
+ s" pod name ${state.pod.getMetadata.getName}")
+ removeExecutorFromSpark(schedulerBackend, deleted, execId)
+ execIdsRemovedInThisRound += execId
+ case failed@PodFailed(_) =>
+ logDebug(s"Snapshot reported failed executor with id $execId," +
+ s" pod name ${state.pod.getMetadata.getName}")
+ onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
+ case succeeded@PodSucceeded(_) =>
+ logDebug(s"Snapshot reported succeeded executor with id $execId," +
+ s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
+ s" unusual unless Spark specifically informed the executor to exit.")
+ onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
+ case _ =>
+ }
+ }
+ }
+
+ // Reconcile the case where Spark claims to know about an executor but the corresponding pod
+ // is missing from the cluster. This would occur if we miss a deletion event and the pod
+    // transitions immediately from running to absent. We only need to check against the latest
+ // snapshot for this, and we don't do this for executors in the deleted executors cache or
+ // that we just removed in this round.
+ if (snapshots.nonEmpty) {
+ val latestSnapshot = snapshots.last
+ (schedulerBackend.getExecutorIds().map(_.toLong).toSet
+ -- latestSnapshot.executorPods.keySet
+ -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
+ if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
+ val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
+ s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
+ s" executor may have been deleted but the driver missed the deletion event."
+ logDebug(exitReasonMessage)
+ val exitReason = ExecutorExited(
+ UNKNOWN_EXIT_CODE,
+ exitCausedByApp = false,
+ exitReasonMessage)
+ schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason)
+ execIdsRemovedInThisRound += missingExecutorId
+ }
+ }
+ }
+ logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
+ s" from Spark that were either found to be deleted or non-existent in the cluster.")
+ }
+
+ private def onFinalNonDeletedState(
+ podState: FinalPodState,
+ execId: Long,
+ schedulerBackend: KubernetesClusterSchedulerBackend,
+ execIdsRemovedInRound: mutable.Set[Long]): Unit = {
+ removeExecutorFromK8s(podState.pod)
+ removeExecutorFromSpark(schedulerBackend, podState, execId)
+ execIdsRemovedInRound += execId
+ }
+
+ private def removeExecutorFromK8s(updatedPod: Pod): Unit = {
+ // If deletion failed on a previous try, we can try again if resync informs us the pod
+ // is still around.
+    // Delete as a best-effort attempt; duplicate deletes will throw an exception but the end
+    // state of getting rid of the pod is what matters.
+ Utils.tryLogNonFatalError {
+ kubernetesClient
+ .pods()
+ .withName(updatedPod.getMetadata.getName)
+ .delete()
+ }
+ }
+
+ private def removeExecutorFromSpark(
+ schedulerBackend: KubernetesClusterSchedulerBackend,
+ podState: FinalPodState,
+ execId: Long): Unit = {
+ if (removedExecutorsCache.getIfPresent(execId) == null) {
+ removedExecutorsCache.put(execId, execId)
+ val exitReason = findExitReason(podState, execId)
+ schedulerBackend.doRemoveExecutor(execId.toString, exitReason)
+ }
+ }
+
+ private def findExitReason(podState: FinalPodState, execId: Long): ExecutorExited = {
+ val exitCode = findExitCode(podState)
+ val (exitCausedByApp, exitMessage) = podState match {
+ case PodDeleted(_) =>
+ (false, s"The executor with id $execId was deleted by a user or the framework.")
+ case _ =>
+ val msg = exitReasonMessage(podState, execId, exitCode)
+ (true, msg)
+ }
+ ExecutorExited(exitCode, exitCausedByApp, exitMessage)
+ }
+
+ private def exitReasonMessage(podState: FinalPodState, execId: Long, exitCode: Int) = {
+ val pod = podState.pod
+ s"""
+ |The executor with id $execId exited with exit code $exitCode.
+ |The API gave the following brief reason: ${pod.getStatus.getReason}
+ |The API gave the following message: ${pod.getStatus.getMessage}
+ |The API gave the following container statuses:
+ |
+ |${pod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
+ """.stripMargin
+ }
+
+ private def findExitCode(podState: FinalPodState): Int = {
+ podState.pod.getStatus.getContainerStatuses.asScala.find { containerStatus =>
+ containerStatus.getState.getTerminated != null
+ }.map { terminatedContainer =>
+ terminatedContainer.getState.getTerminated.getExitCode.toInt
+ }.getOrElse(UNKNOWN_EXIT_CODE)
+ }
+}
+
+private object ExecutorPodsLifecycleManager {
+ val UNKNOWN_EXIT_CODE = -1
+}
+
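The reconciliation above is plain set arithmetic; a REPL-style sketch with made-up executor ids:

```scala
// Executors Spark still tracks, minus those present in the latest snapshot, minus those
// already removed this round.
val sparkKnownExecIds = Set(1L, 2L, 3L, 4L)   // schedulerBackend.getExecutorIds()
val latestSnapshotIds = Set(1L, 2L)           // latestSnapshot.executorPods.keySet
val removedThisRound  = Set(3L)               // execIdsRemovedInThisRound
val missing = sparkKnownExecIds -- latestSnapshotIds -- removedThisRound
// missing == Set(4L): reported as ExecutorExited(UNKNOWN_EXIT_CODE, exitCausedByApp = false, ...)
```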
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
new file mode 100644
index 0000000000..e77e604d00
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
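+/**
+ * Periodically lists all executor pods for the application from the Kubernetes API server
+ * and pushes the result into the snapshots store as a full sync.
+ */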
+private[spark] class ExecutorPodsPollingSnapshotSource(
+ conf: SparkConf,
+ kubernetesClient: KubernetesClient,
+ snapshotsStore: ExecutorPodsSnapshotsStore,
+ pollingExecutor: ScheduledExecutorService) extends Logging {
+
+ private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+ private var pollingFuture: Future[_] = _
+
+ def start(applicationId: String): Unit = {
+ require(pollingFuture == null, "Cannot start polling more than once.")
+ logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
+ pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+ new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+ }
+
+ def stop(): Unit = {
+ if (pollingFuture != null) {
+ pollingFuture.cancel(true)
+ pollingFuture = null
+ }
+ ThreadUtils.shutdown(pollingExecutor)
+ }
+
+ private class PollRunnable(applicationId: String) extends Runnable {
+ override def run(): Unit = {
+ logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
+ snapshotsStore.replaceSnapshot(kubernetesClient
+ .pods()
+ .withLabel(SPARK_APP_ID_LABEL, applicationId)
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .list()
+ .getItems
+ .asScala)
+ }
+ }
+
+}
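A hedged sketch of the scheduling contract relied on here: `scheduleWithFixedDelay` measures the interval from the end of one run to the start of the next, so a slow API listing stretches the period rather than stacking overlapping polls.

```scala
import java.util.concurrent.{Executors, TimeUnit}

val exec = Executors.newSingleThreadScheduledExecutor()
val future = exec.scheduleWithFixedDelay(
  // A deliberately slow "poll": the next run starts 1000 ms after this one finishes.
  new Runnable { override def run(): Unit = Thread.sleep(500) },
  0L, 1000L, TimeUnit.MILLISECONDS)

// cancel(true) interrupts an in-flight run, mirroring stop() above.
future.cancel(true)
exec.shutdown()
```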
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
new file mode 100644
index 0000000000..26be918043
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+
+/**
+ * An immutable view of the current executor pods that are running in the cluster.
+ */
+private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {
+
+ import ExecutorPodsSnapshot._
+
+ def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
+ val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
+ new ExecutorPodsSnapshot(newExecutorPods)
+ }
+}
+
+object ExecutorPodsSnapshot extends Logging {
+
+ def apply(executorPods: Seq[Pod]): ExecutorPodsSnapshot = {
+ ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
+ }
+
+ def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long, ExecutorPodState])
+
+ private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, ExecutorPodState] = {
+ executorPods.map { pod =>
+ (pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong, toState(pod))
+ }.toMap
+ }
+
+ private def toState(pod: Pod): ExecutorPodState = {
+ if (isDeleted(pod)) {
+ PodDeleted(pod)
+ } else {
+ val phase = pod.getStatus.getPhase.toLowerCase
+ phase match {
+ case "pending" =>
+ PodPending(pod)
+ case "running" =>
+ PodRunning(pod)
+ case "failed" =>
+ PodFailed(pod)
+ case "succeeded" =>
+ PodSucceeded(pod)
+ case _ =>
+ logWarning(s"Received unknown phase $phase for executor pod with name" +
+ s" ${pod.getMetadata.getName} in namespace ${pod.getMetadata.getNamespace}")
+ PodUnknown(pod)
+ }
+ }
+ }
+
+ private def isDeleted(pod: Pod): Boolean = pod.getMetadata.getDeletionTimestamp != null
+}
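A quick sketch of the snapshot semantics using the fabric8 builder; the pod name and the literal label key (assumed to match `Constants.SPARK_EXECUTOR_ID_LABEL`) are illustrative:

```scala
import io.fabric8.kubernetes.api.model.PodBuilder

def podWithPhase(execId: String, phase: String) = new PodBuilder()
  .withNewMetadata()
    .withName(s"exec-$execId")
    .addToLabels("spark-exec-id", execId) // assumed value of SPARK_EXECUTOR_ID_LABEL
    .endMetadata()
  .withNewStatus()
    .withPhase(phase)
    .endStatus()
  .build()

val s0 = ExecutorPodsSnapshot(Seq(podWithPhase("1", "Pending")))
// s0.executorPods == Map(1L -> PodPending(...))
val s1 = s0.withUpdate(podWithPhase("1", "Running"))
// s1.executorPods == Map(1L -> PodRunning(...)); s0 itself is unchanged.
```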
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStore.scala
new file mode 100644
index 0000000000..dd264332cf
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStore.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+private[spark] trait ExecutorPodsSnapshotsStore {
+
+  def addSubscriber(processBatchIntervalMillis: Long)
+      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit
+
+ def stop(): Unit
+
+ def updatePod(updatedPod: Pod): Unit
+
+ def replaceSnapshot(newSnapshot: Seq[Pod]): Unit
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
new file mode 100644
index 0000000000..5583b4617e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent._
+
+import io.fabric8.kubernetes.api.model.Pod
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Controls the propagation of the Spark application's executor pods state to subscribers that
+ * react to that state.
+ *
+ * Roughly follows a producer-consumer model. Producers report states of executor pods, and these
+ * states are then published to consumers that can perform any actions in response to these states.
+ *
+ * Producers push updates in one of two ways. An incremental update sent by updatePod() represents
+ * a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that
+ * the passed pods are all of the most up to date states of all executor pods for the application.
+ * The combination of the states of all executor pods for the application is collectively known as
+ * a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that
+ * most recent snapshot - either by incrementally updating the snapshot with a single new pod state,
+ * or by replacing the snapshot entirely on a full sync.
+ *
+ * Consumers, or subscribers, register that they want to be informed about all snapshots of the
+ * executor pods. Every time the store replaces its most up to date snapshot from either an
+ * incremental update or a full sync, the most recent snapshot after the update is posted to the
+ * subscriber's buffer. Subscribers receive blocks of snapshots produced by the producers in
+ * time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different
+ * time intervals.
+ */
+private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
+ extends ExecutorPodsSnapshotsStore {
+
+ private val SNAPSHOT_LOCK = new Object()
+
+ private val subscribers = mutable.Buffer.empty[SnapshotsSubscriber]
+ private val pollingTasks = mutable.Buffer.empty[Future[_]]
+
+ @GuardedBy("SNAPSHOT_LOCK")
+ private var currentSnapshot = ExecutorPodsSnapshot()
+
+ override def addSubscriber(
+ processBatchIntervalMillis: Long)
+ (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
+ val newSubscriber = SnapshotsSubscriber(
+ new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
+ SNAPSHOT_LOCK.synchronized {
+ newSubscriber.snapshotsBuffer.add(currentSnapshot)
+ }
+ subscribers += newSubscriber
+ pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
+ toRunnable(() => callSubscriber(newSubscriber)),
+ 0L,
+ processBatchIntervalMillis,
+ TimeUnit.MILLISECONDS)
+ }
+
+ override def stop(): Unit = {
+ pollingTasks.foreach(_.cancel(true))
+ ThreadUtils.shutdown(subscribersExecutor)
+ }
+
+ override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized {
+ currentSnapshot = currentSnapshot.withUpdate(updatedPod)
+ addCurrentSnapshotToSubscribers()
+ }
+
+ override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = SNAPSHOT_LOCK.synchronized {
+ currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
+ addCurrentSnapshotToSubscribers()
+ }
+
+ private def addCurrentSnapshotToSubscribers(): Unit = {
+ subscribers.foreach { subscriber =>
+ subscriber.snapshotsBuffer.add(currentSnapshot)
+ }
+ }
+
+ private def callSubscriber(subscriber: SnapshotsSubscriber): Unit = {
+ Utils.tryLogNonFatalError {
+ val currentSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot].asJava
+ subscriber.snapshotsBuffer.drainTo(currentSnapshots)
+ subscriber.onNewSnapshots(currentSnapshots.asScala)
+ }
+ }
+
+ private def toRunnable[T](runnable: () => Unit): Runnable = new Runnable {
+ override def run(): Unit = runnable()
+ }
+
+ private case class SnapshotsSubscriber(
+ snapshotsBuffer: BlockingQueue[ExecutorPodsSnapshot],
+ onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit)
+}
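To make the time-windowing concrete, a sketch of driving the store by hand (interval and wiring illustrative; in the real backend the watch and polling sources call `updatePod` and `replaceSnapshot`):

```scala
import java.util.concurrent.Executors

val store = new ExecutorPodsSnapshotsStoreImpl(
  Executors.newSingleThreadScheduledExecutor())

// Each subscriber drains whatever snapshots accumulated during its own interval, so a
// 1000 ms subscriber sees coarser-grained batches than a 100 ms one would.
store.addSubscriber(1000L) { snapshots =>
  println(s"batch of ${snapshots.size}; latest tracks " +
    s"${snapshots.lastOption.map(_.executorPods.size).getOrElse(0)} pods")
}

store.replaceSnapshot(Seq.empty) // full sync, as the polling source does
store.stop()
```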
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
new file mode 100644
index 0000000000..a6749a644e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
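+/**
+ * Feeds incremental executor pod updates from a Kubernetes watch connection into the
+ * snapshots store.
+ */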
+private[spark] class ExecutorPodsWatchSnapshotSource(
+ snapshotsStore: ExecutorPodsSnapshotsStore,
+ kubernetesClient: KubernetesClient) extends Logging {
+
+ private var watchConnection: Closeable = _
+
+ def start(applicationId: String): Unit = {
+ require(watchConnection == null, "Cannot start the watcher twice.")
+ logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
+ s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
+ watchConnection = kubernetesClient.pods()
+ .withLabel(SPARK_APP_ID_LABEL, applicationId)
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .watch(new ExecutorPodsWatcher())
+ }
+
+ def stop(): Unit = {
+ if (watchConnection != null) {
+ Utils.tryLogNonFatalError {
+ watchConnection.close()
+ }
+ watchConnection = null
+ }
+ }
+
+ private class ExecutorPodsWatcher extends Watcher[Pod] {
+ override def eventReceived(action: Action, pod: Pod): Unit = {
+ val podName = pod.getMetadata.getName
+ logDebug(s"Received executor pod update for pod named $podName, action $action")
+ snapshotsStore.updatePod(pod)
+ }
+
+ override def onClose(e: KubernetesClientException): Unit = {
+ logWarning("Kubernetes client has been closed (this is expected if the application is" +
+ " shutting down.)", e)
+ }
+ }
+
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 0ea80dfbc0..c6e931a384 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -17,7 +17,9 @@
package org.apache.spark.scheduler.cluster.k8s
import java.io.File
+import java.util.concurrent.TimeUnit
+import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.client.Config
import org.apache.spark.{SparkContext, SparkException}
@@ -26,7 +28,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{SystemClock, ThreadUtils}
private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
@@ -56,17 +58,45 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
- val allocatorExecutor = ThreadUtils
- .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-executor-requests")
+
+ val subscribersExecutor = ThreadUtils
+ .newDaemonThreadPoolScheduledExecutor(
+ "kubernetes-executor-snapshots-subscribers", 2)
+ val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(subscribersExecutor)
+ val removedExecutorsCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(3, TimeUnit.MINUTES)
+ .build[java.lang.Long, java.lang.Long]()
+ val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
+ sc.conf,
+ new KubernetesExecutorBuilder(),
+ kubernetesClient,
+ snapshotsStore,
+ removedExecutorsCache)
+
+ val executorPodsAllocator = new ExecutorPodsAllocator(
+ sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock())
+
+ val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
+ snapshotsStore,
+ kubernetesClient)
+
+ val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ "kubernetes-executor-pod-polling-sync")
+ val podsPollingEventSource = new ExecutorPodsPollingSnapshotSource(
+ sc.conf, kubernetesClient, snapshotsStore, eventsPollingExecutor)
+
new KubernetesClusterSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
sc.env.rpcEnv,
- new KubernetesExecutorBuilder,
kubernetesClient,
- allocatorExecutor,
- requestExecutorsService)
+ requestExecutorsService,
+ snapshotsStore,
+ executorPodsAllocator,
+ executorPodsLifecycleEventHandler,
+ podsWatchEventSource,
+ podsPollingEventSource)
}
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index d86664c810..fa6dc2c479 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -16,60 +16,32 @@
*/
package org.apache.spark.scheduler.cluster.k8s
-import java.io.Closeable
-import java.net.InetAddress
-import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
-import javax.annotation.concurrent.GuardedBy
+import java.util.concurrent.ExecutorService
-import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
-import io.fabric8.kubernetes.client.Watcher.Action
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}
-import org.apache.spark.SparkException
-import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesConf
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
-import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
rpcEnv: RpcEnv,
- executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
- allocatorExecutor: ScheduledExecutorService,
- requestExecutorsService: ExecutorService)
+ requestExecutorsService: ExecutorService,
+ snapshotsStore: ExecutorPodsSnapshotsStore,
+ podAllocator: ExecutorPodsAllocator,
+ lifecycleEventHandler: ExecutorPodsLifecycleManager,
+ watchEvents: ExecutorPodsWatchSnapshotSource,
+ pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
- import KubernetesClusterSchedulerBackend._
-
- private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
- private val RUNNING_EXECUTOR_PODS_LOCK = new Object
- @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
- private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
- private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
- private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
- private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
-
- private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
-
- private val kubernetesDriverPodName = conf
- .get(KUBERNETES_DRIVER_POD_NAME)
- .getOrElse(throw new SparkException("Must specify the driver pod name"))
private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
requestExecutorsService)
- private val driverPod = kubernetesClient.pods()
- .inNamespace(kubernetesNamespace)
- .withName(kubernetesDriverPodName)
- .get()
-
protected override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
0.8
@@ -77,372 +49,93 @@ private[spark] class KubernetesClusterSchedulerBackend(
super.minRegisteredRatio
}
- private val executorWatchResource = new AtomicReference[Closeable]
- private val totalExpectedExecutors = new AtomicInteger(0)
-
- private val driverUrl = RpcEndpointAddress(
- conf.get("spark.driver.host"),
- conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
- CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
-
private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
- private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+  // Allow removeExecutor to be accessible by ExecutorPodsLifecycleManager
+ private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+ removeExecutor(executorId, reason)
+ }
- private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+ override def start(): Unit = {
+ super.start()
+ if (!Utils.isDynamicAllocationEnabled(conf)) {
+ podAllocator.setTotalExpectedExecutors(initialExecutors)
+ }
+ lifecycleEventHandler.start(this)
+ podAllocator.start(applicationId())
+ watchEvents.start(applicationId())
+ pollEvents.start(applicationId())
+ }
- private val executorLostReasonCheckMaxAttempts = conf.get(
- KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
+ override def stop(): Unit = {
+ super.stop()
- private val allocatorRunnable = new Runnable {
-
- // Maintains a map of executor id to count of checks performed to learn the loss reason
- // for an executor.
- private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int]
-
- override def run(): Unit = {
- handleDisconnectedExecutors()
-
- val executorsToAllocate = mutable.Map[String, Pod]()
- val currentTotalRegisteredExecutors = totalRegisteredExecutors.get
- val currentTotalExpectedExecutors = totalExpectedExecutors.get
- val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts()
- RUNNING_EXECUTOR_PODS_LOCK.synchronized {
- if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
- logDebug("Waiting for pending executors before scaling")
- } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
- logDebug("Maximum allowed executor limit reached. Not scaling up further.")
- } else {
- for (_ <- 0 until math.min(
- currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
- val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
- val executorConf = KubernetesConf.createExecutorConf(
- conf,
- executorId,
- applicationId(),
- driverPod)
- val executorPod = executorBuilder.buildFromFeatures(executorConf)
- val podWithAttachedContainer = new PodBuilder(executorPod.pod)
- .editOrNewSpec()
- .addToContainers(executorPod.container)
- .endSpec()
- .build()
-
- executorsToAllocate(executorId) = podWithAttachedContainer
- logInfo(
- s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
- }
- }
- }
-
- val allocatedExecutors = executorsToAllocate.mapValues { pod =>
- Utils.tryLog {
- kubernetesClient.pods().create(pod)
- }
- }
-
- RUNNING_EXECUTOR_PODS_LOCK.synchronized {
- allocatedExecutors.map {
- case (executorId, attemptedAllocatedExecutor) =>
- attemptedAllocatedExecutor.map { successfullyAllocatedExecutor =>
- runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor)
- }
- }
- }
+ Utils.tryLogNonFatalError {
+ snapshotsStore.stop()
}
- def handleDisconnectedExecutors(): Unit = {
- // For each disconnected executor, synchronize with the loss reasons that may have been found
- // by the executor pod watcher. If the loss reason was discovered by the watcher,
- // inform the parent class with removeExecutor.
- disconnectedPodsByExecutorIdPendingRemoval.asScala.foreach {
- case (executorId, executorPod) =>
- val knownExitReason = Option(podsWithKnownExitReasons.remove(
- executorPod.getMetadata.getName))
- knownExitReason.fold {
- removeExecutorOrIncrementLossReasonCheckCount(executorId)
- } { executorExited =>
- logWarning(s"Removing executor $executorId with loss reason " + executorExited.message)
- removeExecutor(executorId, executorExited)
- // We don't delete the pod running the executor that has an exit condition caused by
- // the application from the Kubernetes API server. This allows users to debug later on
-          // through commands such as "kubectl logs <pod name>" and
-          // "kubectl describe pod <pod name>". Note that exited containers have terminated and
- // therefore won't take CPU and memory resources.
- // Otherwise, the executor pod is marked to be deleted from the API server.
- if (executorExited.exitCausedByApp) {
- logInfo(s"Executor $executorId exited because of the application.")
- deleteExecutorFromDataStructures(executorId)
- } else {
- logInfo(s"Executor $executorId failed because of a framework error.")
- deleteExecutorFromClusterAndDataStructures(executorId)
- }
- }
- }
+ Utils.tryLogNonFatalError {
+ watchEvents.stop()
}
- def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
- val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
- if (reasonCheckCount >= executorLostReasonCheckMaxAttempts) {
- removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
- deleteExecutorFromClusterAndDataStructures(executorId)
- } else {
- executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1)
- }
+ Utils.tryLogNonFatalError {
+ pollEvents.stop()
}
- def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
- deleteExecutorFromDataStructures(executorId).foreach { pod =>
- kubernetesClient.pods().delete(pod)
- }
+ Utils.tryLogNonFatalError {
+ kubernetesClient.pods()
+ .withLabel(SPARK_APP_ID_LABEL, applicationId())
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .delete()
}
- def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {
- disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)
- executorReasonCheckAttemptCounts -= executorId
- podsWithKnownExitReasons.remove(executorId)
- RUNNING_EXECUTOR_PODS_LOCK.synchronized {
- runningExecutorsToPods.remove(executorId).orElse {
- logWarning(s"Unable to remove pod for unknown executor $executorId")
- None
- }
- }
+ Utils.tryLogNonFatalError {
+ ThreadUtils.shutdown(requestExecutorsService)
}
+
+ Utils.tryLogNonFatalError {
+ kubernetesClient.close()
+ }
+ }
+
+ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
+ // TODO when we support dynamic allocation, the pod allocator should be told to process the
+ // current snapshot in order to decrease/increase the number of executors accordingly.
+ podAllocator.setTotalExpectedExecutors(requestedTotal)
+ true
}
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
}
- override def start(): Unit = {
- super.start()
- executorWatchResource.set(
- kubernetesClient
- .pods()
- .withLabel(SPARK_APP_ID_LABEL, applicationId())
- .watch(new ExecutorPodsWatcher()))
-
- allocatorExecutor.scheduleWithFixedDelay(
- allocatorRunnable, 0L, podAllocationInterval, TimeUnit.MILLISECONDS)
-
- if (!Utils.isDynamicAllocationEnabled(conf)) {
- doRequestTotalExecutors(initialExecutors)
- }
- }
-
- override def stop(): Unit = {
- // stop allocation of new resources and caches.
- allocatorExecutor.shutdown()
- allocatorExecutor.awaitTermination(30, TimeUnit.SECONDS)
-
- // send stop message to executors so they shut down cleanly
- super.stop()
-
- try {
- val resource = executorWatchResource.getAndSet(null)
- if (resource != null) {
- resource.close()
- }
- } catch {
- case e: Throwable => logWarning("Failed to close the executor pod watcher", e)
- }
-
- // then delete the executor pods
- Utils.tryLogNonFatalError {
- deleteExecutorPodsOnStop()
- executorPodsByIPs.clear()
- }
- Utils.tryLogNonFatalError {
- logInfo("Closing kubernetes client")
- kubernetesClient.close()
- }
- }
-
- /**
- * @return A map of K8s cluster nodes to the number of tasks that could benefit from data
- * locality if an executor launches on the cluster node.
- */
- private def getNodesWithLocalTaskCounts() : Map[String, Int] = {
- val nodeToLocalTaskCount = synchronized {
- mutable.Map[String, Int]() ++ hostToLocalTaskCount
- }
-
- for (pod <- executorPodsByIPs.values().asScala) {
- // Remove cluster nodes that are running our executors already.
- // TODO: This prefers spreading out executors across nodes. In case users want
- // consolidating executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
- // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html
- nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty ||
- nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty ||
- nodeToLocalTaskCount.remove(
- InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty
- }
- nodeToLocalTaskCount.toMap[String, Int]
- }
-
- override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
- totalExpectedExecutors.set(requestedTotal)
- true
+ override def getExecutorIds(): Seq[String] = synchronized {
+ super.getExecutorIds()
}
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
- val podsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
- executorIds.flatMap { executorId =>
- runningExecutorsToPods.remove(executorId) match {
- case Some(pod) =>
- disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
- Some(pod)
-
- case None =>
- logWarning(s"Unable to remove pod for unknown executor $executorId")
- None
- }
- }
- }
-
- kubernetesClient.pods().delete(podsToDelete: _*)
- true
- }
-
- private def deleteExecutorPodsOnStop(): Unit = {
- val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
- val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*)
- runningExecutorsToPods.clear()
- runningExecutorPodsCopy
- }
- kubernetesClient.pods().delete(executorPodsToDelete: _*)
- }
-
- private class ExecutorPodsWatcher extends Watcher[Pod] {
-
- private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
-
- override def eventReceived(action: Action, pod: Pod): Unit = {
- val podName = pod.getMetadata.getName
- val podIP = pod.getStatus.getPodIP
-
- action match {
- case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
- && pod.getMetadata.getDeletionTimestamp == null) =>
- val clusterNodeName = pod.getSpec.getNodeName
- logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
- executorPodsByIPs.put(podIP, pod)
-
- case Action.DELETED | Action.ERROR =>
- val executorId = getExecutorId(pod)
- logDebug(s"Executor pod $podName at IP $podIP was at $action.")
- if (podIP != null) {
- executorPodsByIPs.remove(podIP)
- }
-
- val executorExitReason = if (action == Action.ERROR) {
- logWarning(s"Received error event of executor pod $podName. Reason: " +
- pod.getStatus.getReason)
- executorExitReasonOnError(pod)
- } else if (action == Action.DELETED) {
- logWarning(s"Received delete event of executor pod $podName. Reason: " +
- pod.getStatus.getReason)
- executorExitReasonOnDelete(pod)
- } else {
- throw new IllegalStateException(
- s"Unknown action that should only be DELETED or ERROR: $action")
- }
- podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
-
- if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
- log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
- s"watch received an event of type $action for this executor. The executor may " +
- "have failed to start in the first place and never registered with the driver.")
- }
- disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
-
- case _ => logDebug(s"Received event of executor pod $podName: " + action)
- }
- }
-
- override def onClose(cause: KubernetesClientException): Unit = {
- logDebug("Executor pod watch closed.", cause)
- }
-
- private def getExecutorExitStatus(pod: Pod): Int = {
- val containerStatuses = pod.getStatus.getContainerStatuses
- if (!containerStatuses.isEmpty) {
- // we assume the first container represents the pod status. This assumption may not hold
- // true in the future. Revisit this if side-car containers start running inside executor
- // pods.
- getExecutorExitStatus(containerStatuses.get(0))
- } else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
- }
-
- private def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
- Option(containerStatus.getState).map { containerState =>
- Option(containerState.getTerminated).map { containerStateTerminated =>
- containerStateTerminated.getExitCode.intValue()
- }.getOrElse(UNKNOWN_EXIT_CODE)
- }.getOrElse(UNKNOWN_EXIT_CODE)
- }
-
- private def isPodAlreadyReleased(pod: Pod): Boolean = {
- val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
- RUNNING_EXECUTOR_PODS_LOCK.synchronized {
- !runningExecutorsToPods.contains(executorId)
- }
- }
-
- private def executorExitReasonOnError(pod: Pod): ExecutorExited = {
- val containerExitStatus = getExecutorExitStatus(pod)
- // container was probably actively killed by the driver.
- if (isPodAlreadyReleased(pod)) {
- ExecutorExited(containerExitStatus, exitCausedByApp = false,
- s"Container in pod ${pod.getMetadata.getName} exited from explicit termination " +
- "request.")
- } else {
- val containerExitReason = s"Pod ${pod.getMetadata.getName}'s executor container " +
- s"exited with exit status code $containerExitStatus."
- ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
- }
- }
-
- private def executorExitReasonOnDelete(pod: Pod): ExecutorExited = {
- val exitMessage = if (isPodAlreadyReleased(pod)) {
- s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
- } else {
- s"Pod ${pod.getMetadata.getName} deleted or lost."
- }
- ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
- }
-
- private def getExecutorId(pod: Pod): String = {
- val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
- require(executorId != null, "Unexpected pod metadata; expected all executor pods " +
- s"to have label $SPARK_EXECUTOR_ID_LABEL.")
- executorId
- }
+ kubernetesClient.pods()
+ .withLabel(SPARK_APP_ID_LABEL, applicationId())
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
+ .delete()
+ // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
}
override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(rpcEnv, properties)
}
- private class KubernetesDriverEndpoint(
- rpcEnv: RpcEnv,
- sparkProperties: Seq[(String, String)])
+ private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends DriverEndpoint(rpcEnv, sparkProperties) {
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
- addressToExecutorId.get(rpcAddress).foreach { executorId =>
- if (disableExecutor(executorId)) {
- RUNNING_EXECUTOR_PODS_LOCK.synchronized {
- runningExecutorsToPods.get(executorId).foreach { pod =>
- disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
- }
- }
- }
- }
+      // Don't do anything besides disabling the executor; allow the Kubernetes API events to
+      // drive the rest of the lifecycle decisions.
+      // TODO: if the disconnect was caused by a transient networking issue, the executor may
+      // still be healthy; we probably want to mark it for eventual deletion instead.
+ addressToExecutorId.get(rpcAddress).foreach(disableExecutor)
}
}
-}
-private object KubernetesClusterSchedulerBackend {
- private val UNKNOWN_EXIT_CODE = -1
}
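
The rewritten doKillExecutors above is deliberately fire-and-forget: it deletes the
pods by label and lets the snapshot sources reconcile Spark's view. A minimal
standalone sketch of the same fabric8 call pattern, assuming a configured client and
that SPARK_APP_ID_LABEL, SPARK_ROLE_LABEL and SPARK_EXECUTOR_ID_LABEL resolve to the
strings shown (the app and executor ids here are hypothetical):

    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    val client = new DefaultKubernetesClient()
    // Delete every executor pod of this app whose executor id is in the kill list.
    // No driver-side state is updated here; the watch and polling snapshot sources
    // observe the deletions and drive the Spark-side bookkeeping.
    client.pods()
      .withLabel("spark-app-selector", "spark-application-1")  // SPARK_APP_ID_LABEL
      .withLabel("spark-role", "executor")                     // SPARK_ROLE_LABEL
      .withLabelIn("spark-exec-id", "1", "2")                  // SPARK_EXECUTOR_ID_LABEL
      .delete()
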
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
new file mode 100644
index 0000000000..527fc6b0d8
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{DoneablePod, HasMetadata, Pod, PodList}
+import io.fabric8.kubernetes.client.{Watch, Watcher}
+import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
+
+object Fabric8Aliases {
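+  // Short, flat names for fabric8's deeply parameterized DSL types, so that mock
+  // declarations in the suites stay on one line.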
+ type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+ type LABELED_PODS = FilterWatchListDeletable[
+ Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
+ type SINGLE_POD = PodResource[Pod, DoneablePod]
+ type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
+ HasMetadata, Boolean]
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index a8a8218c62..d045d9ae89 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.mockito.MockitoSugar._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
class ClientSuite extends SparkFunSuite with BeforeAndAfter {
@@ -103,15 +104,11 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
.build()
}
- private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
- HasMetadata, Boolean]
- private type Pods = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
-
@Mock
private var kubernetesClient: KubernetesClient = _
@Mock
- private var podOperations: Pods = _
+ private var podOperations: PODS = _
@Mock
private var namedPods: PodResource[Pod, DoneablePod] = _
@@ -123,7 +120,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private var driverBuilder: KubernetesDriverBuilder = _
@Mock
- private var resourceList: ResourceList = _
+ private var resourceList: RESOURCE_LIST = _
private var kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf] = _
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
new file mode 100644
index 0000000000..f7721e6fd6
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+import scala.collection.mutable
+
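+// Test double for ExecutorPodsSnapshotsStore: snapshots accumulate in a buffer and
+// reach subscribers only when a test calls notifySubscribers(), making delivery
+// fully deterministic.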
+class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore {
+
+ private val snapshotsBuffer = mutable.Buffer.empty[ExecutorPodsSnapshot]
+ private val subscribers = mutable.Buffer.empty[Seq[ExecutorPodsSnapshot] => Unit]
+
+ private var currentSnapshot = ExecutorPodsSnapshot()
+
+ override def addSubscriber
+ (processBatchIntervalMillis: Long)
+ (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
+ subscribers += onNewSnapshots
+ }
+
+ override def stop(): Unit = {}
+
+ def notifySubscribers(): Unit = {
+ subscribers.foreach(_(snapshotsBuffer))
+ snapshotsBuffer.clear()
+ }
+
+ override def updatePod(updatedPod: Pod): Unit = {
+ currentSnapshot = currentSnapshot.withUpdate(updatedPod)
+ snapshotsBuffer += currentSnapshot
+ }
+
+ override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = {
+ currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
+ snapshotsBuffer += currentSnapshot
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
new file mode 100644
index 0000000000..c6b667ed85
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkPod
+
+object ExecutorLifecycleTestUtils {
+
+ val TEST_SPARK_APP_ID = "spark-app-id"
+
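+  // Each factory below builds an executor pod in one of the lifecycle phases the
+  // snapshot model distinguishes (pending, running, succeeded, failed, deleted,
+  // unknown), layered on podWithAttachedContainerForId.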
+ def failedExecutorWithoutDeletion(executorId: Long): Pod = {
+ new PodBuilder(podWithAttachedContainerForId(executorId))
+ .editOrNewStatus()
+ .withPhase("failed")
+ .addNewContainerStatus()
+ .withName("spark-executor")
+ .withImage("k8s-spark")
+ .withNewState()
+ .withNewTerminated()
+ .withMessage("Failed")
+ .withExitCode(1)
+ .endTerminated()
+ .endState()
+ .endContainerStatus()
+ .addNewContainerStatus()
+ .withName("spark-executor-sidecar")
+ .withImage("k8s-spark-sidecar")
+ .withNewState()
+ .withNewTerminated()
+ .withMessage("Failed")
+ .withExitCode(1)
+ .endTerminated()
+ .endState()
+ .endContainerStatus()
+ .withMessage("Executor failed.")
+ .withReason("Executor failed because of a thrown error.")
+ .endStatus()
+ .build()
+ }
+
+ def pendingExecutor(executorId: Long): Pod = {
+ new PodBuilder(podWithAttachedContainerForId(executorId))
+ .editOrNewStatus()
+ .withPhase("pending")
+ .endStatus()
+ .build()
+ }
+
+ def runningExecutor(executorId: Long): Pod = {
+ new PodBuilder(podWithAttachedContainerForId(executorId))
+ .editOrNewStatus()
+ .withPhase("running")
+ .endStatus()
+ .build()
+ }
+
+ def succeededExecutor(executorId: Long): Pod = {
+ new PodBuilder(podWithAttachedContainerForId(executorId))
+ .editOrNewStatus()
+ .withPhase("succeeded")
+ .endStatus()
+ .build()
+ }
+
+ def deletedExecutor(executorId: Long): Pod = {
+ new PodBuilder(podWithAttachedContainerForId(executorId))
+ .editOrNewMetadata()
+ .withNewDeletionTimestamp("523012521")
+ .endMetadata()
+ .build()
+ }
+
+ def unknownExecutor(executorId: Long): Pod = {
+ new PodBuilder(podWithAttachedContainerForId(executorId))
+ .editOrNewStatus()
+ .withPhase("unknown")
+ .endStatus()
+ .build()
+ }
+
+ def podWithAttachedContainerForId(executorId: Long): Pod = {
+ val sparkPod = executorPodWithId(executorId)
+ val podWithAttachedContainer = new PodBuilder(sparkPod.pod)
+ .editOrNewSpec()
+ .addToContainers(sparkPod.container)
+ .endSpec()
+ .build()
+ podWithAttachedContainer
+ }
+
+ def executorPodWithId(executorId: Long): SparkPod = {
+ val pod = new PodBuilder()
+ .withNewMetadata()
+ .withName(s"spark-executor-$executorId")
+ .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
+ .endMetadata()
+ .build()
+ val container = new ContainerBuilder()
+ .withName("spark-executor")
+ .withImage("k8s-spark")
+ .build()
+ SparkPod(pod, container)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
new file mode 100644
index 0000000000..0c19f5946b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+import org.mockito.{ArgumentMatcher, Matchers, Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{never, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+import org.apache.spark.util.ManualClock
+
+class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
+
+ private val driverPodName = "driver"
+
+ private val driverPod = new PodBuilder()
+ .withNewMetadata()
+ .withName(driverPodName)
+ .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+ .withUid("driver-pod-uid")
+ .endMetadata()
+ .build()
+
+ private val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+
+ private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+ private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
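+  // Mirrors the allocator's internal pod-creation timeout: a requested pod that the
+  // API has not reported after five allocation intervals (at least 60s) counts as lost.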
+ private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000L)
+
+ private var waitForExecutorPodsClock: ManualClock = _
+
+ @Mock
+ private var kubernetesClient: KubernetesClient = _
+
+ @Mock
+ private var podOperations: PODS = _
+
+ @Mock
+ private var labeledPods: LABELED_PODS = _
+
+ @Mock
+ private var driverPodOperations: PodResource[Pod, DoneablePod] = _
+
+ @Mock
+ private var executorBuilder: KubernetesExecutorBuilder = _
+
+ private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+
+ private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ when(kubernetesClient.pods()).thenReturn(podOperations)
+ when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
+ when(driverPodOperations.get).thenReturn(driverPod)
+ when(executorBuilder.buildFromFeatures(kubernetesConfWithCorrectFields()))
+ .thenAnswer(executorPodAnswer())
+ snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
+ waitForExecutorPodsClock = new ManualClock(0L)
+ podsAllocatorUnderTest = new ExecutorPodsAllocator(
+ conf, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+ podsAllocatorUnderTest.start(TEST_SPARK_APP_ID)
+ }
+
+ test("Initially request executors in batches. Do not request another batch if the" +
+ " first has not finished.") {
+ podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
+ snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+ snapshotsStore.notifySubscribers()
+ for (nextId <- 1 to podAllocationSize) {
+ verify(podOperations).create(podWithAttachedContainerForId(nextId))
+ }
+ verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+ }
+
+ test("Request executors in batches. Allow another batch to be requested if" +
+ " all pending executors start running.") {
+ podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
+ snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+ snapshotsStore.notifySubscribers()
+ for (execId <- 1 until podAllocationSize) {
+ snapshotsStore.updatePod(runningExecutor(execId))
+ }
+ snapshotsStore.notifySubscribers()
+ verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+ snapshotsStore.updatePod(runningExecutor(podAllocationSize))
+ snapshotsStore.notifySubscribers()
+ verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
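+    // Reporting the same executor as running again must not trigger yet another
+    // request; the total number of created pods stays at podAllocationSize + 1.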
+ snapshotsStore.updatePod(runningExecutor(podAllocationSize))
+ snapshotsStore.notifySubscribers()
+ verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod]))
+ }
+
+ test("When a current batch reaches error states immediately, re-request" +
+ " them on the next batch.") {
+ podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize)
+ snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+ snapshotsStore.notifySubscribers()
+ for (execId <- 1 until podAllocationSize) {
+ snapshotsStore.updatePod(runningExecutor(execId))
+ }
+ val failedPod = failedExecutorWithoutDeletion(podAllocationSize)
+ snapshotsStore.updatePod(failedPod)
+ snapshotsStore.notifySubscribers()
+ verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+ }
+
+ test("When an executor is requested but the API does not report it in a reasonable time, retry" +
+ " requesting that executor.") {
+ podsAllocatorUnderTest.setTotalExpectedExecutors(1)
+ snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+ snapshotsStore.notifySubscribers()
+ snapshotsStore.replaceSnapshot(Seq.empty[Pod])
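+    // Advance the manual clock past the creation timeout so that executor 1 counts
+    // as requested but never reported by the API.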
+ waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
+ when(podOperations.withLabel(SPARK_EXECUTOR_ID_LABEL, "1")).thenReturn(labeledPods)
+ snapshotsStore.notifySubscribers()
+ verify(labeledPods).delete()
+ verify(podOperations).create(podWithAttachedContainerForId(2))
+ }
+
+ private def executorPodAnswer(): Answer[SparkPod] = {
+ new Answer[SparkPod] {
+ override def answer(invocation: InvocationOnMock): SparkPod = {
+ val k8sConf = invocation.getArgumentAt(
+ 0, classOf[KubernetesConf[KubernetesExecutorSpecificConf]])
+ executorPodWithId(k8sConf.roleSpecificConf.executorId.toInt)
+ }
+ }
+ }
+
+ private def kubernetesConfWithCorrectFields(): KubernetesConf[KubernetesExecutorSpecificConf] =
+ Matchers.argThat(new ArgumentMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
+ override def matches(argument: scala.Any): Boolean = {
+ if (!argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]) {
+ false
+ } else {
+ val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
+ val executorSpecificConf = k8sConf.roleSpecificConf
+ val expectedK8sConf = KubernetesConf.createExecutorConf(
+ conf,
+ executorSpecificConf.executorId,
+ TEST_SPARK_APP_ID,
+ driverPod)
+ k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
+ // Since KubernetesConf.createExecutorConf clones the SparkConf object, force
+ // deep equality comparison for the SparkConf object and use object equality
+ // comparison on all other fields.
+ k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf)
+ }
+ }
+ })
+
+}
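
Taken together, these tests pin down the allocator's batching contract: request at
most the configured batch size per snapshot round, and start a new round only once
no executor from the previous round is still pending or unacknowledged by the API.
A hypothetical, simplified sketch of that decision (names invented here; the real
logic lives in ExecutorPodsAllocator):

    // How many new executor pods to request for this snapshot, where `notYetRunning`
    // counts pods we created that the API reports as pending or not at all.
    def podsToRequest(
        totalExpected: Int,
        running: Int,
        notYetRunning: Int,
        batchSize: Int): Int = {
      if (notYetRunning > 0) {
        0 // the previous batch has not fully started; wait for the next snapshot
      } else {
        math.max(0, math.min(batchSize, totalExpected - running))
      }
    }
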
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
new file mode 100644
index 0000000000..562ace9f49
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.CacheBuilder
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter {
+
+ private var namedExecutorPods: mutable.Map[String, PodResource[Pod, DoneablePod]] = _
+
+ @Mock
+ private var kubernetesClient: KubernetesClient = _
+
+ @Mock
+ private var podOperations: PODS = _
+
+ @Mock
+ private var executorBuilder: KubernetesExecutorBuilder = _
+
+ @Mock
+ private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+
+ private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+ private var eventHandlerUnderTest: ExecutorPodsLifecycleManager = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
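+    // Stands in for the manager's record of executors already reported to Spark as
+    // removed; the "don't remove twice" test below exercises this de-duplication.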
+ val removedExecutorsCache = CacheBuilder.newBuilder().build[java.lang.Long, java.lang.Long]
+ snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
+ namedExecutorPods = mutable.Map.empty[String, PodResource[Pod, DoneablePod]]
+ when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty[String])
+ when(kubernetesClient.pods()).thenReturn(podOperations)
+ when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
+ eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
+ new SparkConf(),
+ executorBuilder,
+ kubernetesClient,
+ snapshotsStore,
+ removedExecutorsCache)
+ eventHandlerUnderTest.start(schedulerBackend)
+ }
+
+ test("When an executor reaches error states immediately, remove from the scheduler backend.") {
+ val failedPod = failedExecutorWithoutDeletion(1)
+ snapshotsStore.updatePod(failedPod)
+ snapshotsStore.notifySubscribers()
+ val msg = exitReasonMessage(1, failedPod)
+ val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+ verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+ verify(namedExecutorPods(failedPod.getMetadata.getName)).delete()
+ }
+
+ test("Don't remove executors twice from Spark but remove from K8s repeatedly.") {
+ val failedPod = failedExecutorWithoutDeletion(1)
+ snapshotsStore.updatePod(failedPod)
+ snapshotsStore.updatePod(failedPod)
+ snapshotsStore.notifySubscribers()
+ val msg = exitReasonMessage(1, failedPod)
+ val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+ verify(schedulerBackend, times(1)).doRemoveExecutor("1", expectedLossReason)
+ verify(namedExecutorPods(failedPod.getMetadata.getName), times(2)).delete()
+ }
+
+ test("When the scheduler backend lists executor ids that aren't present in the cluster," +
+ " remove those executors from Spark.") {
+ when(schedulerBackend.getExecutorIds()).thenReturn(Seq("1"))
+    val msg = "The executor with ID 1 was not found in the cluster but we didn't" +
+      " get a reason why. Marking the executor as failed. The executor may have been" +
+      " deleted but the driver missed the deletion event."
+ val expectedLossReason = ExecutorExited(-1, exitCausedByApp = false, msg)
+ snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+ snapshotsStore.notifySubscribers()
+ verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+ }
+
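+  // Rebuilds the exact message ExecutorPodsLifecycleManager produces; the tests
+  // compare ExecutorExited values structurally, so this must match verbatim.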
+ private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
+ s"""
+ |The executor with id $failedExecutorId exited with exit code 1.
+ |The API gave the following brief reason: ${failedPod.getStatus.getReason}
+ |The API gave the following message: ${failedPod.getStatus.getMessage}
+ |The API gave the following container statuses:
+ |
+ |${failedPod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
+ """.stripMargin
+ }
+
+ private def namedPodsAnswer(): Answer[PodResource[Pod, DoneablePod]] = {
+ new Answer[PodResource[Pod, DoneablePod]] {
+ override def answer(invocation: InvocationOnMock): PodResource[Pod, DoneablePod] = {
+ val podName = invocation.getArgumentAt(0, classOf[String])
+ namedExecutorPods.getOrElseUpdate(
+ podName, mock(classOf[PodResource[Pod, DoneablePod]]))
+ }
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
new file mode 100644
index 0000000000..1b26d6af29
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+
+import io.fabric8.kubernetes.api.model.PodListBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.{verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
+
+ private val sparkConf = new SparkConf
+
+ private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+ @Mock
+ private var kubernetesClient: KubernetesClient = _
+
+ @Mock
+ private var podOperations: PODS = _
+
+ @Mock
+ private var appIdLabeledPods: LABELED_PODS = _
+
+ @Mock
+ private var executorRoleLabeledPods: LABELED_PODS = _
+
+ @Mock
+ private var eventQueue: ExecutorPodsSnapshotsStore = _
+
+ private var pollingExecutor: DeterministicScheduler = _
+ private var pollingSourceUnderTest: ExecutorPodsPollingSnapshotSource = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ pollingExecutor = new DeterministicScheduler()
+ pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
+ sparkConf,
+ kubernetesClient,
+ eventQueue,
+ pollingExecutor)
+ pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
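+    // start() only schedules the polling task on the DeterministicScheduler; nothing
+    // runs until tick(), so stubbing the client after start() is safe here.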
+ when(kubernetesClient.pods()).thenReturn(podOperations)
+ when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+ .thenReturn(appIdLabeledPods)
+ when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+ .thenReturn(executorRoleLabeledPods)
+ }
+
+ test("Items returned by the API should be pushed to the event queue") {
+ when(executorRoleLabeledPods.list())
+ .thenReturn(new PodListBuilder()
+ .addToItems(
+ runningExecutor(1),
+ runningExecutor(2))
+ .build())
+ pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
+    verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1), runningExecutor(2)))
+  }
+}
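
Both this suite and ExecutorPodsSnapshotsStoreSuite replace real thread pools with
jMock's DeterministicScheduler, so time only advances when the test says so. A
minimal self-contained sketch of the pattern (the Runnable is illustrative):

    import java.util.concurrent.TimeUnit
    import org.jmock.lib.concurrent.DeterministicScheduler

    val scheduler = new DeterministicScheduler()
    scheduler.scheduleWithFixedDelay(
      new Runnable { override def run(): Unit = println("poll") },
      0L, 1000L, TimeUnit.MILLISECONDS)
    // tick() advances virtual time and runs every task that comes due synchronously
    // on the calling thread: no real sleeping, no flaky timing.
    scheduler.tick(3000, TimeUnit.MILLISECONDS)
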
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
new file mode 100644
index 0000000000..70e19c904e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsSnapshotSuite extends SparkFunSuite {
+
+ test("States are interpreted correctly from pod metadata.") {
+ val pods = Seq(
+ pendingExecutor(0),
+ runningExecutor(1),
+ succeededExecutor(2),
+ failedExecutorWithoutDeletion(3),
+ deletedExecutor(4),
+ unknownExecutor(5))
+ val snapshot = ExecutorPodsSnapshot(pods)
+ assert(snapshot.executorPods ===
+ Map(
+ 0L -> PodPending(pods(0)),
+ 1L -> PodRunning(pods(1)),
+ 2L -> PodSucceeded(pods(2)),
+ 3L -> PodFailed(pods(3)),
+ 4L -> PodDeleted(pods(4)),
+ 5L -> PodUnknown(pods(5))))
+ }
+
+ test("Updates add new pods for non-matching ids and edit existing pods for matching ids") {
+ val originalPods = Seq(
+ pendingExecutor(0),
+ runningExecutor(1))
+ val originalSnapshot = ExecutorPodsSnapshot(originalPods)
+ val snapshotWithUpdatedPod = originalSnapshot.withUpdate(succeededExecutor(1))
+ assert(snapshotWithUpdatedPod.executorPods ===
+ Map(
+ 0L -> PodPending(originalPods(0)),
+ 1L -> PodSucceeded(succeededExecutor(1))))
+ val snapshotWithNewPod = snapshotWithUpdatedPod.withUpdate(pendingExecutor(2))
+ assert(snapshotWithNewPod.executorPods ===
+ Map(
+ 0L -> PodPending(originalPods(0)),
+ 1L -> PodSucceeded(succeededExecutor(1)),
+ 2L -> PodPending(pendingExecutor(2))))
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
new file mode 100644
index 0000000000..cf54b3c4eb
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.scalatest.BeforeAndAfter
+import scala.collection.mutable
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Constants._
+
+class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with BeforeAndAfter {
+
+ private var eventBufferScheduler: DeterministicScheduler = _
+ private var eventQueueUnderTest: ExecutorPodsSnapshotsStoreImpl = _
+
+ before {
+ eventBufferScheduler = new DeterministicScheduler()
+ eventQueueUnderTest = new ExecutorPodsSnapshotsStoreImpl(eventBufferScheduler)
+ }
+
+ test("Subscribers get notified of events periodically.") {
+ val receivedSnapshots1 = mutable.Buffer.empty[ExecutorPodsSnapshot]
+ val receivedSnapshots2 = mutable.Buffer.empty[ExecutorPodsSnapshot]
+ eventQueueUnderTest.addSubscriber(1000) {
+ receivedSnapshots1 ++= _
+ }
+ eventQueueUnderTest.addSubscriber(2000) {
+ receivedSnapshots2 ++= _
+ }
+
+ eventBufferScheduler.runUntilIdle()
+ assert(receivedSnapshots1 === Seq(ExecutorPodsSnapshot()))
+ assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
+
+ pushPodWithIndex(1)
+    // Force time to move forward so that the buffered snapshots are flushed and the
+    // subscriber whose interval has elapsed (the 1000 ms one) runs synchronously.
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+
+ assert(receivedSnapshots1 === Seq(
+ ExecutorPodsSnapshot(),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+ assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
+
+ eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+
+ // Don't repeat snapshots
+ assert(receivedSnapshots1 === Seq(
+ ExecutorPodsSnapshot(),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+ assert(receivedSnapshots2 === Seq(
+ ExecutorPodsSnapshot(),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+ pushPodWithIndex(2)
+ pushPodWithIndex(3)
+ eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+
+ assert(receivedSnapshots1 === Seq(
+ ExecutorPodsSnapshot(),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1))),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
+ assert(receivedSnapshots2 === Seq(
+ ExecutorPodsSnapshot(),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+
+ eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+ assert(receivedSnapshots1 === Seq(
+ ExecutorPodsSnapshot(),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1))),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
+ assert(receivedSnapshots1 === receivedSnapshots2)
+ }
+
+ test("Even without sending events, initially receive an empty buffer.") {
+ val receivedInitialSnapshot = new AtomicReference[Seq[ExecutorPodsSnapshot]](null)
+ eventQueueUnderTest.addSubscriber(1000) {
+ receivedInitialSnapshot.set
+ }
+ assert(receivedInitialSnapshot.get == null)
+ eventBufferScheduler.runUntilIdle()
+ assert(receivedInitialSnapshot.get === Seq(ExecutorPodsSnapshot()))
+ }
+
+ test("Replacing the snapshot passes the new snapshot to subscribers.") {
+ val receivedSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot]
+ eventQueueUnderTest.addSubscriber(1000) {
+ receivedSnapshots ++= _
+ }
+ eventQueueUnderTest.updatePod(podWithIndex(1))
+ eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+ assert(receivedSnapshots === Seq(
+ ExecutorPodsSnapshot(),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+ eventQueueUnderTest.replaceSnapshot(Seq(podWithIndex(2)))
+ eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+ assert(receivedSnapshots === Seq(
+ ExecutorPodsSnapshot(),
+ ExecutorPodsSnapshot(Seq(podWithIndex(1))),
+ ExecutorPodsSnapshot(Seq(podWithIndex(2)))))
+ }
+
+ private def pushPodWithIndex(index: Int): Unit =
+ eventQueueUnderTest.updatePod(podWithIndex(index))
+
+ private def podWithIndex(index: Int): Pod =
+ new PodBuilder()
+ .editOrNewMetadata()
+ .withName(s"pod-$index")
+ .addToLabels(SPARK_EXECUTOR_ID_LABEL, index.toString)
+ .endMetadata()
+ .editOrNewStatus()
+ .withPhase("running")
+ .endStatus()
+ .build()
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
new file mode 100644
index 0000000000..ac1968b4ff
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Mockito.{verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
+
+ @Mock
+ private var eventQueue: ExecutorPodsSnapshotsStore = _
+
+ @Mock
+ private var kubernetesClient: KubernetesClient = _
+
+ @Mock
+ private var podOperations: PODS = _
+
+ @Mock
+ private var appIdLabeledPods: LABELED_PODS = _
+
+ @Mock
+ private var executorRoleLabeledPods: LABELED_PODS = _
+
+ @Mock
+ private var watchConnection: Watch = _
+
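+  // Captures the Watcher that the source registers on the labeled pods, letting the
+  // test feed events to it directly without a real API server.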
+ private var watch: ArgumentCaptor[Watcher[Pod]] = _
+
+ private var watchSourceUnderTest: ExecutorPodsWatchSnapshotSource = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ watch = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
+ when(kubernetesClient.pods()).thenReturn(podOperations)
+ when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+ .thenReturn(appIdLabeledPods)
+ when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+ .thenReturn(executorRoleLabeledPods)
+ when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
+ watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
+ eventQueue, kubernetesClient)
+ watchSourceUnderTest.start(TEST_SPARK_APP_ID)
+ }
+
+ test("Watch events should be pushed to the snapshots store as snapshot updates.") {
+ watch.getValue.eventReceived(Action.ADDED, runningExecutor(1))
+ watch.getValue.eventReceived(Action.MODIFIED, runningExecutor(2))
+ verify(eventQueue).updatePod(runningExecutor(1))
+ verify(eventQueue).updatePod(runningExecutor(2))
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 96065e83f0..52e7a12dba 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -16,100 +16,30 @@
*/
package org.apache.spark.scheduler.cluster.k8s
-import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, DoneablePod, Pod, PodBuilder, PodList}
-import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
-import io.fabric8.kubernetes.client.Watcher.Action
-import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
-import org.hamcrest.{BaseMatcher, Description, Matcher}
-import org.mockito.{AdditionalAnswers, ArgumentCaptor, Matchers, Mock, MockitoAnnotations}
-import org.mockito.Matchers.{any, eq => mockitoEq}
-import org.mockito.Mockito.{doNothing, never, times, verify, when}
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Matchers.{eq => mockitoEq}
+import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter
-import org.scalatest.mockito.MockitoSugar._
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorKilled, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter {
- private val APP_ID = "test-spark-app"
- private val DRIVER_POD_NAME = "spark-driver-pod"
- private val NAMESPACE = "test-namespace"
- private val SPARK_DRIVER_HOST = "localhost"
- private val SPARK_DRIVER_PORT = 7077
- private val POD_ALLOCATION_INTERVAL = "1m"
- private val FIRST_EXECUTOR_POD = new PodBuilder()
- .withNewMetadata()
- .withName("pod1")
- .endMetadata()
- .withNewSpec()
- .withNodeName("node1")
- .endSpec()
- .withNewStatus()
- .withHostIP("192.168.99.100")
- .endStatus()
- .build()
- private val SECOND_EXECUTOR_POD = new PodBuilder()
- .withNewMetadata()
- .withName("pod2")
- .endMetadata()
- .withNewSpec()
- .withNodeName("node2")
- .endSpec()
- .withNewStatus()
- .withHostIP("192.168.99.101")
- .endStatus()
- .build()
-
- private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
- private type LABELED_PODS = FilterWatchListDeletable[
- Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
- private type IN_NAMESPACE_PODS = NonNamespaceOperation[
- Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
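+  // Deterministic stand-in for the backend's request-executors thread pool; work
+  // submitted to it runs only when the test pumps the scheduler.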
+ private val requestExecutorsService = new DeterministicScheduler()
+ private val sparkConf = new SparkConf(false)
+ .set("spark.executor.instances", "3")
@Mock
- private var sparkContext: SparkContext = _
-
- @Mock
- private var listenerBus: LiveListenerBus = _
-
- @Mock
- private var taskSchedulerImpl: TaskSchedulerImpl = _
-
- @Mock
- private var allocatorExecutor: ScheduledExecutorService = _
-
- @Mock
- private var requestExecutorsService: ExecutorService = _
-
- @Mock
- private var executorBuilder: KubernetesExecutorBuilder = _
-
- @Mock
- private var kubernetesClient: KubernetesClient = _
-
- @Mock
- private var podOperations: PODS = _
-
- @Mock
- private var podsWithLabelOperations: LABELED_PODS = _
-
- @Mock
- private var podsInNamespace: IN_NAMESPACE_PODS = _
-
- @Mock
- private var podsWithDriverName: PodResource[Pod, DoneablePod] = _
+ private var sc: SparkContext = _
@Mock
private var rpcEnv: RpcEnv = _
@@ -118,332 +48,103 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
private var driverEndpointRef: RpcEndpointRef = _
@Mock
- private var executorPodsWatch: Watch = _
+ private var kubernetesClient: KubernetesClient = _
@Mock
- private var successFuture: Future[Boolean] = _
+ private var podOperations: PODS = _
+
+ @Mock
+ private var labeledPods: LABELED_PODS = _
+
+ @Mock
+ private var taskScheduler: TaskSchedulerImpl = _
+
+ @Mock
+ private var eventQueue: ExecutorPodsSnapshotsStore = _
+
+ @Mock
+ private var podAllocator: ExecutorPodsAllocator = _
+
+ @Mock
+ private var lifecycleEventHandler: ExecutorPodsLifecycleManager = _
+
+ @Mock
+ private var watchEvents: ExecutorPodsWatchSnapshotSource = _
+
+ @Mock
+ private var pollEvents: ExecutorPodsPollingSnapshotSource = _
- private var sparkConf: SparkConf = _
- private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _
- private var allocatorRunnable: ArgumentCaptor[Runnable] = _
- private var requestExecutorRunnable: ArgumentCaptor[Runnable] = _
private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
-
- private val driverPod = new PodBuilder()
- .withNewMetadata()
- .withName(DRIVER_POD_NAME)
- .addToLabels(SPARK_APP_ID_LABEL, APP_ID)
- .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
- .endMetadata()
- .build()
+ private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _
before {
MockitoAnnotations.initMocks(this)
- sparkConf = new SparkConf()
- .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
- .set(KUBERNETES_NAMESPACE, NAMESPACE)
- .set("spark.driver.host", SPARK_DRIVER_HOST)
- .set("spark.driver.port", SPARK_DRIVER_PORT.toString)
- .set(KUBERNETES_ALLOCATION_BATCH_DELAY.key, POD_ALLOCATION_INTERVAL)
- executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
- allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
- requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
+ when(taskScheduler.sc).thenReturn(sc)
+ when(sc.conf).thenReturn(sparkConf)
driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
- when(sparkContext.conf).thenReturn(sparkConf)
- when(sparkContext.listenerBus).thenReturn(listenerBus)
- when(taskSchedulerImpl.sc).thenReturn(sparkContext)
- when(kubernetesClient.pods()).thenReturn(podOperations)
- when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations)
- when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture()))
- .thenReturn(executorPodsWatch)
- when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace)
- when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName)
- when(podsWithDriverName.get()).thenReturn(driverPod)
- when(allocatorExecutor.scheduleWithFixedDelay(
- allocatorRunnable.capture(),
- mockitoEq(0L),
- mockitoEq(TimeUnit.MINUTES.toMillis(1)),
- mockitoEq(TimeUnit.MILLISECONDS))).thenReturn(null)
- // Creating Futures in Scala backed by a Java executor service resolves to running
- // ExecutorService#execute (as opposed to submit)
- doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())
when(rpcEnv.setupEndpoint(
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
.thenReturn(driverEndpointRef)
-
- // Used by the CoarseGrainedSchedulerBackend when making RPC calls.
- when(driverEndpointRef.ask[Boolean]
- (any(classOf[Any]))
- (any())).thenReturn(successFuture)
- when(successFuture.failed).thenReturn(Future[Throwable] {
- // emulate behavior of the Future.failed method.
- throw new NoSuchElementException()
- }(ThreadUtils.sameThread))
- }
-
- test("Basic lifecycle expectations when starting and stopping the scheduler.") {
- val scheduler = newSchedulerBackend()
- scheduler.start()
- assert(executorPodsWatcherArgument.getValue != null)
- assert(allocatorRunnable.getValue != null)
- scheduler.stop()
- verify(executorPodsWatch).close()
- }
-
- test("Static allocation should request executors upon first allocator run.") {
- sparkConf
- .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
- .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
- val scheduler = newSchedulerBackend()
- scheduler.start()
- requestExecutorRunnable.getValue.run()
- val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
- val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
- when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
- allocatorRunnable.getValue.run()
- verify(podOperations).create(firstResolvedPod)
- verify(podOperations).create(secondResolvedPod)
- }
-
- test("Killing executors deletes the executor pods") {
- sparkConf
- .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
- .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
- val scheduler = newSchedulerBackend()
- scheduler.start()
- requestExecutorRunnable.getValue.run()
- val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
- val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
- when(podOperations.create(any(classOf[Pod])))
- .thenAnswer(AdditionalAnswers.returnsFirstArg())
- allocatorRunnable.getValue.run()
- scheduler.doKillExecutors(Seq("2"))
- requestExecutorRunnable.getAllValues.asScala.last.run()
- verify(podOperations).delete(secondResolvedPod)
- verify(podOperations, never()).delete(firstResolvedPod)
- }
-
- test("Executors should be requested in batches.") {
- sparkConf
- .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
- .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
- val scheduler = newSchedulerBackend()
- scheduler.start()
- requestExecutorRunnable.getValue.run()
- when(podOperations.create(any(classOf[Pod])))
- .thenAnswer(AdditionalAnswers.returnsFirstArg())
- val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
- val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
- allocatorRunnable.getValue.run()
- verify(podOperations).create(firstResolvedPod)
- verify(podOperations, never()).create(secondResolvedPod)
- val registerFirstExecutorMessage = RegisterExecutor(
- "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String])
- when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
- driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
- .apply(registerFirstExecutorMessage)
- allocatorRunnable.getValue.run()
- verify(podOperations).create(secondResolvedPod)
- }
-
- test("Scaled down executors should be cleaned up") {
- sparkConf
- .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
- .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
- val scheduler = newSchedulerBackend()
- scheduler.start()
-
- // The scheduler backend spins up one executor pod.
- requestExecutorRunnable.getValue.run()
- when(podOperations.create(any(classOf[Pod])))
- .thenAnswer(AdditionalAnswers.returnsFirstArg())
- val resolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
- allocatorRunnable.getValue.run()
- val executorEndpointRef = mock[RpcEndpointRef]
- when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
- val registerFirstExecutorMessage = RegisterExecutor(
- "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
- when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
- driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
- .apply(registerFirstExecutorMessage)
-
- // Request that there are 0 executors and trigger deletion from driver.
- scheduler.doRequestTotalExecutors(0)
- requestExecutorRunnable.getAllValues.asScala.last.run()
- scheduler.doKillExecutors(Seq("1"))
- requestExecutorRunnable.getAllValues.asScala.last.run()
- verify(podOperations, times(1)).delete(resolvedPod)
- driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
-
- val exitedPod = exitPod(resolvedPod, 0)
- executorPodsWatcherArgument.getValue.eventReceived(Action.DELETED, exitedPod)
- allocatorRunnable.getValue.run()
-
- // No more deletion attempts of the executors.
- // This is graceful termination and should not be detected as a failure.
- verify(podOperations, times(1)).delete(resolvedPod)
- verify(driverEndpointRef, times(1)).send(
- RemoveExecutor("1", ExecutorExited(
- 0,
- exitCausedByApp = false,
- s"Container in pod ${exitedPod.getMetadata.getName} exited from" +
- s" explicit termination request.")))
- }
-
- test("Executors that fail should not be deleted.") {
- sparkConf
- .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
- .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-
- val scheduler = newSchedulerBackend()
- scheduler.start()
- val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
- when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
- requestExecutorRunnable.getValue.run()
- allocatorRunnable.getValue.run()
- val executorEndpointRef = mock[RpcEndpointRef]
- when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
- val registerFirstExecutorMessage = RegisterExecutor(
- "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
- when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
- driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
- .apply(registerFirstExecutorMessage)
- driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
- executorPodsWatcherArgument.getValue.eventReceived(
- Action.ERROR, exitPod(firstResolvedPod, 1))
-
- // A replacement executor should be created but the error pod should persist.
- val replacementPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
- scheduler.doRequestTotalExecutors(1)
- requestExecutorRunnable.getValue.run()
- allocatorRunnable.getAllValues.asScala.last.run()
- verify(podOperations, never()).delete(firstResolvedPod)
- verify(driverEndpointRef).send(
- RemoveExecutor("1", ExecutorExited(
- 1,
- exitCausedByApp = true,
- s"Pod ${FIRST_EXECUTOR_POD.getMetadata.getName}'s executor container exited with" +
- " exit status code 1.")))
- }
-
- test("Executors disconnected due to unknown reasons are deleted and replaced.") {
- sparkConf
- .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
- .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
- val executorLostReasonCheckMaxAttempts = sparkConf.get(
- KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
-
- val scheduler = newSchedulerBackend()
- scheduler.start()
- val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
- when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
- requestExecutorRunnable.getValue.run()
- allocatorRunnable.getValue.run()
- val executorEndpointRef = mock[RpcEndpointRef]
- when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
- val registerFirstExecutorMessage = RegisterExecutor(
- "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
- when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
- driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
- .apply(registerFirstExecutorMessage)
-
- driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
- 1 to executorLostReasonCheckMaxAttempts foreach { _ =>
- allocatorRunnable.getValue.run()
- verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
- }
-
- val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
- allocatorRunnable.getValue.run()
- verify(podOperations).delete(firstResolvedPod)
- verify(driverEndpointRef).send(
- RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
- }
-
- test("Executors that fail to start on the Kubernetes API call rebuild in the next batch.") {
- sparkConf
- .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
- .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
- val scheduler = newSchedulerBackend()
- scheduler.start()
- val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
- when(podOperations.create(firstResolvedPod))
- .thenThrow(new RuntimeException("test"))
- requestExecutorRunnable.getValue.run()
- allocatorRunnable.getValue.run()
- verify(podOperations, times(1)).create(firstResolvedPod)
- val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
- allocatorRunnable.getValue.run()
- verify(podOperations).create(recreatedResolvedPod)
- }
-
- test("Executors that are initially created but the watch notices them fail are rebuilt" +
- " in the next batch.") {
- sparkConf
- .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
- .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
- val scheduler = newSchedulerBackend()
- scheduler.start()
- val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
- when(podOperations.create(FIRST_EXECUTOR_POD)).thenAnswer(AdditionalAnswers.returnsFirstArg())
- requestExecutorRunnable.getValue.run()
- allocatorRunnable.getValue.run()
- verify(podOperations, times(1)).create(firstResolvedPod)
- executorPodsWatcherArgument.getValue.eventReceived(Action.ERROR, firstResolvedPod)
- val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
- allocatorRunnable.getValue.run()
- verify(podOperations).create(recreatedResolvedPod)
- }
-
- private def newSchedulerBackend(): KubernetesClusterSchedulerBackend = {
- new KubernetesClusterSchedulerBackend(
- taskSchedulerImpl,
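+ // Wire the mocked Kubernetes client and construct the backend under test directly,
+ // with its pluggable collaborators (pod allocator, lifecycle event handler, and the
+ // watch/poll event sources) mocked out.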
+ when(kubernetesClient.pods()).thenReturn(podOperations)
+ schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
+ taskScheduler,
rpcEnv,
- executorBuilder,
kubernetesClient,
- allocatorExecutor,
- requestExecutorsService) {
-
- override def applicationId(): String = APP_ID
+ requestExecutorsService,
+ eventQueue,
+ podAllocator,
+ lifecycleEventHandler,
+ watchEvents,
+ pollEvents) {
+ override def applicationId(): String = TEST_SPARK_APP_ID
}
}
- private def exitPod(basePod: Pod, exitCode: Int): Pod = {
- new PodBuilder(basePod)
- .editStatus()
- .addNewContainerStatus()
- .withNewState()
- .withNewTerminated()
- .withExitCode(exitCode)
- .endTerminated()
- .endState()
- .endContainerStatus()
- .endStatus()
- .build()
+ test("Start all components") {
+ schedulerBackendUnderTest.start()
+ verify(podAllocator).setTotalExpectedExecutors(3)
+ verify(podAllocator).start(TEST_SPARK_APP_ID)
+ verify(lifecycleEventHandler).start(schedulerBackendUnderTest)
+ verify(watchEvents).start(TEST_SPARK_APP_ID)
+ verify(pollEvents).start(TEST_SPARK_APP_ID)
}
- private def expectPodCreationWithId(executorId: Int, expectedPod: Pod): Pod = {
- val resolvedPod = new PodBuilder(expectedPod)
- .editMetadata()
- .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
- .endMetadata()
- .build()
- val resolvedContainer = new ContainerBuilder().build()
- when(executorBuilder.buildFromFeatures(Matchers.argThat(
- new BaseMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
- override def matches(argument: scala.Any)
- : Boolean = {
- argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] &&
- argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
- .roleSpecificConf.executorId == executorId.toString
- }
-
- override def describeTo(description: Description): Unit = {}
- }))).thenReturn(SparkPod(resolvedPod, resolvedContainer))
- new PodBuilder(resolvedPod)
- .editSpec()
- .addToContainers(resolvedContainer)
- .endSpec()
- .build()
+ test("Stop all components") {
+ when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+ when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+ schedulerBackendUnderTest.stop()
+ verify(eventQueue).stop()
+ verify(watchEvents).stop()
+ verify(pollEvents).stop()
+ verify(labeledPods).delete()
+ verify(kubernetesClient).close()
}
+
+ test("Remove executor") {
+ schedulerBackendUnderTest.start()
+ schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
+ verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
+ }
+
+ test("Kill executors") {
+ schedulerBackendUnderTest.start()
+ when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+ when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+ when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
+ schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
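+ // Deletion is deferred to the request-executors service (a jMock DeterministicScheduler
+ // in this suite), so no pods are deleted until its next pending command runs.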
+ verify(labeledPods, never()).delete()
+ requestExecutorsService.runNextPendingCommand()
+ verify(labeledPods).delete()
+ }
+
+ test("Request total executors") {
+ schedulerBackendUnderTest.start()
+ schedulerBackendUnderTest.doRequestTotalExecutors(5)
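+ // The new total is applied asynchronously; the allocator still holds the initial
+ // value until the request-executors service runs the pending update.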
+ verify(podAllocator).setTotalExpectedExecutors(3)
+ verify(podAllocator, never()).setTotalExpectedExecutors(5)
+ requestExecutorsService.runNextPendingCommand()
+ verify(podAllocator).setTotalExpectedExecutors(5)
+ }
}