diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 588cd9d40f..f5fae7cc8c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,7 +16,11 @@ */ package org.apache.spark.deploy.k8s -import org.apache.spark.SparkConf +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time} + +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.util.Utils private[spark] object KubernetesUtils { @@ -60,4 +64,81 @@ private[spark] object KubernetesUtils { } def parseMasterUrl(url: String): String = url.substring("k8s://".length) + + def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = { + // Use more loggable format if value is null or empty + val indentStr = "\t" * indent + pairs.map { + case (k, v) => s"\n$indentStr $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}" + }.mkString("") + } + + /** + * Given a pod, output a human readable representation of its state + * + * @param pod Pod + * @return Human readable pod state + */ + def formatPodState(pod: Pod): String = { + val details = Seq[(String, String)]( + // pod metadata + ("pod name", pod.getMetadata.getName), + ("namespace", pod.getMetadata.getNamespace), + ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")), + ("pod uid", pod.getMetadata.getUid), + ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)), + + // spec details + ("service account name", pod.getSpec.getServiceAccountName), + ("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")), + ("node name", pod.getSpec.getNodeName), + + // status + ("start time", formatTime(pod.getStatus.getStartTime)), + ("phase", pod.getStatus.getPhase), + ("container status", containersDescription(pod, 2)) + ) + + formatPairsBundle(details) + } + + def containersDescription(p: Pod, indent: Int = 1): String = { + p.getStatus.getContainerStatuses.asScala.map { status => + Seq( + ("container name", status.getName), + ("container image", status.getImage)) ++ + containerStatusDescription(status) + }.map(p => formatPairsBundle(p, indent)).mkString("\n\n") + } + + def containerStatusDescription(containerStatus: ContainerStatus) + : Seq[(String, String)] = { + val state = containerStatus.getState + Option(state.getRunning) + .orElse(Option(state.getTerminated)) + .orElse(Option(state.getWaiting)) + .map { + case running: ContainerStateRunning => + Seq( + ("container state", "running"), + ("container started at", formatTime(running.getStartedAt))) + case waiting: ContainerStateWaiting => + Seq( + ("container state", "waiting"), + ("pending reason", waiting.getReason)) + case terminated: ContainerStateTerminated => + Seq( + ("container state", "terminated"), + ("container started at", formatTime(terminated.getStartedAt)), + ("container finished at", formatTime(terminated.getFinishedAt)), + ("exit code", terminated.getExitCode.toString), + ("termination reason", terminated.getReason)) + case unknown => + throw new SparkException(s"Unexpected container status type ${unknown.getClass}.") + }.getOrElse(Seq(("container state", "N/A"))) + } + + def formatTime(time: Time): String = { + if (time != null) time.getTime else "N/A" + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 173ac54162..1889fe5eb3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.internal.Logging import org.apache.spark.util.ThreadUtils @@ -99,82 +100,10 @@ private[k8s] class LoggingPodStatusWatcherImpl( scheduler.shutdown() } - private def formatPodState(pod: Pod): String = { - val details = Seq[(String, String)]( - // pod metadata - ("pod name", pod.getMetadata.getName), - ("namespace", pod.getMetadata.getNamespace), - ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")), - ("pod uid", pod.getMetadata.getUid), - ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)), - - // spec details - ("service account name", pod.getSpec.getServiceAccountName), - ("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")), - ("node name", pod.getSpec.getNodeName), - - // status - ("start time", formatTime(pod.getStatus.getStartTime)), - ("container images", - pod.getStatus.getContainerStatuses - .asScala - .map(_.getImage) - .mkString(", ")), - ("phase", pod.getStatus.getPhase), - ("status", pod.getStatus.getContainerStatuses.toString) - ) - - formatPairsBundle(details) - } - - private def formatPairsBundle(pairs: Seq[(String, String)]) = { - // Use more loggable format if value is null or empty - pairs.map { - case (k, v) => s"\n\t $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}" - }.mkString("") - } - override def awaitCompletion(): Unit = { podCompletedFuture.await() logInfo(pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }.getOrElse("No containers were found in the driver pod.")) } - - private def containersDescription(p: Pod): String = { - p.getStatus.getContainerStatuses.asScala.map { status => - Seq( - ("Container name", status.getName), - ("Container image", status.getImage)) ++ - containerStatusDescription(status) - }.map(formatPairsBundle).mkString("\n\n") - } - - private def containerStatusDescription( - containerStatus: ContainerStatus): Seq[(String, String)] = { - val state = containerStatus.getState - Option(state.getRunning) - .orElse(Option(state.getTerminated)) - .orElse(Option(state.getWaiting)) - .map { - case running: ContainerStateRunning => - Seq( - ("Container state", "Running"), - ("Container started at", formatTime(running.getStartedAt))) - case waiting: ContainerStateWaiting => - Seq( - ("Container state", "Waiting"), - ("Pending reason", waiting.getReason)) - case terminated: ContainerStateTerminated => - Seq( - ("Container state", "Terminated"), - ("Exit code", terminated.getExitCode.toString)) - case unknown => - throw new SparkException(s"Unexpected container status type ${unknown.getClass}.") - }.getOrElse(Seq(("Container state", "N/A"))) - } - - private def formatTime(time: Time): String = { - if (time != null) time.getTime else "N/A" - } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index b28d939903..e2800cff7b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.ExecutorExited import org.apache.spark.util.Utils @@ -151,13 +152,15 @@ private[spark] class ExecutorPodsLifecycleManager( private def exitReasonMessage(podState: FinalPodState, execId: Long, exitCode: Int) = { val pod = podState.pod + val reason = Option(pod.getStatus.getReason) + val message = Option(pod.getStatus.getMessage) s""" |The executor with id $execId exited with exit code $exitCode. - |The API gave the following brief reason: ${pod.getStatus.getReason} - |The API gave the following message: ${pod.getStatus.getMessage} + |The API gave the following brief reason: ${reason.getOrElse("N/A")} + |The API gave the following message: ${message.getOrElse("N/A")} |The API gave the following container statuses: | - |${pod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")} + |${containersDescription(pod)} """.stripMargin } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala index 562ace9f49..d8409383b4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala @@ -31,6 +31,7 @@ import scala.collection.mutable import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.scheduler.ExecutorExited import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ @@ -104,13 +105,15 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte } private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = { + val reason = Option(failedPod.getStatus.getReason) + val message = Option(failedPod.getStatus.getMessage) s""" |The executor with id $failedExecutorId exited with exit code 1. - |The API gave the following brief reason: ${failedPod.getStatus.getReason} - |The API gave the following message: ${failedPod.getStatus.getMessage} + |The API gave the following brief reason: ${reason.getOrElse("N/A")} + |The API gave the following message: ${message.getOrElse("N/A")} |The API gave the following container statuses: | - |${failedPod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")} + |${containersDescription(failedPod)} """.stripMargin }