[SPARK-25222][K8S] Improve container status logging

## What changes were proposed in this pull request?

Currently, when running Spark on Kubernetes, a logger is run by the client that watches the K8S API for events related to the Driver pod and logs them. However, for the container status aspect of the logging, this simply dumps the raw object, which is not human readable, e.g.

![screen shot 2018-08-24 at 10 37 46](https://user-images.githubusercontent.com/2104864/44577799-e0486880-a789-11e8-9ae9-fdeddacbbea8.png)
![screen shot 2018-08-24 at 10 38 14](https://user-images.githubusercontent.com/2104864/44577800-e0e0ff00-a789-11e8-81f5-3bb315dbbdb1.png)

This is despite the fact that the logging class in question actually has methods to pretty print this information but only invokes these at the end of a job.

This PR improves the logging to always use the pretty printing methods, additionally modifying them to include further useful information provided by the K8S API.

A similar issue also exists when tasks are lost; that is addressed by further commits to this PR.

- [x] Improved `LoggingPodStatusWatcher`
- [x] Improved container status on task failure

## How was this patch tested?

Built and launched jobs with the updated Spark client and observed the new human readable output:

![screen shot 2018-08-24 at 11 09 32](https://user-images.githubusercontent.com/2104864/44579429-5353de00-a78e-11e8-9228-c750af8e6311.png)
![screen shot 2018-08-24 at 11 09 42](https://user-images.githubusercontent.com/2104864/44579430-5353de00-a78e-11e8-8fce-d5bb2a3ae65f.png)
![screen shot 2018-08-24 at 11 10 13](https://user-images.githubusercontent.com/2104864/44579431-53ec7480-a78e-11e8-9fa2-aeabc5b28ec4.png)
![screen shot 2018-08-24 at 17 47 44](https://user-images.githubusercontent.com/2104864/44596922-db090f00-a7c5-11e8-910c-bc2339f5a196.png)

Suggested reviewers: liyinan926 mccheah

Author: Rob Vesse <rvesse@dotnetrdf.org>

Closes #22215 from rvesse/SPARK-25222.
This commit is contained in:
Rob Vesse 2018-09-06 16:15:11 -07:00 committed by mccheah
parent c84bc40d7f
commit 27d3b0a51c
4 changed files with 95 additions and 79 deletions

View file

@ -16,7 +16,11 @@
*/
package org.apache.spark.deploy.k8s
import org.apache.spark.SparkConf
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.util.Utils
private[spark] object KubernetesUtils {
@ -60,4 +64,81 @@ private[spark] object KubernetesUtils {
}
/** Strips the leading `k8s://` scheme prefix to obtain the underlying API server URL. */
def parseMasterUrl(url: String): String = {
  val schemePrefixLength = "k8s://".length
  url.substring(schemePrefixLength)
}
/**
 * Renders key/value pairs one-per-line for log output, each line prefixed with a
 * newline and `indent` tab characters. Null or empty values are shown as "N/A"
 * so the output stays readable.
 */
def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {
  val prefix = "\t" * indent
  val lines = for ((key, value) <- pairs) yield {
    val display = Option(value).filter(_.nonEmpty).getOrElse("N/A")
    s"\n$prefix $key: $display"
  }
  lines.mkString
}
/**
 * Given a pod, output a human readable representation of its state
 *
 * @param pod Pod
 * @return Human readable pod state
 */
def formatPodState(pod: Pod): String = {
  val metadata = pod.getMetadata
  val spec = pod.getSpec
  val status = pod.getStatus
  // Ordering matters for readability: metadata first, then spec, then runtime status.
  val details = Seq[(String, String)](
    // pod metadata
    ("pod name", metadata.getName),
    ("namespace", metadata.getNamespace),
    ("labels", metadata.getLabels.asScala.mkString(", ")),
    ("pod uid", metadata.getUid),
    ("creation time", formatTime(metadata.getCreationTimestamp)),
    // spec details
    ("service account name", spec.getServiceAccountName),
    ("volumes", spec.getVolumes.asScala.map(_.getName).mkString(", ")),
    ("node name", spec.getNodeName),
    // status
    ("start time", formatTime(status.getStartTime)),
    ("phase", status.getPhase),
    ("container status", containersDescription(pod, 2))
  )
  formatPairsBundle(details)
}
/**
 * Produces a human readable description of every container in the pod's status,
 * one bundle per container (name, image, then state details), separated by
 * blank lines.
 */
def containersDescription(p: Pod, indent: Int = 1): String = {
  val perContainer = p.getStatus.getContainerStatuses.asScala.map { containerStatus =>
    val header = Seq(
      ("container name", containerStatus.getName),
      ("container image", containerStatus.getImage))
    header ++ containerStatusDescription(containerStatus)
  }
  perContainer.map(pairs => formatPairsBundle(pairs, indent)).mkString("\n\n")
}
/**
 * Describes a container's current state as key/value pairs. A container is in
 * at most one of the running/terminated/waiting states; when none is populated
 * the state is reported as "N/A".
 */
def containerStatusDescription(containerStatus: ContainerStatus)
  : Seq[(String, String)] = {
  val currentState = containerStatus.getState
  // Check the three mutually-exclusive state slots in turn; the first non-null wins.
  val activeState = Option(currentState.getRunning)
    .orElse(Option(currentState.getTerminated))
    .orElse(Option(currentState.getWaiting))
  activeState.map {
    case terminated: ContainerStateTerminated =>
      Seq(
        ("container state", "terminated"),
        ("container started at", formatTime(terminated.getStartedAt)),
        ("container finished at", formatTime(terminated.getFinishedAt)),
        ("exit code", terminated.getExitCode.toString),
        ("termination reason", terminated.getReason))
    case waiting: ContainerStateWaiting =>
      Seq(
        ("container state", "waiting"),
        ("pending reason", waiting.getReason))
    case running: ContainerStateRunning =>
      Seq(
        ("container state", "running"),
        ("container started at", formatTime(running.getStartedAt)))
    case other =>
      throw new SparkException(s"Unexpected container status type ${other.getClass}.")
  }.getOrElse(Seq(("container state", "N/A")))
}
/**
 * Formats a K8S API timestamp for log output.
 *
 * @param time K8S API Time value, possibly null
 * @return The timestamp string, or "N/A" when the Time or its wrapped value is null
 */
def formatTime(time: Time): String = {
  // Guard both the Time wrapper and the wrapped value: the original only checked
  // the wrapper, so a non-null Time with a null getTime leaked "null" into logs.
  Option(time).flatMap(t => Option(t.getTime)).getOrElse("N/A")
}
}

View file

@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils
@ -99,82 +100,10 @@ private[k8s] class LoggingPodStatusWatcherImpl(
scheduler.shutdown()
}
/**
 * Builds a human readable summary of the pod's metadata, spec, and status for
 * logging on each watch event.
 */
private def formatPodState(pod: Pod): String = {
  val details = Seq[(String, String)](
    // pod metadata
    ("pod name", pod.getMetadata.getName),
    ("namespace", pod.getMetadata.getNamespace),
    ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")),
    ("pod uid", pod.getMetadata.getUid),
    ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),
    // spec details
    ("service account name", pod.getSpec.getServiceAccountName),
    ("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")),
    ("node name", pod.getSpec.getNodeName),
    // status
    ("start time", formatTime(pod.getStatus.getStartTime)),
    ("container images",
      pod.getStatus.getContainerStatuses
        .asScala
        .map(_.getImage)
        .mkString(", ")),
    ("phase", pod.getStatus.getPhase),
    // Use the pretty-printed container description rather than dumping the raw
    // ContainerStatus objects, which are not human readable in log output.
    ("container status", containersDescription(pod))
  )
  formatPairsBundle(details)
}
/**
 * Renders key/value pairs one-per-line for log output; null or empty values are
 * shown as "N/A" so the output stays readable.
 */
private def formatPairsBundle(pairs: Seq[(String, String)]) = {
  val rendered = for ((key, value) <- pairs) yield {
    val display = Option(value).filter(_.nonEmpty).getOrElse("N/A")
    s"\n\t $key: $display"
  }
  rendered.mkString
}
/** Blocks until the driver pod completes, then logs the final container statuses. */
override def awaitCompletion(): Unit = {
  podCompletedFuture.await()
  val finalMessage = pod match {
    case Some(p) => s"Container final statuses:\n\n${containersDescription(p)}"
    case None => "No containers were found in the driver pod."
  }
  logInfo(finalMessage)
}
/**
 * Produces a human readable description of every container in the pod's status,
 * one bundle per container, separated by blank lines.
 */
private def containersDescription(p: Pod): String = {
  val bundles = p.getStatus.getContainerStatuses.asScala.map { containerStatus =>
    val header = Seq(
      ("Container name", containerStatus.getName),
      ("Container image", containerStatus.getImage))
    header ++ containerStatusDescription(containerStatus)
  }
  bundles.map(formatPairsBundle).mkString("\n\n")
}
/**
 * Describes a container's current state as key/value pairs. A container is in
 * at most one of the running/terminated/waiting states; when none is populated
 * the state is reported as "N/A".
 */
private def containerStatusDescription(
    containerStatus: ContainerStatus): Seq[(String, String)] = {
  val state = containerStatus.getState
  Option(state.getRunning)
    .orElse(Option(state.getTerminated))
    .orElse(Option(state.getWaiting))
    .map {
      case running: ContainerStateRunning =>
        Seq(
          ("Container state", "Running"),
          ("Container started at", formatTime(running.getStartedAt)))
      case waiting: ContainerStateWaiting =>
        Seq(
          ("Container state", "Waiting"),
          ("Pending reason", waiting.getReason))
      case terminated: ContainerStateTerminated =>
        // Include the timestamps and termination reason the API provides; the
        // exit code alone is rarely enough to diagnose a failed container.
        Seq(
          ("Container state", "Terminated"),
          ("Container started at", formatTime(terminated.getStartedAt)),
          ("Container finished at", formatTime(terminated.getFinishedAt)),
          ("Exit code", terminated.getExitCode.toString),
          ("Termination reason", terminated.getReason))
      case unknown =>
        throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
    }.getOrElse(Seq(("Container state", "N/A")))
}
/**
 * Formats a K8S API timestamp for log output, or "N/A" when the Time or its
 * wrapped value is null.
 */
private def formatTime(time: Time): String = {
  // Guard both the Time wrapper and the wrapped value; checking only the wrapper
  // could return a null String from getTime and print "null" in the logs.
  Option(time).flatMap(t => Option(t.getTime)).getOrElse("N/A")
}
}

View file

@ -24,6 +24,7 @@ import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorExited
import org.apache.spark.util.Utils
@ -151,13 +152,15 @@ private[spark] class ExecutorPodsLifecycleManager(
/**
 * Builds the human readable message logged when an executor pod reaches a final
 * state, including the API-reported reason, message, and per-container status.
 *
 * @param podState final state of the executor pod
 * @param execId Spark executor id of the failed executor
 * @param exitCode exit code the executor terminated with
 */
private def exitReasonMessage(podState: FinalPodState, execId: Long, exitCode: Int) = {
  val pod = podState.pod
  // Reason/message may be absent from the API response; render "N/A" rather
  // than interpolating a literal "null" into the log message.
  val reason = Option(pod.getStatus.getReason)
  val message = Option(pod.getStatus.getMessage)
  s"""
     |The executor with id $execId exited with exit code $exitCode.
     |The API gave the following brief reason: ${reason.getOrElse("N/A")}
     |The API gave the following message: ${message.getOrElse("N/A")}
     |The API gave the following container statuses:
     |
     |${containersDescription(pod)}
   """.stripMargin
}

View file

@ -31,6 +31,7 @@ import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.scheduler.ExecutorExited
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
@ -104,13 +105,15 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
}
/**
 * Expected exit-reason message for a failed executor pod, mirroring the format
 * produced by ExecutorPodsLifecycleManager.exitReasonMessage.
 */
private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
  // Mirror the production code: absent reason/message render as "N/A".
  val reason = Option(failedPod.getStatus.getReason)
  val message = Option(failedPod.getStatus.getMessage)
  s"""
     |The executor with id $failedExecutorId exited with exit code 1.
     |The API gave the following brief reason: ${reason.getOrElse("N/A")}
     |The API gave the following message: ${message.getOrElse("N/A")}
     |The API gave the following container statuses:
     |
     |${containersDescription(failedPod)}
   """.stripMargin
}