From 27d3b0a51cfd1caf05c242b45db9a78ef5868685 Mon Sep 17 00:00:00 2001 From: Rob Vesse Date: Thu, 6 Sep 2018 16:15:11 -0700 Subject: [PATCH] [SPARK-25222][K8S] Improve container status logging ## What changes were proposed in this pull request? Currently, when running Spark on Kubernetes, the client runs a logger that watches the K8S API for events related to the driver pod and logs them. However, for the container status part of this logging it simply dumps the raw object, which is not human-readable, e.g. ![screen shot 2018-08-24 at 10 37 46](https://user-images.githubusercontent.com/2104864/44577799-e0486880-a789-11e8-9ae9-fdeddacbbea8.png) ![screen shot 2018-08-24 at 10 38 14](https://user-images.githubusercontent.com/2104864/44577800-e0e0ff00-a789-11e8-81f5-3bb315dbbdb1.png) This is despite the fact that the logging class in question already has methods to pretty-print this information but only invokes them at the end of a job. This PR improves the logging to always use the pretty-printing methods, and additionally modifies them to include further useful information provided by the K8S API. A similar issue, which will be addressed by further commits to this PR, also exists when tasks are lost. - [x] Improved `LoggingPodStatusWatcher` - [x] Improved container status on task failure ## How was this patch tested? 
Built and launched jobs with the updated Spark client and observed the new human readable output: ![screen shot 2018-08-24 at 11 09 32](https://user-images.githubusercontent.com/2104864/44579429-5353de00-a78e-11e8-9228-c750af8e6311.png) ![screen shot 2018-08-24 at 11 09 42](https://user-images.githubusercontent.com/2104864/44579430-5353de00-a78e-11e8-8fce-d5bb2a3ae65f.png) ![screen shot 2018-08-24 at 11 10 13](https://user-images.githubusercontent.com/2104864/44579431-53ec7480-a78e-11e8-9fa2-aeabc5b28ec4.png) ![screen shot 2018-08-24 at 17 47 44](https://user-images.githubusercontent.com/2104864/44596922-db090f00-a7c5-11e8-910c-bc2339f5a196.png) Suggested reviewers: liyinan926 mccheah Author: Rob Vesse Closes #22215 from rvesse/SPARK-25222. --- .../spark/deploy/k8s/KubernetesUtils.scala | 83 ++++++++++++++++++- .../k8s/submit/LoggingPodStatusWatcher.scala | 73 +--------------- .../k8s/ExecutorPodsLifecycleManager.scala | 9 +- .../ExecutorPodsLifecycleManagerSuite.scala | 9 +- 4 files changed, 95 insertions(+), 79 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 588cd9d40f..f5fae7cc8c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,7 +16,11 @@ */ package org.apache.spark.deploy.k8s -import org.apache.spark.SparkConf +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time} + +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.util.Utils private[spark] object KubernetesUtils { @@ -60,4 +64,81 @@ private[spark] object KubernetesUtils { } def parseMasterUrl(url: 
String): String = url.substring("k8s://".length) + + def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = { + // Use more loggable format if value is null or empty + val indentStr = "\t" * indent + pairs.map { + case (k, v) => s"\n$indentStr $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}" + }.mkString("") + } + + /** + * Given a pod, output a human readable representation of its state + * + * @param pod Pod + * @return Human readable pod state + */ + def formatPodState(pod: Pod): String = { + val details = Seq[(String, String)]( + // pod metadata + ("pod name", pod.getMetadata.getName), + ("namespace", pod.getMetadata.getNamespace), + ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")), + ("pod uid", pod.getMetadata.getUid), + ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)), + + // spec details + ("service account name", pod.getSpec.getServiceAccountName), + ("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")), + ("node name", pod.getSpec.getNodeName), + + // status + ("start time", formatTime(pod.getStatus.getStartTime)), + ("phase", pod.getStatus.getPhase), + ("container status", containersDescription(pod, 2)) + ) + + formatPairsBundle(details) + } + + def containersDescription(p: Pod, indent: Int = 1): String = { + p.getStatus.getContainerStatuses.asScala.map { status => + Seq( + ("container name", status.getName), + ("container image", status.getImage)) ++ + containerStatusDescription(status) + }.map(p => formatPairsBundle(p, indent)).mkString("\n\n") + } + + def containerStatusDescription(containerStatus: ContainerStatus) + : Seq[(String, String)] = { + val state = containerStatus.getState + Option(state.getRunning) + .orElse(Option(state.getTerminated)) + .orElse(Option(state.getWaiting)) + .map { + case running: ContainerStateRunning => + Seq( + ("container state", "running"), + ("container started at", formatTime(running.getStartedAt))) + case waiting: ContainerStateWaiting 
=> + Seq( + ("container state", "waiting"), + ("pending reason", waiting.getReason)) + case terminated: ContainerStateTerminated => + Seq( + ("container state", "terminated"), + ("container started at", formatTime(terminated.getStartedAt)), + ("container finished at", formatTime(terminated.getFinishedAt)), + ("exit code", terminated.getExitCode.toString), + ("termination reason", terminated.getReason)) + case unknown => + throw new SparkException(s"Unexpected container status type ${unknown.getClass}.") + }.getOrElse(Seq(("container state", "N/A"))) + } + + def formatTime(time: Time): String = { + if (time != null) time.getTime else "N/A" + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 173ac54162..1889fe5eb3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.internal.Logging import org.apache.spark.util.ThreadUtils @@ -99,82 +100,10 @@ private[k8s] class LoggingPodStatusWatcherImpl( scheduler.shutdown() } - private def formatPodState(pod: Pod): String = { - val details = Seq[(String, String)]( - // pod metadata - ("pod name", pod.getMetadata.getName), - ("namespace", pod.getMetadata.getNamespace), - ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")), - ("pod uid", pod.getMetadata.getUid), - ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)), - - // spec details - ("service account name", 
pod.getSpec.getServiceAccountName), - ("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")), - ("node name", pod.getSpec.getNodeName), - - // status - ("start time", formatTime(pod.getStatus.getStartTime)), - ("container images", - pod.getStatus.getContainerStatuses - .asScala - .map(_.getImage) - .mkString(", ")), - ("phase", pod.getStatus.getPhase), - ("status", pod.getStatus.getContainerStatuses.toString) - ) - - formatPairsBundle(details) - } - - private def formatPairsBundle(pairs: Seq[(String, String)]) = { - // Use more loggable format if value is null or empty - pairs.map { - case (k, v) => s"\n\t $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}" - }.mkString("") - } - override def awaitCompletion(): Unit = { podCompletedFuture.await() logInfo(pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }.getOrElse("No containers were found in the driver pod.")) } - - private def containersDescription(p: Pod): String = { - p.getStatus.getContainerStatuses.asScala.map { status => - Seq( - ("Container name", status.getName), - ("Container image", status.getImage)) ++ - containerStatusDescription(status) - }.map(formatPairsBundle).mkString("\n\n") - } - - private def containerStatusDescription( - containerStatus: ContainerStatus): Seq[(String, String)] = { - val state = containerStatus.getState - Option(state.getRunning) - .orElse(Option(state.getTerminated)) - .orElse(Option(state.getWaiting)) - .map { - case running: ContainerStateRunning => - Seq( - ("Container state", "Running"), - ("Container started at", formatTime(running.getStartedAt))) - case waiting: ContainerStateWaiting => - Seq( - ("Container state", "Waiting"), - ("Pending reason", waiting.getReason)) - case terminated: ContainerStateTerminated => - Seq( - ("Container state", "Terminated"), - ("Exit code", terminated.getExitCode.toString)) - case unknown => - throw new SparkException(s"Unexpected container status type ${unknown.getClass}.") - 
}.getOrElse(Seq(("Container state", "N/A"))) - } - - private def formatTime(time: Time): String = { - if (time != null) time.getTime else "N/A" - } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index b28d939903..e2800cff7b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.ExecutorExited import org.apache.spark.util.Utils @@ -151,13 +152,15 @@ private[spark] class ExecutorPodsLifecycleManager( private def exitReasonMessage(podState: FinalPodState, execId: Long, exitCode: Int) = { val pod = podState.pod + val reason = Option(pod.getStatus.getReason) + val message = Option(pod.getStatus.getMessage) s""" |The executor with id $execId exited with exit code $exitCode. 
- |The API gave the following brief reason: ${pod.getStatus.getReason} - |The API gave the following message: ${pod.getStatus.getMessage} + |The API gave the following brief reason: ${reason.getOrElse("N/A")} + |The API gave the following message: ${message.getOrElse("N/A")} |The API gave the following container statuses: | - |${pod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")} + |${containersDescription(pod)} """.stripMargin } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala index 562ace9f49..d8409383b4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala @@ -31,6 +31,7 @@ import scala.collection.mutable import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.scheduler.ExecutorExited import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ @@ -104,13 +105,15 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte } private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = { + val reason = Option(failedPod.getStatus.getReason) + val message = Option(failedPod.getStatus.getMessage) s""" |The executor with id $failedExecutorId exited with exit code 1. 
- |The API gave the following brief reason: ${failedPod.getStatus.getReason} - |The API gave the following message: ${failedPod.getStatus.getMessage} + |The API gave the following brief reason: ${reason.getOrElse("N/A")} + |The API gave the following message: ${message.getOrElse("N/A")} |The API gave the following container statuses: | - |${failedPod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")} + |${containersDescription(failedPod)} """.stripMargin }