diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index b4155fed8a..713d35dcf6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -21,9 +21,11 @@ import java.util.{Collections, UUID}
 import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
+import io.fabric8.kubernetes.client.Watcher.Action
 import scala.collection.mutable
 import scala.util.control.NonFatal
+import scala.util.control.Breaks._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
@@ -127,25 +129,37 @@
       .endSpec()
       .build()
     val driverPodName = resolvedDriverPod.getMetadata.getName
-    Utils.tryWithResource(
-      kubernetesClient
-        .pods()
-        .withName(driverPodName)
-        .watch(watcher)) { _ =>
-      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-      try {
-        val otherKubernetesResources =
-          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-      } catch {
-        case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdDriverPod)
-          throw e
-      }
-      val sId = Seq(conf.namespace, driverPodName).mkString(":")
-      watcher.watchOrStop(sId)
+    var watch: Watch = null
+    val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+    try {
+      val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+      addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+      kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+    } catch {
+      case NonFatal(e) =>
+        kubernetesClient.pods().delete(createdDriverPod)
+        throw e
+    }
+
+    val sId = Seq(conf.namespace, driverPodName).mkString(":")
+    breakable {
+      while (true) {
+        val podWithName = kubernetesClient
+          .pods()
+          .withName(driverPodName)
+        // Reset the watcher's "resource too old" state before starting the watch to avoid races
+        watcher.reset()
+        watch = podWithName.watch(watcher)
+
+        // Send the latest pod state we know to the watcher to make sure we didn't miss anything
+        watcher.eventReceived(Action.MODIFIED, podWithName.get())
+
+        // Break the while loop if the pod is completed or we don't want to wait for completion
+        if (watcher.watchOrStop(sId)) {
+          watch.close()
+          break
+        }
+      }
     }
   }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index ce3c80c0f8..aa27a9ef50 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
 
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
+import java.net.HttpURLConnection.HTTP_GONE
 
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.KubernetesDriverConf
@@ -26,7 +27,8 @@ import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.internal.Logging
 
 private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
-  def watchOrStop(submissionId: String): Unit
+  def watchOrStop(submissionId: String): Boolean
+  def reset(): Unit
 }
 
 /**
@@ -42,10 +44,16 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
 
   private var podCompleted = false
 
+  private var resourceTooOldReceived = false
+
   private var pod = Option.empty[Pod]
 
   private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
 
+  override def reset(): Unit = {
+    resourceTooOldReceived = false
+  }
+
   override def eventReceived(action: Action, pod: Pod): Unit = {
     this.pod = Option(pod)
     action match {
@@ -62,7 +70,12 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
 
   override def onClose(e: KubernetesClientException): Unit = {
     logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-    closeWatch()
+    if (e != null && e.getCode == HTTP_GONE) {
+      resourceTooOldReceived = true
+      logDebug(s"Got HTTP Gone code; resource version changed in the Kubernetes API: $e")
+    } else {
+      closeWatch()
+    }
   }
 
   private def logLongStatus(): Unit = {
@@ -78,20 +91,26 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
     this.notifyAll()
   }
 
-  override def watchOrStop(sId: String): Unit = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
+  override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
     logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
     val interval = conf.get(REPORT_INTERVAL)
     synchronized {
-      while (!podCompleted) {
+      while (!podCompleted && !resourceTooOldReceived) {
         wait(interval)
         logInfo(s"Application status for $appId (phase: $phase)")
       }
     }
-    logInfo(
-      pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
-        .getOrElse("No containers were found in the driver pod."))
-    logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+
+    if (podCompleted) {
+      logInfo(
+        pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+          .getOrElse("No containers were found in the driver pod."))
+      logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+    }
+    podCompleted
   } else {
     logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
+    // Always act as if the application has completed, since we don't wait for app completion
+    true
   }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 5d49ac0bba..d9ec3feb52 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -136,6 +136,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
     when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
    when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
+    when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
     doReturn(resourceList)
       .when(kubernetesClient)
       .resourceList(createdResourcesArgumentCaptor.capture())
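
For reference, below is a minimal, self-contained sketch (not part of the patch) of the watch-restart pattern the diff introduces: a watcher whose onClose flags HTTP 410 (Gone), and a loop that re-establishes the watch and replays the latest known pod state so no transition is missed in the gap. It assumes the fabric8 kubernetes-client 4.x API in use here; the namespace ("default"), pod name ("example-driver"), and object/class names are illustrative placeholders, not Spark code.

import java.net.HttpURLConnection.HTTP_GONE

import scala.util.control.Breaks._

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

object ResumableWatchSketch {

  // Tracks whether the pod reached a terminal phase, and whether the watch was
  // closed with HTTP 410 (Gone), i.e. its resource version is too old to resume.
  class PodPhaseWatcher extends Watcher[Pod] {
    private var completed = false
    private var gone = false

    def reset(): Unit = synchronized { gone = false }

    override def eventReceived(action: Action, pod: Pod): Unit = synchronized {
      // Null-safe phase extraction; the pod may not exist yet when replayed.
      val phase = Option(pod).flatMap(p => Option(p.getStatus)).map(_.getPhase).getOrElse("unknown")
      if (phase == "Succeeded" || phase == "Failed") completed = true
      notifyAll()
    }

    override def onClose(e: KubernetesClientException): Unit = synchronized {
      if (e != null && e.getCode == HTTP_GONE) gone = true else completed = true
      notifyAll()
    }

    // Blocks until the pod completes (true) or the watch goes stale (false).
    def watchOrStop(): Boolean = synchronized {
      while (!completed && !gone) wait(1000)
      completed
    }
  }

  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient() // picks up the local kubeconfig
    try {
      val watcher = new PodPhaseWatcher
      val podResource = client.pods().inNamespace("default").withName("example-driver")
      breakable {
        while (true) {
          watcher.reset()
          val watch = podResource.watch(watcher)
          // Replay the current pod state so a transition that happened while
          // no watch was open is not missed.
          watcher.eventReceived(Action.MODIFIED, podResource.get())
          if (watcher.watchOrStop()) {
            watch.close()
            break
          }
          // On 410 the server has already closed the watch; loop to re-create it.
        }
      }
    } finally {
      client.close()
    }
  }
}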