From b4b78ce26567ce7ab83d47ce3b6af87c866bcacb Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 10 Jun 2021 13:39:39 -0700
Subject: [PATCH] [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

### What changes were proposed in this pull request?

A follow-up for SPARK-32975 to avoid the unexpected `None.get` exception.

When running SparkPi with Docker Desktop, since the driver `podName` is an `Option`, we get:

```logtalk
21/06/09 01:09:12 ERROR Utils: Uncaught exception in thread main
java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:110)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1417)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.start(ExecutorPodsAllocator.scala:111)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.start(KubernetesClusterSchedulerBackend.scala:99)
    at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:581)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2686)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:948)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:942)
    at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:30)
    at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

### Why are the changes needed?

Fix a regression introduced by SPARK-32975.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual.

Closes #32830 from yaooqinn/SPARK-32975.
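For readers outside the Spark codebase, the essence of the fix is to guard the readiness wait with the driver-pod `Option` instead of calling `.get` on it. Below is a minimal standalone sketch of that pattern; `OptionGuardSketch`, `findDriverPodName`, and `waitUntilPodReady` are hypothetical names for illustration only and are not Spark APIs.

```scala
// A minimal, standalone sketch (not Spark code) of the Option-guarded pattern used by the fix.
object OptionGuardSketch {

  // Pretend lookup of the driver pod name; returns None when the name was never configured.
  private def findDriverPodName(): Option[String] = sys.env.get("DRIVER_POD_NAME")

  // Pretend readiness wait; the real code talks to the Kubernetes client here.
  private def waitUntilPodReady(name: String): Unit =
    println(s"waiting for pod $name to become ready")

  def main(args: Array[String]): Unit = {
    val driverPodName: Option[String] = findDriverPodName()

    // Before the fix (conceptually): calling `.get` on a None throws
    // java.util.NoSuchElementException: None.get, as in the stack trace above.
    // waitUntilPodReady(driverPodName.get)

    // After the fix (conceptually): `foreach` runs the block only when a value is
    // present, so the readiness wait is skipped instead of crashing the driver.
    driverPodName.foreach { name =>
      waitUntilPodReady(name)
    }
  }
}
```

`Option.foreach` keeps the change small; pattern matching on `Some`/`None` would work equally well. The point is simply that no code path calls `.get` on a possibly-empty value.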
Authored-by: Kent Yao
Signed-off-by: Dongjoon Hyun
---
 .../cluster/k8s/ExecutorPodsAllocator.scala      | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 7b17f44291..c101567a1d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -102,13 +102,15 @@ private[spark] class ExecutorPodsAllocator(
   @volatile private var deletedExecutorIds = Set.empty[Long]
 
   def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
-    // Wait until the driver pod is ready before starting executors, as the headless service won't
-    // be resolvable by DNS until the driver pod is ready.
-    Utils.tryLogNonFatalError {
-      kubernetesClient
-        .pods()
-        .withName(kubernetesDriverPodName.get)
-        .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+    driverPod.foreach { pod =>
+      // Wait until the driver pod is ready before starting executors, as the headless service won't
+      // be resolvable by DNS until the driver pod is ready.
+      Utils.tryLogNonFatalError {
+        kubernetesClient
+          .pods()
+          .withName(pod.getMetadata.getName)
+          .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+      }
     }
     snapshotsStore.addSubscriber(podAllocationDelay) {
       onNewSnapshots(applicationId, schedulerBackend, _)