[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s
### What changes were proposed in this pull request? Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed. For more relevant information see here: https://github.com/fabric8io/kubernetes-client/issues/1075 ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Running spark-submit to a k8s cluster. Not sure how to make an automated test for this. If someone can help me out that would be great. Closes #28423 from stijndehaes/bugfix/k8s-submit-resource-version-change. Authored-by: Stijn De Haes <stijndehaes@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com>
This commit is contained in:
parent
4da93b00d7
commit
0432379f99
|
@ -21,9 +21,11 @@ import java.util.{Collections, UUID}
|
|||
import java.util.Properties
|
||||
|
||||
import io.fabric8.kubernetes.api.model._
|
||||
import io.fabric8.kubernetes.client.KubernetesClient
|
||||
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
|
||||
import io.fabric8.kubernetes.client.Watcher.Action
|
||||
import scala.collection.mutable
|
||||
import scala.util.control.NonFatal
|
||||
import util.control.Breaks._
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.deploy.SparkApplication
|
||||
|
@ -127,25 +129,37 @@ private[spark] class Client(
|
|||
.endSpec()
|
||||
.build()
|
||||
val driverPodName = resolvedDriverPod.getMetadata.getName
|
||||
Utils.tryWithResource(
|
||||
kubernetesClient
|
||||
.pods()
|
||||
.withName(driverPodName)
|
||||
.watch(watcher)) { _ =>
|
||||
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
|
||||
try {
|
||||
val otherKubernetesResources =
|
||||
resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
|
||||
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
|
||||
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
kubernetesClient.pods().delete(createdDriverPod)
|
||||
throw e
|
||||
}
|
||||
|
||||
val sId = Seq(conf.namespace, driverPodName).mkString(":")
|
||||
watcher.watchOrStop(sId)
|
||||
var watch: Watch = null
|
||||
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
|
||||
try {
|
||||
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
|
||||
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
|
||||
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
kubernetesClient.pods().delete(createdDriverPod)
|
||||
throw e
|
||||
}
|
||||
val sId = Seq(conf.namespace, driverPodName).mkString(":")
|
||||
breakable {
|
||||
while (true) {
|
||||
val podWithName = kubernetesClient
|
||||
.pods()
|
||||
.withName(driverPodName)
|
||||
// Reset resource to old before we start the watch, this is important for race conditions
|
||||
watcher.reset()
|
||||
watch = podWithName.watch(watcher)
|
||||
|
||||
// Send the latest pod state we know to the watcher to make sure we didn't miss anything
|
||||
watcher.eventReceived(Action.MODIFIED, podWithName.get())
|
||||
|
||||
// Break the while loop if the pod is completed or we don't want to wait
|
||||
if(watcher.watchOrStop(sId)) {
|
||||
watch.close()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
|
|||
import io.fabric8.kubernetes.api.model.Pod
|
||||
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
|
||||
import io.fabric8.kubernetes.client.Watcher.Action
|
||||
import java.net.HttpURLConnection.HTTP_GONE
|
||||
|
||||
import org.apache.spark.deploy.k8s.Config._
|
||||
import org.apache.spark.deploy.k8s.KubernetesDriverConf
|
||||
|
@ -26,7 +27,8 @@ import org.apache.spark.deploy.k8s.KubernetesUtils._
|
|||
import org.apache.spark.internal.Logging
|
||||
|
||||
private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
|
||||
def watchOrStop(submissionId: String): Unit
|
||||
def watchOrStop(submissionId: String): Boolean
|
||||
def reset(): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -42,10 +44,16 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
|
|||
|
||||
private var podCompleted = false
|
||||
|
||||
private var resourceTooOldReceived = false
|
||||
|
||||
private var pod = Option.empty[Pod]
|
||||
|
||||
private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
|
||||
|
||||
override def reset(): Unit = {
|
||||
resourceTooOldReceived = false
|
||||
}
|
||||
|
||||
override def eventReceived(action: Action, pod: Pod): Unit = {
|
||||
this.pod = Option(pod)
|
||||
action match {
|
||||
|
@ -62,7 +70,12 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
|
|||
|
||||
override def onClose(e: KubernetesClientException): Unit = {
|
||||
logDebug(s"Stopping watching application $appId with last-observed phase $phase")
|
||||
closeWatch()
|
||||
if(e != null && e.getCode == HTTP_GONE) {
|
||||
resourceTooOldReceived = true
|
||||
logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
|
||||
} else {
|
||||
closeWatch()
|
||||
}
|
||||
}
|
||||
|
||||
private def logLongStatus(): Unit = {
|
||||
|
@ -78,20 +91,26 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
|
|||
this.notifyAll()
|
||||
}
|
||||
|
||||
override def watchOrStop(sId: String): Unit = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
|
||||
override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
|
||||
logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
|
||||
val interval = conf.get(REPORT_INTERVAL)
|
||||
synchronized {
|
||||
while (!podCompleted) {
|
||||
while (!podCompleted && !resourceTooOldReceived) {
|
||||
wait(interval)
|
||||
logInfo(s"Application status for $appId (phase: $phase)")
|
||||
}
|
||||
}
|
||||
logInfo(
|
||||
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
|
||||
.getOrElse("No containers were found in the driver pod."))
|
||||
logInfo(s"Application ${conf.appName} with submission ID $sId finished")
|
||||
|
||||
if(podCompleted) {
|
||||
logInfo(
|
||||
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
|
||||
.getOrElse("No containers were found in the driver pod."))
|
||||
logInfo(s"Application ${conf.appName} with submission ID $sId finished")
|
||||
}
|
||||
podCompleted
|
||||
} else {
|
||||
logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
|
||||
// Always act like the application has completed since we don't want to wait for app completion
|
||||
true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -136,6 +136,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
|
||||
when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
|
||||
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
|
||||
when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
|
||||
doReturn(resourceList)
|
||||
.when(kubernetesClient)
|
||||
.resourceList(createdResourcesArgumentCaptor.capture())
|
||||
|
|
Loading…
Reference in a new issue