[SPARK-28947][K8S] Status logging not happens at an interval for liveness

### What changes were proposed in this pull request?

This pr invoke the start method of `LoggingPodStatusWatcherImpl` for status logging at intervals.

### Why are the changes needed?

This pr invoke the start method of `LoggingPodStatusWatcherImpl` is declared but never called

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

manually test

Closes #25648 from yaooqinn/SPARK-28947.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Kent Yao 2019-10-15 12:34:39 -07:00 committed by Marcelo Vanzin
parent 39d53d3e74
commit 02c5b4f763
3 changed files with 33 additions and 58 deletions

View file

@ -86,15 +86,12 @@ private[spark] object ClientArguments {
* @param builder Responsible for building the base driver pod based on a composition of
* implemented features.
* @param kubernetesClient the client to talk to the Kubernetes API server
* @param waitForAppCompletion a flag indicating whether the client should wait for the application
* to complete
* @param watcher a watcher that monitors and logs the application status
*/
private[spark] class Client(
conf: KubernetesDriverConf,
builder: KubernetesDriverBuilder,
kubernetesClient: KubernetesClient,
waitForAppCompletion: Boolean,
watcher: LoggingPodStatusWatcher) extends Logging {
def run(): Unit = {
@ -124,10 +121,11 @@ private[spark] class Client(
.endVolume()
.endSpec()
.build()
val driverPodName = resolvedDriverPod.getMetadata.getName
Utils.tryWithResource(
kubernetesClient
.pods()
.withName(resolvedDriverPod.getMetadata.getName)
.withName(driverPodName)
.watch(watcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
@ -141,16 +139,8 @@ private[spark] class Client(
throw e
}
val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" +
s"${resolvedDriverPod.getMetadata.getName}"
if (waitForAppCompletion) {
logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...")
watcher.awaitCompletion()
logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.")
} else {
logInfo(s"Deployed Spark application ${conf.appName} with " +
s"submission ID ${sId} into Kubernetes.")
}
val sId = Seq(conf.namespace, driverPodName).mkString(":")
watcher.watchOrStop(sId)
}
}
@ -199,13 +189,11 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
}
private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// For constructing the app ID, we can't use the Spark application name, as the app ID is going
// to be added as a label to group resources belonging to the same application. Label values are
// considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
// a unique app ID (captured by spark.app.id) in the format below.
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
kubernetesAppId,
@ -215,9 +203,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
@ -231,7 +217,6 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
kubernetesConf,
new KubernetesDriverBuilder(),
kubernetesClient,
waitForAppCompletion,
watcher)
client.run()
}

View file

@ -16,49 +16,36 @@
*/
package org.apache.spark.deploy.k8s.submit
import java.util.concurrent.{CountDownLatch, TimeUnit}
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.KubernetesDriverConf
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils
private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
def awaitCompletion(): Unit
def watchOrStop(submissionId: String): Unit
}
/**
* A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
* every state change and also at an interval for liveness.
*
* @param appId application ID.
* @param maybeLoggingInterval ms between each state request. If provided, must be a positive
* number.
* @param conf kubernetes driver conf.
*/
private[k8s] class LoggingPodStatusWatcherImpl(
appId: String,
maybeLoggingInterval: Option[Long])
private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
extends LoggingPodStatusWatcher with Logging {
private val podCompletedFuture = new CountDownLatch(1)
// start timer for periodic logging
private val scheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
private val logRunnable: Runnable = () => logShortStatus()
private val appId = conf.appId
private var podCompleted = false
private var pod = Option.empty[Pod]
private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
def start(): Unit = {
maybeLoggingInterval.foreach { interval =>
scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
}
}
override def eventReceived(action: Action, pod: Pod): Unit = {
this.pod = Option(pod)
action match {
@ -78,11 +65,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
closeWatch()
}
private def logShortStatus() = {
logInfo(s"Application status for $appId (phase: $phase)")
}
private def logLongStatus() = {
private def logLongStatus(): Unit = {
logInfo("State changed, new state: " + pod.map(formatPodState).getOrElse("unknown"))
}
@ -90,15 +73,25 @@ private[k8s] class LoggingPodStatusWatcherImpl(
phase == "Succeeded" || phase == "Failed"
}
private def closeWatch(): Unit = {
podCompletedFuture.countDown()
scheduler.shutdown()
private def closeWatch(): Unit = synchronized {
podCompleted = true
this.notifyAll()
}
override def awaitCompletion(): Unit = {
podCompletedFuture.await()
logInfo(pod.map { p =>
s"Container final statuses:\n\n${containersDescription(p)}"
}.getOrElse("No containers were found in the driver pod."))
override def watchOrStop(sId: String): Unit = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
val interval = conf.get(REPORT_INTERVAL)
synchronized {
while (!podCompleted) {
wait(interval)
logInfo(s"Application status for $appId (phase: $phase)")
}
}
logInfo(
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
.getOrElse("No containers were found in the driver pod."))
logInfo(s"Application ${conf.appName} with submission ID $sId finished")
} else {
logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
}
}

View file

@ -146,7 +146,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kconf,
driverBuilder,
kubernetesClient,
false,
loggingPodStatusWatcher)
submissionClient.run()
verify(podOperations).create(FULL_EXPECTED_POD)
@ -157,7 +156,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kconf,
driverBuilder,
kubernetesClient,
false,
loggingPodStatusWatcher)
submissionClient.run()
val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
@ -181,9 +179,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kconf,
driverBuilder,
kubernetesClient,
true,
loggingPodStatusWatcher)
submissionClient.run()
verify(loggingPodStatusWatcher).awaitCompletion()
verify(loggingPodStatusWatcher).watchOrStop(kconf.namespace + ":driver")
}
}