From a6ddc9d08352540e492c8c7aa3b602c3e5902538 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 27 Feb 2019 09:49:31 -0800
Subject: [PATCH] [SPARK-24736][K8S] Let spark-submit handle dependency resolution.

Before this change, there was some code in the k8s backend to deal with how
to resolve dependencies and make them available to the Spark application. It
turns out that none of that code is necessary, since spark-submit already
handles all that for applications started in client mode - like the k8s
driver that is run inside a Spark-created pod.

For that reason, specifically for pyspark, there's no need for the k8s
backend to deal with PYTHONPATH; or, in general, to change the URIs provided
by the user at all. spark-submit takes care of that.

For testing, I created a pyspark script that depends on another module that
is shipped with --py-files. Then I used:

- --py-files http://.../dep.py http://.../test.py
- --py-files http://.../dep.zip http://.../test.py
- --py-files local:/.../dep.py local:/.../test.py
- --py-files local:/.../dep.zip local:/.../test.py

Without this change, all of the above commands fail. With the change, the
driver is able to see the dependencies in all the above cases; but executors
don't see the dependencies in the last two. That's a bug in shared Spark code
that deals with local: dependencies in pyspark (SPARK-26934).

I also tested a Scala app using the main jar from an http server.

Closes #23793 from vanzin/SPARK-24736.

Authored-by: Marcelo Vanzin
Signed-off-by: Marcelo Vanzin
---
 .../org/apache/spark/deploy/SparkSubmit.scala |  6 +-
 .../apache/spark/deploy/k8s/Constants.scala   |  1 -
 .../spark/deploy/k8s/KubernetesConf.scala     | 10 +--
 .../spark/deploy/k8s/KubernetesUtils.scala    | 20 ------
 .../k8s/features/BasicDriverFeatureStep.scala |  9 ---
 .../features/DriverCommandFeatureStep.scala   | 67 +++++--------------
 .../submit/KubernetesClientApplication.scala  | 13 +---
 .../deploy/k8s/KubernetesConfSuite.scala      |  3 +-
 .../spark/deploy/k8s/KubernetesTestConf.scala |  3 +-
 .../BasicDriverFeatureStepSuite.scala         | 23 -------
 .../DriverCommandFeatureStepSuite.scala       | 64 ++++--------------
 .../spark/bindings/python/Dockerfile          |  1 -
 .../src/main/dockerfiles/spark/entrypoint.sh  |  4 --
 13 files changed, 38 insertions(+), 186 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index f4d9fe0663..2843bd5b33 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -520,6 +520,7 @@ private[spark] class SparkSubmit extends Logging {
         confKey = PRINCIPAL.key),
       OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         confKey = KEYTAB.key),
+      OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
 
       // Propagate attributes for dependency resolution at the driver side
       OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
@@ -694,9 +695,6 @@ private[spark] class SparkSubmit extends Logging {
       if (args.isPython) {
         childArgs ++= Array("--primary-py-file", args.primaryResource)
         childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
-        if (args.pyFiles != null) {
-          childArgs ++= Array("--other-py-files", args.pyFiles)
-        }
       } else if (args.isR) {
         childArgs ++= Array("--primary-r-file", args.primaryResource)
         childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
@@ -744,7 +742,7 @@ private[spark]
class SparkSubmit extends Logging { // explicitly sets `spark.submit.pyFiles` in his/her default properties file. val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES) val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(",")) - val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) { + val formattedPyFiles = if (deployMode != CLUSTER) { PythonRunner.formatPaths(resolvedPyFiles).mkString(",") } else { // Ignoring formatting python path in yarn and mesos cluster mode, these two modes diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 76041e7de5..a3c74ff7b2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -69,7 +69,6 @@ private[spark] object Constants { val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION" // BINDINGS - val ENV_PYSPARK_FILES = "PYSPARK_FILES" val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION" // Pod spec templates diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 6febad981a..4a63ea9a86 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -73,8 +73,7 @@ private[spark] class KubernetesDriverConf( val appId: String, val mainAppResource: MainAppResource, val mainClass: String, - val appArgs: Array[String], - val pyFiles: Seq[String]) + val appArgs: Array[String]) extends KubernetesConf(sparkConf) { override val resourceNamePrefix: String = { @@ -175,14 +174,11 @@ private[spark] object KubernetesConf { appId: String, mainAppResource: MainAppResource, mainClass: String, - appArgs: Array[String], - maybePyFiles: Option[String]): KubernetesDriverConf = { + appArgs: Array[String]): KubernetesDriverConf = { // Parse executor volumes in order to verify configuration before the driver pod is created. KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) - val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil) - new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs, - pyFiles) + new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs) } def createExecutorConf( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 6fafac3ee1..b3f58b0323 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -66,26 +66,6 @@ private[spark] object KubernetesUtils extends Logging { opt2.foreach { _ => require(opt1.isEmpty, errMessage) } } - /** - * For the given collection of file URIs, resolves them as follows: - * - File URIs with scheme local:// resolve to just the path of the URI. - * - Otherwise, the URIs are returned as-is. 
- */ - def resolveFileUrisAndPath(fileUris: Iterable[String]): Iterable[String] = { - fileUris.map { uri => - resolveFileUri(uri) - } - } - - def resolveFileUri(uri: String): String = { - val fileUri = Utils.resolveURI(uri) - val fileScheme = Option(fileUri.getScheme).getOrElse("file") - fileScheme match { - case "local" => fileUri.getPath - case _ => uri - } - } - def loadPodFromTemplate( kubernetesClient: KubernetesClient, templateFile: File, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index e664b647bd..17c00eb7c3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -153,15 +153,6 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix, KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true", MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) - - Seq(JARS, FILES).foreach { key => - val value = conf.get(key) - val resolved = KubernetesUtils.resolveFileUrisAndPath(value) - if (resolved.nonEmpty) { - additionalProps.put(key.key, resolved.mkString(",")) - } - } - additionalProps.toMap } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala index bd3f8a1681..9c9cd1e1ac 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala @@ -37,8 +37,8 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf) override def configurePod(pod: SparkPod): SparkPod = { conf.mainAppResource match { - case JavaMainAppResource(_) => - configureForJava(pod) + case JavaMainAppResource(res) => + configureForJava(pod, res.getOrElse(SparkLauncher.NO_RESOURCE)) case PythonMainAppResource(res) => configureForPython(pod, res) @@ -49,45 +49,33 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf) } override def getAdditionalPodSystemProperties(): Map[String, String] = { - conf.mainAppResource match { - case JavaMainAppResource(res) => - res.map(additionalJavaProperties).getOrElse(Map.empty) + val appType = conf.mainAppResource match { + case JavaMainAppResource(_) => + APP_RESOURCE_TYPE_JAVA - case PythonMainAppResource(res) => - additionalPythonProperties(res) + case PythonMainAppResource(_) => + APP_RESOURCE_TYPE_PYTHON - case RMainAppResource(res) => - additionalRProperties(res) + case RMainAppResource(_) => + APP_RESOURCE_TYPE_R } + + Map(APP_RESOURCE_TYPE.key -> appType) } - private def configureForJava(pod: SparkPod): SparkPod = { - // The user application jar is merged into the spark.jars list and managed through that - // property, so use a "blank" resource for the Java driver. 
- val driverContainer = baseDriverContainer(pod, SparkLauncher.NO_RESOURCE).build() + private def configureForJava(pod: SparkPod, res: String): SparkPod = { + val driverContainer = baseDriverContainer(pod, res).build() SparkPod(pod.pod, driverContainer) } private def configureForPython(pod: SparkPod, res: String): SparkPod = { - val maybePythonFiles = if (conf.pyFiles.nonEmpty) { - // Delineation by ":" is to append the PySpark Files to the PYTHONPATH - // of the respective PySpark pod - val resolved = KubernetesUtils.resolveFileUrisAndPath(conf.pyFiles) - Some(new EnvVarBuilder() - .withName(ENV_PYSPARK_FILES) - .withValue(resolved.mkString(":")) - .build()) - } else { - None - } val pythonEnvs = Seq(new EnvVarBuilder() .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION) .withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION)) - .build()) ++ - maybePythonFiles + .build()) - val pythonContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)) + val pythonContainer = baseDriverContainer(pod, res) .addAllToEnv(pythonEnvs.asJava) .build() @@ -95,7 +83,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf) } private def configureForR(pod: SparkPod, res: String): SparkPod = { - val rContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)).build() + val rContainer = baseDriverContainer(pod, res).build() SparkPod(pod.pod, rContainer) } @@ -107,27 +95,4 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf) .addToArgs(resource) .addToArgs(conf.appArgs: _*) } - - private def additionalJavaProperties(resource: String): Map[String, String] = { - resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList(JARS, Seq(resource)) - } - - private def additionalPythonProperties(resource: String): Map[String, String] = { - resourceType(APP_RESOURCE_TYPE_PYTHON) ++ - mergeFileList(FILES, Seq(resource) ++ conf.pyFiles) - } - - private def additionalRProperties(resource: String): Map[String, String] = { - resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList(FILES, Seq(resource)) - } - - private def mergeFileList(key: ConfigEntry[Seq[String]], filesToAdd: Seq[String]) - : Map[String, String] = { - val existing = conf.get(key) - Map(key.key -> (existing ++ filesToAdd).distinct.mkString(",")) - } - - private def resourceType(resType: String): Map[String, String] = { - Map(APP_RESOURCE_TYPE.key -> resType) - } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 3888778bf8..042012e9c7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -39,13 +39,11 @@ import org.apache.spark.util.Utils * @param mainAppResource the main application resource if any * @param mainClass the main class of the application to run * @param driverArgs arguments to the driver - * @param maybePyFiles additional Python files via --py-files */ private[spark] case class ClientArguments( mainAppResource: MainAppResource, mainClass: String, - driverArgs: Array[String], - maybePyFiles: Option[String]) + driverArgs: Array[String]) private[spark] object ClientArguments { @@ -53,7 +51,6 @@ private[spark] object ClientArguments { var mainAppResource: MainAppResource = JavaMainAppResource(None) 
var mainClass: Option[String] = None val driverArgs = mutable.ArrayBuffer.empty[String] - var maybePyFiles : Option[String] = None args.sliding(2, 2).toList.foreach { case Array("--primary-java-resource", primaryJavaResource: String) => @@ -62,8 +59,6 @@ private[spark] object ClientArguments { mainAppResource = PythonMainAppResource(primaryPythonResource) case Array("--primary-r-file", primaryRFile: String) => mainAppResource = RMainAppResource(primaryRFile) - case Array("--other-py-files", pyFiles: String) => - maybePyFiles = Some(pyFiles) case Array("--main-class", clazz: String) => mainClass = Some(clazz) case Array("--arg", arg: String) => @@ -78,8 +73,7 @@ private[spark] object ClientArguments { ClientArguments( mainAppResource, mainClass.get, - driverArgs.toArray, - maybePyFiles) + driverArgs.toArray) } } @@ -214,8 +208,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication { kubernetesAppId, clientArguments.mainAppResource, clientArguments.mainClass, - clientArguments.driverArgs, - clientArguments.maybePyFiles) + clientArguments.driverArgs) // The master URL has been checked for validity already in SparkSubmit. // We just need to get rid of the "k8s://" prefix here. val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index f4d40b0b35..d51b1e661b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -69,8 +69,7 @@ class KubernetesConfSuite extends SparkFunSuite { KubernetesTestConf.APP_ID, JavaMainAppResource(None), KubernetesTestConf.MAIN_CLASS, - APP_ARGS, - None) + APP_ARGS) assert(conf.labels === Map( SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID, SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++ diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala index 1d77a6d181..ee830a91f3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala @@ -44,7 +44,6 @@ object KubernetesTestConf { mainAppResource: MainAppResource = JavaMainAppResource(None), mainClass: String = MAIN_CLASS, appArgs: Array[String] = Array.empty, - pyFiles: Seq[String] = Nil, resourceNamePrefix: Option[String] = None, labels: Map[String, String] = Map.empty, environment: Map[String, String] = Map.empty, @@ -64,7 +63,7 @@ object KubernetesTestConf { setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs) setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes) - new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, pyFiles) + new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs) } // scalastyle:on argcount diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 
ccf88cc53f..7cfc4d2774 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -137,29 +137,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(configuredPythonPod.container.getImage === "spark-driver-py:latest") } - test("Additional system properties resolve jars and set cluster-mode confs.") { - val allJars = Seq("local:///opt/spark/jar1.jar", "hdfs:///opt/spark/jar2.jar") - val allFiles = Seq("https://localhost:9000/file1.txt", "local:///opt/spark/file2.txt") - val sparkConf = new SparkConf() - .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") - .setJars(allJars) - .set(FILES, allFiles) - .set(CONTAINER_IMAGE, "spark-driver:latest") - val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) - - val step = new BasicDriverFeatureStep(kubernetesConf) - val additionalProperties = step.getAdditionalPodSystemProperties() - val expectedSparkConf = Map( - KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", - "spark.app.id" -> KubernetesTestConf.APP_ID, - KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix, - "spark.kubernetes.submitInDriver" -> "true", - JARS.key -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", - FILES.key -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt", - MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) - assert(additionalProperties === expectedSparkConf) - } - // Memory overhead tests. Tuples are: // test name, main resource, overhead factor, expected factor Seq( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala index f74ac92802..de80c5614c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala @@ -23,12 +23,13 @@ import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.internal.config._ import org.apache.spark.util.Utils class DriverCommandFeatureStepSuite extends SparkFunSuite { test("java resource") { - val mainResource = "local:///main.jar" + val mainResource = "local:/main.jar" val spec = applyFeatureStep( JavaMainAppResource(Some(mainResource)), appArgs = Array("5", "7")) @@ -36,72 +37,33 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { "driver", "--properties-file", SPARK_CONF_PATH, "--class", KubernetesTestConf.MAIN_CLASS, - "spark-internal", "5", "7")) - - val jars = Utils.stringToSeq(spec.systemProperties("spark.jars")) - assert(jars.toSet === Set(mainResource)) + mainResource, "5", "7")) } - test("python resource with no extra files") { - val mainResource = "local:///main.py" + test("python resource") { + val mainResource = "local:/main.py" val sparkConf = new SparkConf(false) - .set(PYSPARK_MAJOR_PYTHON_VERSION, "3") - - val spec = applyFeatureStep( - PythonMainAppResource(mainResource), - conf = sparkConf) - assert(spec.pod.container.getArgs.asScala === List( - "driver", - 
"--properties-file", SPARK_CONF_PATH, - "--class", KubernetesTestConf.MAIN_CLASS, - "/main.py")) - val envs = spec.pod.container.getEnv.asScala - .map { env => (env.getName, env.getValue) } - .toMap - assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3") - - val files = Utils.stringToSeq(spec.systemProperties("spark.files")) - assert(files.toSet === Set(mainResource)) - } - - test("python resource with extra files") { - val expectedMainResource = "/main.py" - val expectedPySparkFiles = "/example2.py:/example3.py" - val filesInConf = Set("local:///example.py") - - val mainResource = s"local://$expectedMainResource" - val pyFiles = Seq("local:///example2.py", "local:///example3.py") - - val sparkConf = new SparkConf(false) - .set("spark.files", filesInConf.mkString(",")) .set(PYSPARK_MAJOR_PYTHON_VERSION, "2") val spec = applyFeatureStep( PythonMainAppResource(mainResource), conf = sparkConf, - appArgs = Array("5", "7", "9"), - pyFiles = pyFiles) + appArgs = Array("5", "7", "9")) assert(spec.pod.container.getArgs.asScala === List( "driver", "--properties-file", SPARK_CONF_PATH, "--class", KubernetesTestConf.MAIN_CLASS, - "/main.py", "5", "7", "9")) + mainResource, "5", "7", "9")) val envs = spec.pod.container.getEnv.asScala .map { env => (env.getName, env.getValue) } .toMap - val expected = Map( - ENV_PYSPARK_FILES -> expectedPySparkFiles, - ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2") + val expected = Map(ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2") assert(envs === expected) - - val files = Utils.stringToSeq(spec.systemProperties("spark.files")) - assert(files.toSet === pyFiles.toSet ++ filesInConf ++ Set(mainResource)) } test("R resource") { - val expectedMainResource = "/main.R" - val mainResource = s"local://$expectedMainResource" + val mainResource = "local:/main.R" val spec = applyFeatureStep( RMainAppResource(mainResource), @@ -111,19 +73,17 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { "driver", "--properties-file", SPARK_CONF_PATH, "--class", KubernetesTestConf.MAIN_CLASS, - "/main.R", "5", "7", "9")) + mainResource, "5", "7", "9")) } private def applyFeatureStep( resource: MainAppResource, conf: SparkConf = new SparkConf(false), - appArgs: Array[String] = Array(), - pyFiles: Seq[String] = Nil): KubernetesDriverSpec = { + appArgs: Array[String] = Array()): KubernetesDriverSpec = { val kubernetesConf = KubernetesTestConf.createDriverConf( sparkConf = conf, mainAppResource = resource, - appArgs = appArgs, - pyFiles = pyFiles) + appArgs = appArgs) val step = new DriverCommandFeatureStep(kubernetesConf) val pod = step.configurePod(SparkPod.initialPod()) val props = step.getAdditionalPodSystemProperties() diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile index 5044900d1d..8237c92232 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile @@ -39,7 +39,6 @@ RUN apk add --no-cache python && \ COPY python/pyspark ${SPARK_HOME}/python/pyspark COPY python/lib ${SPARK_HOME}/python/lib -ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip WORKDIR /opt/spark/work-dir ENTRYPOINT [ "/opt/entrypoint.sh" ] diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh 
index 613febca7e..2097fb8865 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -44,10 +44,6 @@ if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
   SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
 fi
 
-if [ -n "$PYSPARK_FILES" ]; then
-  PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
-fi
-
 if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "2" ]; then
   pyv="$(python -V 2>&1)"
   export PYTHON_VERSION="${pyv:7}"
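
For reference, the test matrix described in the commit message boils down to
submissions of the following shape. This is a minimal sketch, not taken from
the patch: the API server URL, container image name, and file locations are
placeholders, and only the spark-submit flags themselves are standard options.

  # Hypothetical pyspark submission in k8s cluster mode; with this change,
  # spark-submit (not the k8s backend) resolves --py-files and makes dep.py
  # visible to the driver running inside the Spark-created pod.
  bin/spark-submit \
    --master k8s://https://kubernetes.example.com:6443 \
    --deploy-mode cluster \
    --name pyfiles-test \
    --conf spark.kubernetes.container.image=myrepo/spark-py:latest \
    --py-files http://fileserver.example.com/dep.py \
    http://fileserver.example.com/test.py

With local: URIs instead of http:, the files are expected to already exist
inside the container image; as noted above, that is the case where executors
still miss the dependencies (SPARK-26934).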