[SPARK-24248][K8S] Use level triggering and state reconciliation in scheduling and lifecycle

## What changes were proposed in this pull request?

Previously, the scheduler backend maintained state in many places, both reading it and writing to it. For example, state had to be managed in both the watch and the executor allocator runnable. Furthermore, multiple hash tables had to be maintained.

We can do better here by:

1. Consolidating the places where we manage state. Here, we take inspiration from traditional Kubernetes controllers, which tend to follow a level-triggered mechanism: the controller continuously monitors the API server via watches and polling, and on periodic passes it reconciles the current state of the cluster with the desired state. We implement this by introducing the concept of a pod snapshot, which is a given state of the executors in the Kubernetes cluster, and we operate periodically on snapshots. To avoid overloading the API server with polling requests to get the state of the cluster (particularly for executor allocation, where we want to check frequently so that executors launch without unacceptable latency), we use watches to populate snapshots by applying observed events to a previous snapshot to produce a new one. Whenever we do poll the cluster, the polled state replaces any existing snapshot; this ensures eventual consistency with, and mirroring of, the cluster, as is desired in a level-triggered architecture. A minimal sketch of this snapshot model follows this list.

2. Storing less specialized in-memory state in general. Previously we were creating hash tables to represent the state of executors. Instead, it's easier to represent state solely by the snapshots.
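
Below is a minimal, hedged sketch of the snapshot model described in item 1. `SimplePod` is a stand-in for fabric8's `io.fabric8.kubernetes.api.model.Pod`, and the type names only mirror the real `ExecutorPodsSnapshot` added by this patch; the authoritative implementation is in the diff below.

```scala
// Sketch only: SimplePod stands in for fabric8's Pod; the real code keys
// snapshots by the Spark executor ID label carried on each executor pod.
case class SimplePod(executorId: Long, phase: String)

sealed trait ExecutorPodState { def pod: SimplePod }
case class PodPending(pod: SimplePod) extends ExecutorPodState
case class PodRunning(pod: SimplePod) extends ExecutorPodState
case class PodUnknown(pod: SimplePod) extends ExecutorPodState

// An immutable view of the executor pods in the cluster at one point in time.
case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {
  import ExecutorPodsSnapshot._

  // Watch path: apply one observed pod update to the previous snapshot.
  def withUpdate(updatedPod: SimplePod): ExecutorPodsSnapshot =
    copy(executorPods = executorPods + (updatedPod.executorId -> toState(updatedPod)))
}

object ExecutorPodsSnapshot {
  // Polling path: a full listing from the API server replaces the snapshot
  // entirely, so the view stays eventually consistent even if watch events were missed.
  def fromFullPoll(pods: Seq[SimplePod]): ExecutorPodsSnapshot =
    ExecutorPodsSnapshot(pods.map(p => p.executorId -> toState(p)).toMap)

  def toState(pod: SimplePod): ExecutorPodState = pod.phase.toLowerCase match {
    case "pending" => PodPending(pod)
    case "running" => PodRunning(pod)
    case _         => PodUnknown(pod)
  }
}

object SnapshotModelExample extends App {
  val polled = ExecutorPodsSnapshot.fromFullPoll(Seq(SimplePod(1L, "Pending")))
  val afterWatch = polled.withUpdate(SimplePod(1L, "Running"))
  println(afterWatch.executorPods) // Map(1 -> PodRunning(SimplePod(1,Running)))
}
```

Because each snapshot is immutable, subscribers such as the allocator and the lifecycle manager can consume time-windowed batches of snapshots without sharing mutable hash tables with the watch or the poller.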

## How was this patch tested?

Integration tests should verify that there are no end-to-end regressions. Unit tests are to be updated, focusing in particular on different orderings of events, including cases where events arrive in an unexpected order.

Author: mcheah <mcheah@palantir.com>

Closes #21366 from mccheah/event-queue-driven-scheduling.
mcheah 2018-06-14 15:56:21 -07:00
parent 18cb0c0798
commit 270a9a3cac
27 changed files with 1870 additions and 784 deletions


@@ -237,6 +237,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
(BSD 3 Clause) jmock (org.jmock:jmock-junit4:2.8.4 - http://jmock.org/)
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
(BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://wwww.antlr.org/)
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)


@@ -19,13 +19,12 @@ package org.apache.spark.util
import java.util.concurrent._
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration.Duration
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.spark.SparkException
private[spark] object ThreadUtils {
@@ -103,6 +102,22 @@ private[spark] object ThreadUtils {
executor
}
/**
* Wrapper over ScheduledThreadPoolExecutor.
*/
def newDaemonThreadPoolScheduledExecutor(threadNamePrefix: String, numThreads: Int)
: ScheduledExecutorService = {
val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(s"$threadNamePrefix-%d")
.build()
val executor = new ScheduledThreadPoolExecutor(numThreads, threadFactory)
// By default, a cancelled task is not automatically removed from the work queue until its delay
// elapses. We have to enable it manually.
executor.setRemoveOnCancelPolicy(true)
executor
}
/**
* Run a piece of code in a new thread and return the result. Exception in the new thread is
* thrown in the caller thread with an adjusted stack trace that removes references to this
@@ -229,4 +244,14 @@ private[spark] object ThreadUtils {
}
}
// scalastyle:on awaitready
def shutdown(
executor: ExecutorService,
gracePeriod: Duration = FiniteDuration(30, TimeUnit.SECONDS)): Unit = {
executor.shutdown()
executor.awaitTermination(gracePeriod.toMillis, TimeUnit.MILLISECONDS)
if (!executor.isShutdown) {
executor.shutdownNow()
}
}
}


@@ -0,0 +1,28 @@
Copyright (c) 2000-2017, jMock.org
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer. Redistributions
in binary form must reproduce the above copyright notice, this list of
conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
Neither the name of jMock nor the names of its contributors may be
used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


@@ -760,6 +760,12 @@
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
<scope>test</scope>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>


@@ -77,6 +77,12 @@
</dependency>
<!-- End of shaded deps. -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -84,9 +90,9 @@
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.8.1</version>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
<scope>test</scope>
</dependency>
</dependencies>


@@ -176,6 +176,24 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")
val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
.doc("Interval between polls against the Kubernetes API server to inspect the " +
"state of executors.")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(interval => interval > 0, s"API server polling interval must be a" +
" positive time value.")
.createWithDefaultString("30s")
val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
ConfigBuilder("spark.kubernetes.executor.eventProcessingInterval")
.doc("Interval between successive inspection of executor events sent from the" +
" Kubernetes API.")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(interval => interval > 0, s"Event processing interval must be a positive" +
" time value.")
.createWithDefaultString("1s")
val MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
.doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
@@ -193,7 +211,6 @@ private[spark] object Config extends Logging {
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"


@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.api.model.Pod
sealed trait ExecutorPodState {
def pod: Pod
}
case class PodRunning(pod: Pod) extends ExecutorPodState
case class PodPending(pod: Pod) extends ExecutorPodState
sealed trait FinalPodState extends ExecutorPodState
case class PodSucceeded(pod: Pod) extends FinalPodState
case class PodFailed(pod: Pod) extends FinalPodState
case class PodDeleted(pod: Pod) extends FinalPodState
case class PodUnknown(pod: Pod) extends ExecutorPodState


@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, Utils}
private[spark] class ExecutorPodsAllocator(
conf: SparkConf,
executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
clock: Clock) extends Logging {
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
private val totalExpectedExecutors = new AtomicInteger(0)
private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000)
private val kubernetesDriverPodName = conf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(throw new SparkException("Must specify the driver pod name"))
private val driverPod = kubernetesClient.pods()
.withName(kubernetesDriverPodName)
.get()
// Executor IDs that have been requested from Kubernetes but have not been detected in any
// snapshot yet. Mapped to the timestamp when they were created.
private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
def start(applicationId: String): Unit = {
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, _)
}
}
def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
// For all executors we've created against the API but have not seen in a snapshot
// yet - check the current time. If the current time has exceeded some threshold,
// assume that the pod was either never created (the API server never properly
// handled the creation request), or the API server created the pod but we missed
// both the creation and deletion events. In either case, delete the missing pod
// if possible, and mark such a pod to be rescheduled below.
newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
val currentTime = clock.getTimeMillis()
if (currentTime - timeCreated > podCreationTimeout) {
logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
" previous allocation attempt tried to create it. The executor may have been" +
" deleted but the application missed the deletion event.")
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
.delete()
}
newlyCreatedExecutors -= execId
} else {
logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
s" was created ${currentTime - timeCreated} milliseconds ago.")
}
}
if (snapshots.nonEmpty) {
// Only need to examine the cluster as of the latest snapshot, the "current" state, to see if
// we need to allocate more executors or not.
val latestSnapshot = snapshots.last
val currentRunningExecutors = latestSnapshot.executorPods.values.count {
case PodRunning(_) => true
case _ => false
}
val currentPendingExecutors = latestSnapshot.executorPods.values.count {
case PodPending(_) => true
case _ => false
}
val currentTotalExpectedExecutors = totalExpectedExecutors.get
logDebug(s"Currently have $currentRunningExecutors running executors and" +
s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" +
s" have been requested but are pending appearance in the cluster.")
if (newlyCreatedExecutors.isEmpty
&& currentPendingExecutors == 0
&& currentRunningExecutors < currentTotalExpectedExecutors) {
val numExecutorsToAllocate = math.min(
currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
for ( _ <- 0 until numExecutorsToAllocate) {
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
conf,
newExecutorId.toString,
applicationId,
driverPod)
val executorPod = executorBuilder.buildFromFeatures(executorConf)
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
kubernetesClient.pods().create(podWithAttachedContainer)
newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
}
} else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
// TODO handle edge cases if we end up with more running executors than expected.
logDebug("Current number of running executors is equal to the number of requested" +
" executors. Not scaling up further.")
} else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
s" executors to begin running before requesting for more executors. # of executors in" +
s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" +
s" created but we have not observed as being present in the cluster yet:" +
s" ${newlyCreatedExecutors.size}.")
}
}
}
}


@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import com.google.common.cache.Cache
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorExited
import org.apache.spark.util.Utils
private[spark] class ExecutorPodsLifecycleManager(
conf: SparkConf,
executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
// Use a best-effort to track which executors have been removed already. It's not generally
// job-breaking if we remove executors more than once but it's ideal if we make an attempt
// to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
// bounds.
removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) extends Logging {
import ExecutorPodsLifecycleManager._
private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
snapshotsStore.addSubscriber(eventProcessingInterval) {
onNewSnapshots(schedulerBackend, _)
}
}
private def onNewSnapshots(
schedulerBackend: KubernetesClusterSchedulerBackend,
snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
snapshots.foreach { snapshot =>
snapshot.executorPods.foreach { case (execId, state) =>
state match {
case deleted@PodDeleted(_) =>
logDebug(s"Snapshot reported deleted executor with id $execId," +
s" pod name ${state.pod.getMetadata.getName}")
removeExecutorFromSpark(schedulerBackend, deleted, execId)
execIdsRemovedInThisRound += execId
case failed@PodFailed(_) =>
logDebug(s"Snapshot reported failed executor with id $execId," +
s" pod name ${state.pod.getMetadata.getName}")
onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
case succeeded@PodSucceeded(_) =>
logDebug(s"Snapshot reported succeeded executor with id $execId," +
s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
s" unusual unless Spark specifically informed the executor to exit.")
onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
case _ =>
}
}
}
// Reconcile the case where Spark claims to know about an executor but the corresponding pod
// is missing from the cluster. This would occur if we miss a deletion event and the pod
// transitions immediately from running to absent. We only need to check against the latest
// snapshot for this, and we don't do this for executors in the deleted executors cache or
// that we just removed in this round.
if (snapshots.nonEmpty) {
val latestSnapshot = snapshots.last
(schedulerBackend.getExecutorIds().map(_.toLong).toSet
-- latestSnapshot.executorPods.keySet
-- execIdsRemovedInThisRound).foreach { missingExecutorId =>
if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
s" executor may have been deleted but the driver missed the deletion event."
logDebug(exitReasonMessage)
val exitReason = ExecutorExited(
UNKNOWN_EXIT_CODE,
exitCausedByApp = false,
exitReasonMessage)
schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason)
execIdsRemovedInThisRound += missingExecutorId
}
}
}
logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
s" from Spark that were either found to be deleted or non-existent in the cluster.")
}
private def onFinalNonDeletedState(
podState: FinalPodState,
execId: Long,
schedulerBackend: KubernetesClusterSchedulerBackend,
execIdsRemovedInRound: mutable.Set[Long]): Unit = {
removeExecutorFromK8s(podState.pod)
removeExecutorFromSpark(schedulerBackend, podState, execId)
execIdsRemovedInRound += execId
}
private def removeExecutorFromK8s(updatedPod: Pod): Unit = {
// If deletion failed on a previous try, we can try again if resync informs us the pod
// is still around.
// Delete as best attempt - duplicate deletes will throw an exception but the end state
// of getting rid of the pod is what matters.
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withName(updatedPod.getMetadata.getName)
.delete()
}
}
private def removeExecutorFromSpark(
schedulerBackend: KubernetesClusterSchedulerBackend,
podState: FinalPodState,
execId: Long): Unit = {
if (removedExecutorsCache.getIfPresent(execId) == null) {
removedExecutorsCache.put(execId, execId)
val exitReason = findExitReason(podState, execId)
schedulerBackend.doRemoveExecutor(execId.toString, exitReason)
}
}
private def findExitReason(podState: FinalPodState, execId: Long): ExecutorExited = {
val exitCode = findExitCode(podState)
val (exitCausedByApp, exitMessage) = podState match {
case PodDeleted(_) =>
(false, s"The executor with id $execId was deleted by a user or the framework.")
case _ =>
val msg = exitReasonMessage(podState, execId, exitCode)
(true, msg)
}
ExecutorExited(exitCode, exitCausedByApp, exitMessage)
}
private def exitReasonMessage(podState: FinalPodState, execId: Long, exitCode: Int) = {
val pod = podState.pod
s"""
|The executor with id $execId exited with exit code $exitCode.
|The API gave the following brief reason: ${pod.getStatus.getReason}
|The API gave the following message: ${pod.getStatus.getMessage}
|The API gave the following container statuses:
|
|${pod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
""".stripMargin
}
private def findExitCode(podState: FinalPodState): Int = {
podState.pod.getStatus.getContainerStatuses.asScala.find { containerStatus =>
containerStatus.getState.getTerminated != null
}.map { terminatedContainer =>
terminatedContainer.getState.getTerminated.getExitCode.toInt
}.getOrElse(UNKNOWN_EXIT_CODE)
}
}
private object ExecutorPodsLifecycleManager {
val UNKNOWN_EXIT_CODE = -1
}


@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils
private[spark] class ExecutorPodsPollingSnapshotSource(
conf: SparkConf,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
pollingExecutor: ScheduledExecutorService) extends Logging {
private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private var pollingFuture: Future[_] = _
def start(applicationId: String): Unit = {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
}
def stop(): Unit = {
if (pollingFuture != null) {
pollingFuture.cancel(true)
pollingFuture = null
}
ThreadUtils.shutdown(pollingExecutor)
}
private class PollRunnable(applicationId: String) extends Runnable {
override def run(): Unit = {
logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
snapshotsStore.replaceSnapshot(kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.list()
.getItems
.asScala)
}
}
}


@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.api.model.Pod
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
/**
* An immutable view of the current executor pods that are running in the cluster.
*/
private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {
import ExecutorPodsSnapshot._
def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
new ExecutorPodsSnapshot(newExecutorPods)
}
}
object ExecutorPodsSnapshot extends Logging {
def apply(executorPods: Seq[Pod]): ExecutorPodsSnapshot = {
ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
}
def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long, ExecutorPodState])
private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, ExecutorPodState] = {
executorPods.map { pod =>
(pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong, toState(pod))
}.toMap
}
private def toState(pod: Pod): ExecutorPodState = {
if (isDeleted(pod)) {
PodDeleted(pod)
} else {
val phase = pod.getStatus.getPhase.toLowerCase
phase match {
case "pending" =>
PodPending(pod)
case "running" =>
PodRunning(pod)
case "failed" =>
PodFailed(pod)
case "succeeded" =>
PodSucceeded(pod)
case _ =>
logWarning(s"Received unknown phase $phase for executor pod with name" +
s" ${pod.getMetadata.getName} in namespace ${pod.getMetadata.getNamespace}")
PodUnknown(pod)
}
}
}
private def isDeleted(pod: Pod): Boolean = pod.getMetadata.getDeletionTimestamp != null
}


@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.api.model.Pod
private[spark] trait ExecutorPodsSnapshotsStore {
def addSubscriber
(processBatchIntervalMillis: Long)
(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit)
def stop(): Unit
def updatePod(updatedPod: Pod): Unit
def replaceSnapshot(newSnapshot: Seq[Pod]): Unit
}


@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent._
import io.fabric8.kubernetes.api.model.Pod
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.util.{ThreadUtils, Utils}
/**
* Controls the propagation of the Spark application's executor pods state to subscribers that
* react to that state.
* <br>
* Roughly follows a producer-consumer model. Producers report states of executor pods, and these
* states are then published to consumers that can perform any actions in response to these states.
* <br>
* Producers push updates in one of two ways. An incremental update sent by updatePod() represents
* a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that
* the passed pods are all of the most up to date states of all executor pods for the application.
* The combination of the states of all executor pods for the application is collectively known as
* a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that
* most recent snapshot - either by incrementally updating the snapshot with a single new pod state,
* or by replacing the snapshot entirely on a full sync.
* <br>
* Consumers, or subscribers, register that they want to be informed about all snapshots of the
* executor pods. Every time the store replaces its most up to date snapshot from either an
* incremental update or a full sync, the most recent snapshot after the update is posted to the
* subscriber's buffer. Subscribers receive blocks of snapshots produced by the producers in
* time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different
* time intervals.
*/
private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
extends ExecutorPodsSnapshotsStore {
private val SNAPSHOT_LOCK = new Object()
private val subscribers = mutable.Buffer.empty[SnapshotsSubscriber]
private val pollingTasks = mutable.Buffer.empty[Future[_]]
@GuardedBy("SNAPSHOT_LOCK")
private var currentSnapshot = ExecutorPodsSnapshot()
override def addSubscriber(
processBatchIntervalMillis: Long)
(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
val newSubscriber = SnapshotsSubscriber(
new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
SNAPSHOT_LOCK.synchronized {
newSubscriber.snapshotsBuffer.add(currentSnapshot)
}
subscribers += newSubscriber
pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
toRunnable(() => callSubscriber(newSubscriber)),
0L,
processBatchIntervalMillis,
TimeUnit.MILLISECONDS)
}
override def stop(): Unit = {
pollingTasks.foreach(_.cancel(true))
ThreadUtils.shutdown(subscribersExecutor)
}
override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized {
currentSnapshot = currentSnapshot.withUpdate(updatedPod)
addCurrentSnapshotToSubscribers()
}
override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = SNAPSHOT_LOCK.synchronized {
currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
addCurrentSnapshotToSubscribers()
}
private def addCurrentSnapshotToSubscribers(): Unit = {
subscribers.foreach { subscriber =>
subscriber.snapshotsBuffer.add(currentSnapshot)
}
}
private def callSubscriber(subscriber: SnapshotsSubscriber): Unit = {
Utils.tryLogNonFatalError {
val currentSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot].asJava
subscriber.snapshotsBuffer.drainTo(currentSnapshots)
subscriber.onNewSnapshots(currentSnapshots.asScala)
}
}
private def toRunnable[T](runnable: () => Unit): Runnable = new Runnable {
override def run(): Unit = runnable()
}
private case class SnapshotsSubscriber(
snapshotsBuffer: BlockingQueue[ExecutorPodsSnapshot],
onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit)
}
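
For illustration, a hedged sketch of how a consumer could subscribe to the store above; the wiring mirrors what KubernetesClusterManager does for the real subscribers (ExecutorPodsAllocator and ExecutorPodsLifecycleManager), and it assumes the snippet is compiled in the org.apache.spark.scheduler.cluster.k8s package so it can access the private[spark] classes. It is not part of this patch.

```scala
package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.TimeUnit

import org.apache.spark.util.ThreadUtils

// Illustrative sketch only, not part of this patch.
object SnapshotsStoreUsageSketch {
  def main(args: Array[String]): Unit = {
    // Same helper this patch adds to ThreadUtils for the real subscribers' executor.
    val subscribersExecutor = ThreadUtils.newDaemonThreadPoolScheduledExecutor(
      "kubernetes-executor-snapshots-subscribers", 2)
    val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(subscribersExecutor)

    // Consumer side: every second, receive the snapshots produced since the last call.
    snapshotsStore.addSubscriber(TimeUnit.SECONDS.toMillis(1)) { snapshots =>
      snapshots.lastOption.foreach { latest =>
        println(s"Latest snapshot tracks ${latest.executorPods.size} executor pods")
      }
    }

    // Producer side: a poller would periodically push the full, authoritative pod list.
    snapshotsStore.replaceSnapshot(Seq.empty[io.fabric8.kubernetes.api.model.Pod])
  }
}
```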


@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import java.io.Closeable
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
private[spark] class ExecutorPodsWatchSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient) extends Logging {
private var watchConnection: Closeable = _
def start(applicationId: String): Unit = {
require(watchConnection == null, "Cannot start the watcher twice.")
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
}
def stop(): Unit = {
if (watchConnection != null) {
Utils.tryLogNonFatalError {
watchConnection.close()
}
watchConnection = null
}
}
private class ExecutorPodsWatcher extends Watcher[Pod] {
override def eventReceived(action: Action, pod: Pod): Unit = {
val podName = pod.getMetadata.getName
logDebug(s"Received executor pod update for pod named $podName, action $action")
snapshotsStore.updatePod(pod)
}
override def onClose(e: KubernetesClientException): Unit = {
logWarning("Kubernetes client has been closed (this is expected if the application is" +
" shutting down.)", e)
}
}
}


@@ -17,7 +17,9 @@
package org.apache.spark.scheduler.cluster.k8s
import java.io.File
import java.util.concurrent.TimeUnit
import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.client.Config
import org.apache.spark.{SparkContext, SparkException}
@@ -26,7 +28,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{SystemClock, ThreadUtils}
private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
@@ -56,17 +58,45 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
val allocatorExecutor = ThreadUtils
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-executor-requests")
val subscribersExecutor = ThreadUtils
.newDaemonThreadPoolScheduledExecutor(
"kubernetes-executor-snapshots-subscribers", 2)
val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(subscribersExecutor)
val removedExecutorsCache = CacheBuilder.newBuilder()
.expireAfterWrite(3, TimeUnit.MINUTES)
.build[java.lang.Long, java.lang.Long]()
val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
sc.conf,
new KubernetesExecutorBuilder(),
kubernetesClient,
snapshotsStore,
removedExecutorsCache)
val executorPodsAllocator = new ExecutorPodsAllocator(
sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock())
val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
kubernetesClient)
val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"kubernetes-executor-pod-polling-sync")
val podsPollingEventSource = new ExecutorPodsPollingSnapshotSource(
sc.conf, kubernetesClient, snapshotsStore, eventsPollingExecutor)
new KubernetesClusterSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
sc.env.rpcEnv,
new KubernetesExecutorBuilder,
kubernetesClient,
allocatorExecutor,
requestExecutorsService)
requestExecutorsService,
snapshotsStore,
executorPodsAllocator,
executorPodsLifecycleEventHandler,
podsWatchEventSource,
podsPollingEventSource)
}
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {


@@ -16,60 +16,32 @@
*/
package org.apache.spark.scheduler.cluster.k8s
import java.io.Closeable
import java.net.InetAddress
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
import javax.annotation.concurrent.GuardedBy
import java.util.concurrent.ExecutorService
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._
import scala.collection.mutable
import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
rpcEnv: RpcEnv,
executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
allocatorExecutor: ScheduledExecutorService,
requestExecutorsService: ExecutorService)
requestExecutorsService: ExecutorService,
snapshotsStore: ExecutorPodsSnapshotsStore,
podAllocator: ExecutorPodsAllocator,
lifecycleEventHandler: ExecutorPodsLifecycleManager,
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
import KubernetesClusterSchedulerBackend._
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
private val RUNNING_EXECUTOR_PODS_LOCK = new Object
@GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
private val kubernetesDriverPodName = conf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(throw new SparkException("Must specify the driver pod name"))
private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
requestExecutorsService)
private val driverPod = kubernetesClient.pods()
.inNamespace(kubernetesNamespace)
.withName(kubernetesDriverPodName)
.get()
protected override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
0.8
@@ -77,372 +49,93 @@ private[spark] class KubernetesClusterSchedulerBackend(
super.minRegisteredRatio
}
private val executorWatchResource = new AtomicReference[Closeable]
private val totalExpectedExecutors = new AtomicInteger(0)
private val driverUrl = RpcEndpointAddress(
conf.get("spark.driver.host"),
conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
// Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
removeExecutor(executorId, reason)
}
private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
override def start(): Unit = {
super.start()
if (!Utils.isDynamicAllocationEnabled(conf)) {
podAllocator.setTotalExpectedExecutors(initialExecutors)
}
lifecycleEventHandler.start(this)
podAllocator.start(applicationId())
watchEvents.start(applicationId())
pollEvents.start(applicationId())
}
private val executorLostReasonCheckMaxAttempts = conf.get(
KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
override def stop(): Unit = {
super.stop()
private val allocatorRunnable = new Runnable {
// Maintains a map of executor id to count of checks performed to learn the loss reason
// for an executor.
private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int]
override def run(): Unit = {
handleDisconnectedExecutors()
val executorsToAllocate = mutable.Map[String, Pod]()
val currentTotalRegisteredExecutors = totalRegisteredExecutors.get
val currentTotalExpectedExecutors = totalExpectedExecutors.get
val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts()
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
logDebug("Waiting for pending executors before scaling")
} else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
logDebug("Maximum allowed executor limit reached. Not scaling up further.")
} else {
for (_ <- 0 until math.min(
currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
val executorConf = KubernetesConf.createExecutorConf(
conf,
executorId,
applicationId(),
driverPod)
val executorPod = executorBuilder.buildFromFeatures(executorConf)
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
executorsToAllocate(executorId) = podWithAttachedContainer
logInfo(
s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
}
}
}
val allocatedExecutors = executorsToAllocate.mapValues { pod =>
Utils.tryLog {
kubernetesClient.pods().create(pod)
}
}
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
allocatedExecutors.map {
case (executorId, attemptedAllocatedExecutor) =>
attemptedAllocatedExecutor.map { successfullyAllocatedExecutor =>
runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor)
}
}
}
Utils.tryLogNonFatalError {
snapshotsStore.stop()
}
def handleDisconnectedExecutors(): Unit = {
// For each disconnected executor, synchronize with the loss reasons that may have been found
// by the executor pod watcher. If the loss reason was discovered by the watcher,
// inform the parent class with removeExecutor.
disconnectedPodsByExecutorIdPendingRemoval.asScala.foreach {
case (executorId, executorPod) =>
val knownExitReason = Option(podsWithKnownExitReasons.remove(
executorPod.getMetadata.getName))
knownExitReason.fold {
removeExecutorOrIncrementLossReasonCheckCount(executorId)
} { executorExited =>
logWarning(s"Removing executor $executorId with loss reason " + executorExited.message)
removeExecutor(executorId, executorExited)
// We don't delete the pod running the executor that has an exit condition caused by
// the application from the Kubernetes API server. This allows users to debug later on
// through commands such as "kubectl logs <pod name>" and
// "kubectl describe pod <pod name>". Note that exited containers have terminated and
// therefore won't take CPU and memory resources.
// Otherwise, the executor pod is marked to be deleted from the API server.
if (executorExited.exitCausedByApp) {
logInfo(s"Executor $executorId exited because of the application.")
deleteExecutorFromDataStructures(executorId)
} else {
logInfo(s"Executor $executorId failed because of a framework error.")
deleteExecutorFromClusterAndDataStructures(executorId)
}
}
}
Utils.tryLogNonFatalError {
watchEvents.stop()
}
def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
if (reasonCheckCount >= executorLostReasonCheckMaxAttempts) {
removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
deleteExecutorFromClusterAndDataStructures(executorId)
} else {
executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1)
}
Utils.tryLogNonFatalError {
pollEvents.stop()
}
def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
deleteExecutorFromDataStructures(executorId).foreach { pod =>
kubernetesClient.pods().delete(pod)
}
Utils.tryLogNonFatalError {
kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
}
def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {
disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)
executorReasonCheckAttemptCounts -= executorId
podsWithKnownExitReasons.remove(executorId)
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.remove(executorId).orElse {
logWarning(s"Unable to remove pod for unknown executor $executorId")
None
}
}
Utils.tryLogNonFatalError {
ThreadUtils.shutdown(requestExecutorsService)
}
Utils.tryLogNonFatalError {
kubernetesClient.close()
}
}
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
// TODO when we support dynamic allocation, the pod allocator should be told to process the
// current snapshot in order to decrease/increase the number of executors accordingly.
podAllocator.setTotalExpectedExecutors(requestedTotal)
true
}
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
}
override def start(): Unit = {
super.start()
executorWatchResource.set(
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.watch(new ExecutorPodsWatcher()))
allocatorExecutor.scheduleWithFixedDelay(
allocatorRunnable, 0L, podAllocationInterval, TimeUnit.MILLISECONDS)
if (!Utils.isDynamicAllocationEnabled(conf)) {
doRequestTotalExecutors(initialExecutors)
}
}
override def stop(): Unit = {
// stop allocation of new resources and caches.
allocatorExecutor.shutdown()
allocatorExecutor.awaitTermination(30, TimeUnit.SECONDS)
// send stop message to executors so they shut down cleanly
super.stop()
try {
val resource = executorWatchResource.getAndSet(null)
if (resource != null) {
resource.close()
}
} catch {
case e: Throwable => logWarning("Failed to close the executor pod watcher", e)
}
// then delete the executor pods
Utils.tryLogNonFatalError {
deleteExecutorPodsOnStop()
executorPodsByIPs.clear()
}
Utils.tryLogNonFatalError {
logInfo("Closing kubernetes client")
kubernetesClient.close()
}
}
/**
* @return A map of K8s cluster nodes to the number of tasks that could benefit from data
* locality if an executor launches on the cluster node.
*/
private def getNodesWithLocalTaskCounts() : Map[String, Int] = {
val nodeToLocalTaskCount = synchronized {
mutable.Map[String, Int]() ++ hostToLocalTaskCount
}
for (pod <- executorPodsByIPs.values().asScala) {
// Remove cluster nodes that are running our executors already.
// TODO: This prefers spreading out executors across nodes. In case users want
// consolidating executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
// flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html
nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty ||
nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty ||
nodeToLocalTaskCount.remove(
InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty
}
nodeToLocalTaskCount.toMap[String, Int]
}
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
totalExpectedExecutors.set(requestedTotal)
true
override def getExecutorIds(): Seq[String] = synchronized {
super.getExecutorIds()
}
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
val podsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
executorIds.flatMap { executorId =>
runningExecutorsToPods.remove(executorId) match {
case Some(pod) =>
disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
Some(pod)
case None =>
logWarning(s"Unable to remove pod for unknown executor $executorId")
None
}
}
}
kubernetesClient.pods().delete(podsToDelete: _*)
true
}
private def deleteExecutorPodsOnStop(): Unit = {
val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*)
runningExecutorsToPods.clear()
runningExecutorPodsCopy
}
kubernetesClient.pods().delete(executorPodsToDelete: _*)
}
private class ExecutorPodsWatcher extends Watcher[Pod] {
private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
override def eventReceived(action: Action, pod: Pod): Unit = {
val podName = pod.getMetadata.getName
val podIP = pod.getStatus.getPodIP
action match {
case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
&& pod.getMetadata.getDeletionTimestamp == null) =>
val clusterNodeName = pod.getSpec.getNodeName
logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
executorPodsByIPs.put(podIP, pod)
case Action.DELETED | Action.ERROR =>
val executorId = getExecutorId(pod)
logDebug(s"Executor pod $podName at IP $podIP was at $action.")
if (podIP != null) {
executorPodsByIPs.remove(podIP)
}
val executorExitReason = if (action == Action.ERROR) {
logWarning(s"Received error event of executor pod $podName. Reason: " +
pod.getStatus.getReason)
executorExitReasonOnError(pod)
} else if (action == Action.DELETED) {
logWarning(s"Received delete event of executor pod $podName. Reason: " +
pod.getStatus.getReason)
executorExitReasonOnDelete(pod)
} else {
throw new IllegalStateException(
s"Unknown action that should only be DELETED or ERROR: $action")
}
podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
s"watch received an event of type $action for this executor. The executor may " +
"have failed to start in the first place and never registered with the driver.")
}
disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
case _ => logDebug(s"Received event of executor pod $podName: " + action)
}
}
override def onClose(cause: KubernetesClientException): Unit = {
logDebug("Executor pod watch closed.", cause)
}
private def getExecutorExitStatus(pod: Pod): Int = {
val containerStatuses = pod.getStatus.getContainerStatuses
if (!containerStatuses.isEmpty) {
// we assume the first container represents the pod status. This assumption may not hold
// true in the future. Revisit this if side-car containers start running inside executor
// pods.
getExecutorExitStatus(containerStatuses.get(0))
} else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
}
private def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
Option(containerStatus.getState).map { containerState =>
Option(containerState.getTerminated).map { containerStateTerminated =>
containerStateTerminated.getExitCode.intValue()
}.getOrElse(UNKNOWN_EXIT_CODE)
}.getOrElse(UNKNOWN_EXIT_CODE)
}
private def isPodAlreadyReleased(pod: Pod): Boolean = {
val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
!runningExecutorsToPods.contains(executorId)
}
}
private def executorExitReasonOnError(pod: Pod): ExecutorExited = {
val containerExitStatus = getExecutorExitStatus(pod)
// container was probably actively killed by the driver.
if (isPodAlreadyReleased(pod)) {
ExecutorExited(containerExitStatus, exitCausedByApp = false,
s"Container in pod ${pod.getMetadata.getName} exited from explicit termination " +
"request.")
} else {
val containerExitReason = s"Pod ${pod.getMetadata.getName}'s executor container " +
s"exited with exit status code $containerExitStatus."
ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
}
}
private def executorExitReasonOnDelete(pod: Pod): ExecutorExited = {
val exitMessage = if (isPodAlreadyReleased(pod)) {
s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
} else {
s"Pod ${pod.getMetadata.getName} deleted or lost."
}
ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
}
private def getExecutorId(pod: Pod): String = {
val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
require(executorId != null, "Unexpected pod metadata; expected all executor pods " +
s"to have label $SPARK_EXECUTOR_ID_LABEL.")
executorId
}
kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
.delete()
// Don't do anything else - let event handling from the Kubernetes API do the Spark changes
}
override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(rpcEnv, properties)
}
private class KubernetesDriverEndpoint(
rpcEnv: RpcEnv,
sparkProperties: Seq[(String, String)])
private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends DriverEndpoint(rpcEnv, sparkProperties) {
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
addressToExecutorId.get(rpcAddress).foreach { executorId =>
if (disableExecutor(executorId)) {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.get(executorId).foreach { pod =>
disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
}
}
}
}
// Don't do anything besides disabling the executor - allow the Kubernetes API events to
// drive the rest of the lifecycle decisions
// TODO what if we disconnect from a networking issue? Probably want to mark the executor
// to be deleted eventually.
addressToExecutorId.get(rpcAddress).foreach(disableExecutor)
}
}
}
private object KubernetesClusterSchedulerBackend {
private val UNKNOWN_EXIT_CODE = -1
}


@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s
import io.fabric8.kubernetes.api.model.{DoneablePod, HasMetadata, Pod, PodList}
import io.fabric8.kubernetes.client.{Watch, Watcher}
import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
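/**
 * Shorthand type aliases for the verbose fabric8 client DSL types that the Kubernetes
 * scheduler test suites mock when stubbing pod operations.
 */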
object Fabric8Aliases {
type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
type LABELED_PODS = FilterWatchListDeletable[
Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
type SINGLE_POD = PodResource[Pod, DoneablePod]
type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
HasMetadata, Boolean]
}

View file

@ -27,6 +27,7 @@ import org.scalatest.mockito.MockitoSugar._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
class ClientSuite extends SparkFunSuite with BeforeAndAfter {
@ -103,15 +104,11 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
.build()
}
private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
HasMetadata, Boolean]
private type Pods = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
@Mock
private var kubernetesClient: KubernetesClient = _
@Mock
private var podOperations: Pods = _
private var podOperations: PODS = _
@Mock
private var namedPods: PodResource[Pod, DoneablePod] = _
@ -123,7 +120,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private var driverBuilder: KubernetesDriverBuilder = _
@Mock
private var resourceList: ResourceList = _
private var resourceList: RESOURCE_LIST = _
private var kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf] = _

View file

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.api.model.Pod
import scala.collection.mutable
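/**
 * Test-only snapshots store: pod updates and snapshot replacements are buffered in memory and
 * only handed to subscribers when notifySubscribers() is called, so tests control exactly when
 * the allocator and lifecycle manager observe a new cluster state. The requested batch
 * interval is ignored.
 */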
class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore {
private val snapshotsBuffer = mutable.Buffer.empty[ExecutorPodsSnapshot]
private val subscribers = mutable.Buffer.empty[Seq[ExecutorPodsSnapshot] => Unit]
private var currentSnapshot = ExecutorPodsSnapshot()
override def addSubscriber
(processBatchIntervalMillis: Long)
(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
subscribers += onNewSnapshots
}
override def stop(): Unit = {}
def notifySubscribers(): Unit = {
subscribers.foreach(_(snapshotsBuffer))
snapshotsBuffer.clear()
}
override def updatePod(updatedPod: Pod): Unit = {
currentSnapshot = currentSnapshot.withUpdate(updatedPod)
snapshotsBuffer += currentSnapshot
}
override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = {
currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
snapshotsBuffer += currentSnapshot
}
}

View file

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.SparkPod
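/**
 * Builders for fabricated executor pods in each lifecycle phase (pending, running, succeeded,
 * failed, deleted, unknown), shared across the scheduler backend unit tests.
 */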
object ExecutorLifecycleTestUtils {
val TEST_SPARK_APP_ID = "spark-app-id"
def failedExecutorWithoutDeletion(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewStatus()
.withPhase("failed")
.addNewContainerStatus()
.withName("spark-executor")
.withImage("k8s-spark")
.withNewState()
.withNewTerminated()
.withMessage("Failed")
.withExitCode(1)
.endTerminated()
.endState()
.endContainerStatus()
.addNewContainerStatus()
.withName("spark-executor-sidecar")
.withImage("k8s-spark-sidecar")
.withNewState()
.withNewTerminated()
.withMessage("Failed")
.withExitCode(1)
.endTerminated()
.endState()
.endContainerStatus()
.withMessage("Executor failed.")
.withReason("Executor failed because of a thrown error.")
.endStatus()
.build()
}
def pendingExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewStatus()
.withPhase("pending")
.endStatus()
.build()
}
def runningExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewStatus()
.withPhase("running")
.endStatus()
.build()
}
def succeededExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewStatus()
.withPhase("succeeded")
.endStatus()
.build()
}
def deletedExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewMetadata()
.withNewDeletionTimestamp("523012521")
.endMetadata()
.build()
}
def unknownExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewStatus()
.withPhase("unknown")
.endStatus()
.build()
}
def podWithAttachedContainerForId(executorId: Long): Pod = {
val sparkPod = executorPodWithId(executorId)
val podWithAttachedContainer = new PodBuilder(sparkPod.pod)
.editOrNewSpec()
.addToContainers(sparkPod.container)
.endSpec()
.build()
podWithAttachedContainer
}
def executorPodWithId(executorId: Long): SparkPod = {
val pod = new PodBuilder()
.withNewMetadata()
.withName(s"spark-executor-$executorId")
.addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
.addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
.endMetadata()
.build()
val container = new ContainerBuilder()
.withName("spark-executor")
.withImage("k8s-spark")
.build()
SparkPod(pod, container)
}
}

View file

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{ArgumentMatcher, Matchers, Mock, MockitoAnnotations}
import org.mockito.Matchers.any
import org.mockito.Mockito.{never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
import org.apache.spark.util.ManualClock
class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
private val driverPodName = "driver"
private val driverPod = new PodBuilder()
.withNewMetadata()
.withName(driverPodName)
.addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
.addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
.withUid("driver-pod-uid")
.endMetadata()
.build()
private val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000L)
private var waitForExecutorPodsClock: ManualClock = _
@Mock
private var kubernetesClient: KubernetesClient = _
@Mock
private var podOperations: PODS = _
@Mock
private var labeledPods: LABELED_PODS = _
@Mock
private var driverPodOperations: PodResource[Pod, DoneablePod] = _
@Mock
private var executorBuilder: KubernetesExecutorBuilder = _
private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
before {
MockitoAnnotations.initMocks(this)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
when(driverPodOperations.get).thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(kubernetesConfWithCorrectFields()))
.thenAnswer(executorPodAnswer())
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
waitForExecutorPodsClock = new ManualClock(0L)
podsAllocatorUnderTest = new ExecutorPodsAllocator(
conf, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID)
}
test("Initially request executors in batches. Do not request another batch if the" +
" first has not finished.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
snapshotsStore.notifySubscribers()
for (nextId <- 1 to podAllocationSize) {
verify(podOperations).create(podWithAttachedContainerForId(nextId))
}
verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
}
test("Request executors in batches. Allow another batch to be requested if" +
" all pending executors start running.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
snapshotsStore.notifySubscribers()
for (execId <- 1 until podAllocationSize) {
snapshotsStore.updatePod(runningExecutor(execId))
}
snapshotsStore.notifySubscribers()
verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
snapshotsStore.updatePod(runningExecutor(podAllocationSize))
snapshotsStore.notifySubscribers()
verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
snapshotsStore.updatePod(runningExecutor(podAllocationSize))
snapshotsStore.notifySubscribers()
verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod]))
}
test("When a current batch reaches error states immediately, re-request" +
" them on the next batch.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize)
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
snapshotsStore.notifySubscribers()
for (execId <- 1 until podAllocationSize) {
snapshotsStore.updatePod(runningExecutor(execId))
}
val failedPod = failedExecutorWithoutDeletion(podAllocationSize)
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
}
test("When an executor is requested but the API does not report it in a reasonable time, retry" +
" requesting that executor.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
snapshotsStore.notifySubscribers()
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
when(podOperations.withLabel(SPARK_EXECUTOR_ID_LABEL, "1")).thenReturn(labeledPods)
snapshotsStore.notifySubscribers()
verify(labeledPods).delete()
verify(podOperations).create(podWithAttachedContainerForId(2))
}
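// Stubs KubernetesExecutorBuilder.buildFromFeatures to return a bare executor pod whose id is
// taken from the executor-specific conf it was invoked with.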
private def executorPodAnswer(): Answer[SparkPod] = {
new Answer[SparkPod] {
override def answer(invocation: InvocationOnMock): SparkPod = {
val k8sConf = invocation.getArgumentAt(
0, classOf[KubernetesConf[KubernetesExecutorSpecificConf]])
executorPodWithId(k8sConf.roleSpecificConf.executorId.toInt)
}
}
}
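// Matches a KubernetesConf equivalent to what KubernetesConf.createExecutorConf would produce
// for this application; the SparkConf contents are compared separately because
// createExecutorConf clones the conf object.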
private def kubernetesConfWithCorrectFields(): KubernetesConf[KubernetesExecutorSpecificConf] =
Matchers.argThat(new ArgumentMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
override def matches(argument: scala.Any): Boolean = {
if (!argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]) {
false
} else {
val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
val executorSpecificConf = k8sConf.roleSpecificConf
val expectedK8sConf = KubernetesConf.createExecutorConf(
conf,
executorSpecificConf.executorId,
TEST_SPARK_APP_ID,
driverPod)
k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
// Since KubernetesConf.createExecutorConf clones the SparkConf object, force
// deep equality comparison for the SparkConf object and use object equality
// comparison on all other fields.
k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf)
}
}
})
}

View file

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.ExecutorExited
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter {
private var namedExecutorPods: mutable.Map[String, PodResource[Pod, DoneablePod]] = _
@Mock
private var kubernetesClient: KubernetesClient = _
@Mock
private var podOperations: PODS = _
@Mock
private var executorBuilder: KubernetesExecutorBuilder = _
@Mock
private var schedulerBackend: KubernetesClusterSchedulerBackend = _
private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
private var eventHandlerUnderTest: ExecutorPodsLifecycleManager = _
before {
MockitoAnnotations.initMocks(this)
val removedExecutorsCache = CacheBuilder.newBuilder().build[java.lang.Long, java.lang.Long]
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
namedExecutorPods = mutable.Map.empty[String, PodResource[Pod, DoneablePod]]
when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty[String])
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
new SparkConf(),
executorBuilder,
kubernetesClient,
snapshotsStore,
removedExecutorsCache)
eventHandlerUnderTest.start(schedulerBackend)
}
test("When an executor reaches error states immediately, remove from the scheduler backend.") {
val failedPod = failedExecutorWithoutDeletion(1)
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
val msg = exitReasonMessage(1, failedPod)
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
verify(namedExecutorPods(failedPod.getMetadata.getName)).delete()
}
test("Don't remove executors twice from Spark but remove from K8s repeatedly.") {
val failedPod = failedExecutorWithoutDeletion(1)
snapshotsStore.updatePod(failedPod)
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
val msg = exitReasonMessage(1, failedPod)
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend, times(1)).doRemoveExecutor("1", expectedLossReason)
verify(namedExecutorPods(failedPod.getMetadata.getName), times(2)).delete()
}
test("When the scheduler backend lists executor ids that aren't present in the cluster," +
" remove those executors from Spark.") {
when(schedulerBackend.getExecutorIds()).thenReturn(Seq("1"))
val msg = s"The executor with ID 1 was not found in the cluster but we didn't" +
s" get a reason why. Marking the executor as failed. The executor may have been" +
s" deleted but the driver missed the deletion event."
val expectedLossReason = ExecutorExited(-1, exitCausedByApp = false, msg)
snapshotsStore.replaceSnapshot(Seq.empty[Pod])
snapshotsStore.notifySubscribers()
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
}
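// Builds the exit reason message the tests expect the lifecycle manager to report for a
// failed executor pod.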
private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
s"""
|The executor with id $failedExecutorId exited with exit code 1.
|The API gave the following brief reason: ${failedPod.getStatus.getReason}
|The API gave the following message: ${failedPod.getStatus.getMessage}
|The API gave the following container statuses:
|
|${failedPod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
""".stripMargin
}
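// Lazily creates and caches one mock PodResource per requested pod name so that deletions of
// individual executor pods can be verified.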
private def namedPodsAnswer(): Answer[PodResource[Pod, DoneablePod]] = {
new Answer[PodResource[Pod, DoneablePod]] {
override def answer(invocation: InvocationOnMock): PodResource[Pod, DoneablePod] = {
val podName = invocation.getArgumentAt(0, classOf[String])
namedExecutorPods.getOrElseUpdate(
podName, mock(classOf[PodResource[Pod, DoneablePod]]))
}
}
}
}

View file

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.TimeUnit
import io.fabric8.kubernetes.api.model.PodListBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
private val sparkConf = new SparkConf
private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
@Mock
private var kubernetesClient: KubernetesClient = _
@Mock
private var podOperations: PODS = _
@Mock
private var appIdLabeledPods: LABELED_PODS = _
@Mock
private var executorRoleLabeledPods: LABELED_PODS = _
@Mock
private var eventQueue: ExecutorPodsSnapshotsStore = _
private var pollingExecutor: DeterministicScheduler = _
private var pollingSourceUnderTest: ExecutorPodsPollingSnapshotSource = _
before {
MockitoAnnotations.initMocks(this)
pollingExecutor = new DeterministicScheduler()
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
pollingExecutor)
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(appIdLabeledPods)
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(executorRoleLabeledPods)
}
test("Items returned by the API should be pushed to the event queue") {
when(executorRoleLabeledPods.list())
.thenReturn(new PodListBuilder()
.addToItems(
runningExecutor(1),
runningExecutor(2))
.build())
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1), runningExecutor(2)))
}
}

View file

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import org.apache.spark.SparkFunSuite
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
class ExecutorPodsSnapshotSuite extends SparkFunSuite {
test("States are interpreted correctly from pod metadata.") {
val pods = Seq(
pendingExecutor(0),
runningExecutor(1),
succeededExecutor(2),
failedExecutorWithoutDeletion(3),
deletedExecutor(4),
unknownExecutor(5))
val snapshot = ExecutorPodsSnapshot(pods)
assert(snapshot.executorPods ===
Map(
0L -> PodPending(pods(0)),
1L -> PodRunning(pods(1)),
2L -> PodSucceeded(pods(2)),
3L -> PodFailed(pods(3)),
4L -> PodDeleted(pods(4)),
5L -> PodUnknown(pods(5))))
}
test("Updates add new pods for non-matching ids and edit existing pods for matching ids") {
val originalPods = Seq(
pendingExecutor(0),
runningExecutor(1))
val originalSnapshot = ExecutorPodsSnapshot(originalPods)
val snapshotWithUpdatedPod = originalSnapshot.withUpdate(succeededExecutor(1))
assert(snapshotWithUpdatedPod.executorPods ===
Map(
0L -> PodPending(originalPods(0)),
1L -> PodSucceeded(succeededExecutor(1))))
val snapshotWithNewPod = snapshotWithUpdatedPod.withUpdate(pendingExecutor(2))
assert(snapshotWithNewPod.executorPods ===
Map(
0L -> PodPending(originalPods(0)),
1L -> PodSucceeded(succeededExecutor(1)),
2L -> PodPending(pendingExecutor(2))))
}
}

View file

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import org.jmock.lib.concurrent.DeterministicScheduler
import org.scalatest.BeforeAndAfter
import scala.collection.mutable
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.Constants._
class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with BeforeAndAfter {
private var eventBufferScheduler: DeterministicScheduler = _
private var eventQueueUnderTest: ExecutorPodsSnapshotsStoreImpl = _
before {
eventBufferScheduler = new DeterministicScheduler()
eventQueueUnderTest = new ExecutorPodsSnapshotsStoreImpl(eventBufferScheduler)
}
test("Subscribers get notified of events periodically.") {
val receivedSnapshots1 = mutable.Buffer.empty[ExecutorPodsSnapshot]
val receivedSnapshots2 = mutable.Buffer.empty[ExecutorPodsSnapshot]
eventQueueUnderTest.addSubscriber(1000) {
receivedSnapshots1 ++= _
}
eventQueueUnderTest.addSubscriber(2000) {
receivedSnapshots2 ++= _
}
eventBufferScheduler.runUntilIdle()
assert(receivedSnapshots1 === Seq(ExecutorPodsSnapshot()))
assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
pushPodWithIndex(1)
// Force time to move forward so that the buffer is emitted, scheduling the
// processing task on the subscription executor...
eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
// ... then actually execute the subscribers.
assert(receivedSnapshots1 === Seq(
ExecutorPodsSnapshot(),
ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
// Don't repeat snapshots
assert(receivedSnapshots1 === Seq(
ExecutorPodsSnapshot(),
ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
assert(receivedSnapshots2 === Seq(
ExecutorPodsSnapshot(),
ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
pushPodWithIndex(2)
pushPodWithIndex(3)
eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
assert(receivedSnapshots1 === Seq(
ExecutorPodsSnapshot(),
ExecutorPodsSnapshot(Seq(podWithIndex(1))),
ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
assert(receivedSnapshots2 === Seq(
ExecutorPodsSnapshot(),
ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
assert(receivedSnapshots1 === Seq(
ExecutorPodsSnapshot(),
ExecutorPodsSnapshot(Seq(podWithIndex(1))),
ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
assert(receivedSnapshots1 === receivedSnapshots2)
}
test("Even without sending events, initially receive an empty buffer.") {
val receivedInitialSnapshot = new AtomicReference[Seq[ExecutorPodsSnapshot]](null)
eventQueueUnderTest.addSubscriber(1000) {
receivedInitialSnapshot.set
}
assert(receivedInitialSnapshot.get == null)
eventBufferScheduler.runUntilIdle()
assert(receivedInitialSnapshot.get === Seq(ExecutorPodsSnapshot()))
}
test("Replacing the snapshot passes the new snapshot to subscribers.") {
val receivedSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot]
eventQueueUnderTest.addSubscriber(1000) {
receivedSnapshots ++= _
}
eventQueueUnderTest.updatePod(podWithIndex(1))
eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
assert(receivedSnapshots === Seq(
ExecutorPodsSnapshot(),
ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
eventQueueUnderTest.replaceSnapshot(Seq(podWithIndex(2)))
eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
assert(receivedSnapshots === Seq(
ExecutorPodsSnapshot(),
ExecutorPodsSnapshot(Seq(podWithIndex(1))),
ExecutorPodsSnapshot(Seq(podWithIndex(2)))))
}
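// Helpers that build (and push to the store) a running pod labeled with the given index as
// its executor id.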
private def pushPodWithIndex(index: Int): Unit =
eventQueueUnderTest.updatePod(podWithIndex(index))
private def podWithIndex(index: Int): Pod =
new PodBuilder()
.editOrNewMetadata()
.withName(s"pod-$index")
.addToLabels(SPARK_EXECUTOR_ID_LABEL, index.toString)
.endMetadata()
.editOrNewStatus()
.withPhase("running")
.endStatus()
.build()
}

View file

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
@Mock
private var eventQueue: ExecutorPodsSnapshotsStore = _
@Mock
private var kubernetesClient: KubernetesClient = _
@Mock
private var podOperations: PODS = _
@Mock
private var appIdLabeledPods: LABELED_PODS = _
@Mock
private var executorRoleLabeledPods: LABELED_PODS = _
@Mock
private var watchConnection: Watch = _
private var watch: ArgumentCaptor[Watcher[Pod]] = _
private var watchSourceUnderTest: ExecutorPodsWatchSnapshotSource = _
before {
MockitoAnnotations.initMocks(this)
watch = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(appIdLabeledPods)
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(executorRoleLabeledPods)
when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
}
test("Watch events should be pushed to the snapshots store as snapshot updates.") {
watch.getValue.eventReceived(Action.ADDED, runningExecutor(1))
watch.getValue.eventReceived(Action.MODIFIED, runningExecutor(2))
verify(eventQueue).updatePod(runningExecutor(1))
verify(eventQueue).updatePod(runningExecutor(2))
}
}

View file

@ -16,100 +16,30 @@
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, DoneablePod, Pod, PodBuilder, PodList}
import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
import org.hamcrest.{BaseMatcher, Description, Matcher}
import org.mockito.{AdditionalAnswers, ArgumentCaptor, Matchers, Mock, MockitoAnnotations}
import org.mockito.Matchers.{any, eq => mockitoEq}
import org.mockito.Mockito.{doNothing, never, times, verify, when}
import io.fabric8.kubernetes.client.KubernetesClient
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Matchers.{eq => mockitoEq}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.mockito.MockitoSugar._
import scala.collection.JavaConverters._
import scala.concurrent.Future
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorKilled, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.ThreadUtils
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter {
private val APP_ID = "test-spark-app"
private val DRIVER_POD_NAME = "spark-driver-pod"
private val NAMESPACE = "test-namespace"
private val SPARK_DRIVER_HOST = "localhost"
private val SPARK_DRIVER_PORT = 7077
private val POD_ALLOCATION_INTERVAL = "1m"
private val FIRST_EXECUTOR_POD = new PodBuilder()
.withNewMetadata()
.withName("pod1")
.endMetadata()
.withNewSpec()
.withNodeName("node1")
.endSpec()
.withNewStatus()
.withHostIP("192.168.99.100")
.endStatus()
.build()
private val SECOND_EXECUTOR_POD = new PodBuilder()
.withNewMetadata()
.withName("pod2")
.endMetadata()
.withNewSpec()
.withNodeName("node2")
.endSpec()
.withNewStatus()
.withHostIP("192.168.99.101")
.endStatus()
.build()
private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
private type LABELED_PODS = FilterWatchListDeletable[
Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
private type IN_NAMESPACE_PODS = NonNamespaceOperation[
Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
private val requestExecutorsService = new DeterministicScheduler()
private val sparkConf = new SparkConf(false)
.set("spark.executor.instances", "3")
@Mock
private var sparkContext: SparkContext = _
@Mock
private var listenerBus: LiveListenerBus = _
@Mock
private var taskSchedulerImpl: TaskSchedulerImpl = _
@Mock
private var allocatorExecutor: ScheduledExecutorService = _
@Mock
private var requestExecutorsService: ExecutorService = _
@Mock
private var executorBuilder: KubernetesExecutorBuilder = _
@Mock
private var kubernetesClient: KubernetesClient = _
@Mock
private var podOperations: PODS = _
@Mock
private var podsWithLabelOperations: LABELED_PODS = _
@Mock
private var podsInNamespace: IN_NAMESPACE_PODS = _
@Mock
private var podsWithDriverName: PodResource[Pod, DoneablePod] = _
private var sc: SparkContext = _
@Mock
private var rpcEnv: RpcEnv = _
@ -118,332 +48,103 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
private var driverEndpointRef: RpcEndpointRef = _
@Mock
private var executorPodsWatch: Watch = _
private var kubernetesClient: KubernetesClient = _
@Mock
private var successFuture: Future[Boolean] = _
private var podOperations: PODS = _
@Mock
private var labeledPods: LABELED_PODS = _
@Mock
private var taskScheduler: TaskSchedulerImpl = _
@Mock
private var eventQueue: ExecutorPodsSnapshotsStore = _
@Mock
private var podAllocator: ExecutorPodsAllocator = _
@Mock
private var lifecycleEventHandler: ExecutorPodsLifecycleManager = _
@Mock
private var watchEvents: ExecutorPodsWatchSnapshotSource = _
@Mock
private var pollEvents: ExecutorPodsPollingSnapshotSource = _
private var sparkConf: SparkConf = _
private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _
private var allocatorRunnable: ArgumentCaptor[Runnable] = _
private var requestExecutorRunnable: ArgumentCaptor[Runnable] = _
private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
private val driverPod = new PodBuilder()
.withNewMetadata()
.withName(DRIVER_POD_NAME)
.addToLabels(SPARK_APP_ID_LABEL, APP_ID)
.addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
.endMetadata()
.build()
private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _
before {
MockitoAnnotations.initMocks(this)
sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
.set(KUBERNETES_NAMESPACE, NAMESPACE)
.set("spark.driver.host", SPARK_DRIVER_HOST)
.set("spark.driver.port", SPARK_DRIVER_PORT.toString)
.set(KUBERNETES_ALLOCATION_BATCH_DELAY.key, POD_ALLOCATION_INTERVAL)
executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
when(taskScheduler.sc).thenReturn(sc)
when(sc.conf).thenReturn(sparkConf)
driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
when(sparkContext.conf).thenReturn(sparkConf)
when(sparkContext.listenerBus).thenReturn(listenerBus)
when(taskSchedulerImpl.sc).thenReturn(sparkContext)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations)
when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture()))
.thenReturn(executorPodsWatch)
when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace)
when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName)
when(podsWithDriverName.get()).thenReturn(driverPod)
when(allocatorExecutor.scheduleWithFixedDelay(
allocatorRunnable.capture(),
mockitoEq(0L),
mockitoEq(TimeUnit.MINUTES.toMillis(1)),
mockitoEq(TimeUnit.MILLISECONDS))).thenReturn(null)
// Creating Futures in Scala backed by a Java executor service resolves to running
// ExecutorService#execute (as opposed to submit)
doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())
when(rpcEnv.setupEndpoint(
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
.thenReturn(driverEndpointRef)
// Used by the CoarseGrainedSchedulerBackend when making RPC calls.
when(driverEndpointRef.ask[Boolean]
(any(classOf[Any]))
(any())).thenReturn(successFuture)
when(successFuture.failed).thenReturn(Future[Throwable] {
// emulate behavior of the Future.failed method.
throw new NoSuchElementException()
}(ThreadUtils.sameThread))
}
test("Basic lifecycle expectations when starting and stopping the scheduler.") {
val scheduler = newSchedulerBackend()
scheduler.start()
assert(executorPodsWatcherArgument.getValue != null)
assert(allocatorRunnable.getValue != null)
scheduler.stop()
verify(executorPodsWatch).close()
}
test("Static allocation should request executors upon first allocator run.") {
sparkConf
.set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
.set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
val scheduler = newSchedulerBackend()
scheduler.start()
requestExecutorRunnable.getValue.run()
val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
allocatorRunnable.getValue.run()
verify(podOperations).create(firstResolvedPod)
verify(podOperations).create(secondResolvedPod)
}
test("Killing executors deletes the executor pods") {
sparkConf
.set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
.set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
val scheduler = newSchedulerBackend()
scheduler.start()
requestExecutorRunnable.getValue.run()
val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
when(podOperations.create(any(classOf[Pod])))
.thenAnswer(AdditionalAnswers.returnsFirstArg())
allocatorRunnable.getValue.run()
scheduler.doKillExecutors(Seq("2"))
requestExecutorRunnable.getAllValues.asScala.last.run()
verify(podOperations).delete(secondResolvedPod)
verify(podOperations, never()).delete(firstResolvedPod)
}
test("Executors should be requested in batches.") {
sparkConf
.set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
.set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
val scheduler = newSchedulerBackend()
scheduler.start()
requestExecutorRunnable.getValue.run()
when(podOperations.create(any(classOf[Pod])))
.thenAnswer(AdditionalAnswers.returnsFirstArg())
val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
allocatorRunnable.getValue.run()
verify(podOperations).create(firstResolvedPod)
verify(podOperations, never()).create(secondResolvedPod)
val registerFirstExecutorMessage = RegisterExecutor(
"1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String])
when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
.apply(registerFirstExecutorMessage)
allocatorRunnable.getValue.run()
verify(podOperations).create(secondResolvedPod)
}
test("Scaled down executors should be cleaned up") {
sparkConf
.set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
.set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
val scheduler = newSchedulerBackend()
scheduler.start()
// The scheduler backend spins up one executor pod.
requestExecutorRunnable.getValue.run()
when(podOperations.create(any(classOf[Pod])))
.thenAnswer(AdditionalAnswers.returnsFirstArg())
val resolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
allocatorRunnable.getValue.run()
val executorEndpointRef = mock[RpcEndpointRef]
when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
val registerFirstExecutorMessage = RegisterExecutor(
"1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
.apply(registerFirstExecutorMessage)
// Request that there are 0 executors and trigger deletion from driver.
scheduler.doRequestTotalExecutors(0)
requestExecutorRunnable.getAllValues.asScala.last.run()
scheduler.doKillExecutors(Seq("1"))
requestExecutorRunnable.getAllValues.asScala.last.run()
verify(podOperations, times(1)).delete(resolvedPod)
driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
val exitedPod = exitPod(resolvedPod, 0)
executorPodsWatcherArgument.getValue.eventReceived(Action.DELETED, exitedPod)
allocatorRunnable.getValue.run()
// No more deletion attempts of the executors.
// This is graceful termination and should not be detected as a failure.
verify(podOperations, times(1)).delete(resolvedPod)
verify(driverEndpointRef, times(1)).send(
RemoveExecutor("1", ExecutorExited(
0,
exitCausedByApp = false,
s"Container in pod ${exitedPod.getMetadata.getName} exited from" +
s" explicit termination request.")))
}
test("Executors that fail should not be deleted.") {
sparkConf
.set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
.set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
val scheduler = newSchedulerBackend()
scheduler.start()
val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
requestExecutorRunnable.getValue.run()
allocatorRunnable.getValue.run()
val executorEndpointRef = mock[RpcEndpointRef]
when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
val registerFirstExecutorMessage = RegisterExecutor(
"1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
.apply(registerFirstExecutorMessage)
driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
executorPodsWatcherArgument.getValue.eventReceived(
Action.ERROR, exitPod(firstResolvedPod, 1))
// A replacement executor should be created but the error pod should persist.
val replacementPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
scheduler.doRequestTotalExecutors(1)
requestExecutorRunnable.getValue.run()
allocatorRunnable.getAllValues.asScala.last.run()
verify(podOperations, never()).delete(firstResolvedPod)
verify(driverEndpointRef).send(
RemoveExecutor("1", ExecutorExited(
1,
exitCausedByApp = true,
s"Pod ${FIRST_EXECUTOR_POD.getMetadata.getName}'s executor container exited with" +
" exit status code 1.")))
}
test("Executors disconnected due to unknown reasons are deleted and replaced.") {
sparkConf
.set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
.set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
val executorLostReasonCheckMaxAttempts = sparkConf.get(
KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
val scheduler = newSchedulerBackend()
scheduler.start()
val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
requestExecutorRunnable.getValue.run()
allocatorRunnable.getValue.run()
val executorEndpointRef = mock[RpcEndpointRef]
when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
val registerFirstExecutorMessage = RegisterExecutor(
"1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
.apply(registerFirstExecutorMessage)
driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
1 to executorLostReasonCheckMaxAttempts foreach { _ =>
allocatorRunnable.getValue.run()
verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
}
val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
allocatorRunnable.getValue.run()
verify(podOperations).delete(firstResolvedPod)
verify(driverEndpointRef).send(
RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
}
test("Executors that fail to start on the Kubernetes API call rebuild in the next batch.") {
sparkConf
.set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
.set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
val scheduler = newSchedulerBackend()
scheduler.start()
val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
when(podOperations.create(firstResolvedPod))
.thenThrow(new RuntimeException("test"))
requestExecutorRunnable.getValue.run()
allocatorRunnable.getValue.run()
verify(podOperations, times(1)).create(firstResolvedPod)
val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
allocatorRunnable.getValue.run()
verify(podOperations).create(recreatedResolvedPod)
}
test("Executors that are initially created but the watch notices them fail are rebuilt" +
" in the next batch.") {
sparkConf
.set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
.set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
val scheduler = newSchedulerBackend()
scheduler.start()
val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
when(podOperations.create(FIRST_EXECUTOR_POD)).thenAnswer(AdditionalAnswers.returnsFirstArg())
requestExecutorRunnable.getValue.run()
allocatorRunnable.getValue.run()
verify(podOperations, times(1)).create(firstResolvedPod)
executorPodsWatcherArgument.getValue.eventReceived(Action.ERROR, firstResolvedPod)
val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
allocatorRunnable.getValue.run()
verify(podOperations).create(recreatedResolvedPod)
}
private def newSchedulerBackend(): KubernetesClusterSchedulerBackend = {
new KubernetesClusterSchedulerBackend(
taskSchedulerImpl,
when(kubernetesClient.pods()).thenReturn(podOperations)
schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
taskScheduler,
rpcEnv,
executorBuilder,
kubernetesClient,
allocatorExecutor,
requestExecutorsService) {
override def applicationId(): String = APP_ID
requestExecutorsService,
eventQueue,
podAllocator,
lifecycleEventHandler,
watchEvents,
pollEvents) {
override def applicationId(): String = TEST_SPARK_APP_ID
}
}
private def exitPod(basePod: Pod, exitCode: Int): Pod = {
new PodBuilder(basePod)
.editStatus()
.addNewContainerStatus()
.withNewState()
.withNewTerminated()
.withExitCode(exitCode)
.endTerminated()
.endState()
.endContainerStatus()
.endStatus()
.build()
test("Start all components") {
schedulerBackendUnderTest.start()
verify(podAllocator).setTotalExpectedExecutors(3)
verify(podAllocator).start(TEST_SPARK_APP_ID)
verify(lifecycleEventHandler).start(schedulerBackendUnderTest)
verify(watchEvents).start(TEST_SPARK_APP_ID)
verify(pollEvents).start(TEST_SPARK_APP_ID)
}
private def expectPodCreationWithId(executorId: Int, expectedPod: Pod): Pod = {
val resolvedPod = new PodBuilder(expectedPod)
.editMetadata()
.addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
.endMetadata()
.build()
val resolvedContainer = new ContainerBuilder().build()
when(executorBuilder.buildFromFeatures(Matchers.argThat(
new BaseMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
override def matches(argument: scala.Any)
: Boolean = {
argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] &&
argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
.roleSpecificConf.executorId == executorId.toString
}
override def describeTo(description: Description): Unit = {}
}))).thenReturn(SparkPod(resolvedPod, resolvedContainer))
new PodBuilder(resolvedPod)
.editSpec()
.addToContainers(resolvedContainer)
.endSpec()
.build()
test("Stop all components") {
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
schedulerBackendUnderTest.stop()
verify(eventQueue).stop()
verify(watchEvents).stop()
verify(pollEvents).stop()
verify(labeledPods).delete()
verify(kubernetesClient).close()
}
test("Remove executor") {
schedulerBackendUnderTest.start()
schedulerBackendUnderTest.doRemoveExecutor(
"1", ExecutorKilled)
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
}
test("Kill executors") {
schedulerBackendUnderTest.start()
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
verify(labeledPods, never()).delete()
requestExecutorsService.runNextPendingCommand()
verify(labeledPods).delete()
}
test("Request total executors") {
schedulerBackendUnderTest.start()
schedulerBackendUnderTest.doRequestTotalExecutors(5)
verify(podAllocator).setTotalExpectedExecutors(3)
verify(podAllocator, never()).setTotalExpectedExecutors(5)
requestExecutorsService.runNextPendingCommand()
verify(podAllocator).setTotalExpectedExecutors(5)
}
}