From 42815548c7ef498439b9ba47134a6f3e1b519c83 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 2 Jul 2018 10:24:04 -0700 Subject: [PATCH] [SPARK-24683][K8S] Fix k8s no resource ## What changes were proposed in this pull request? Make SparkSubmit pass in the main class even if `SparkLauncher.NO_RESOURCE` is the primary resource. ## How was this patch tested? New integration test written to capture this case. Author: mcheah Closes #21660 from mccheah/fix-k8s-no-resource. --- .../scala/org/apache/spark/deploy/SparkSubmit.scala | 2 ++ .../deploy/k8s/submit/KubernetesDriverBuilder.scala | 3 ++- .../deploy/k8s/integrationtest/KubernetesSuite.scala | 10 ++++++++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e83d82f847..2da778a297 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -702,6 +702,8 @@ private[spark] class SparkSubmit extends Logging { childArgs ++= Array("--primary-java-resource", args.primaryResource) childArgs ++= Array("--main-class", args.mainClass) } + } else { + childArgs ++= Array("--main-class", args.mainClass) } if (args.childArgs != null) { args.childArgs.foreach { arg => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 5762d8245f..0dd1c37661 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -64,7 +64,8 @@ private[spark] class KubernetesDriverBuilder( case JavaMainAppResource(_) => provideJavaStep(kubernetesConf) case PythonMainAppResource(_) => - providePythonStep(kubernetesConf)}.getOrElse(provideJavaStep(kubernetesConf)) + providePythonStep(kubernetesConf)} + .getOrElse(provideJavaStep(kubernetesConf)) val allFeatures: Seq[KubernetesFeatureConfigStep] = (baseFeatures :+ bindingsStep) ++ diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 65c513cf24..6e334c83fb 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -21,17 +21,17 @@ import java.nio.file.{Path, Paths} import java.util.UUID import java.util.regex.Pattern -import scala.collection.JavaConverters._ - import com.google.common.io.PatternFilenameFilter import io.fabric8.kubernetes.api.model.{Container, Pod} import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} +import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} import org.apache.spark.deploy.k8s.integrationtest.config._ +import org.apache.spark.launcher.SparkLauncher private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter { @@ -109,6 +109,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite runSparkPiAndVerifyCompletion() } + test("Use SparkLauncher.NO_RESOURCE") { + sparkAppConf.setJars(Seq(containerLocalSparkDistroExamplesJar)) + runSparkPiAndVerifyCompletion( + appResource = SparkLauncher.NO_RESOURCE) + } + test("Run SparkPi with a master URL without a scheme.") { val url = kubernetesTestComponents.kubernetesClient.getMasterUrl val k8sMasterUrl = if (url.getPort < 0) {