diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index f97bf67fa5..0379adeb07 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -158,8 +158,9 @@ object SparkSubmit { args.files = mergeFileLists(args.files, args.primaryResource) } args.files = mergeFileLists(args.files, args.pyFiles) - // Format python file paths properly before adding them to the PYTHONPATH - sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",") + if (args.pyFiles != null) { + sysProps("spark.submit.pyFiles") = args.pyFiles + } } // Special flag to avoid deprecation warnings at the client @@ -284,6 +285,29 @@ object SparkSubmit { sysProps.getOrElseUpdate(k, v) } + // Resolve paths in certain spark properties + val pathConfigs = Seq( + "spark.jars", + "spark.files", + "spark.yarn.jar", + "spark.yarn.dist.files", + "spark.yarn.dist.archives") + pathConfigs.foreach { config => + // Replace old URIs with resolved URIs, if they exist + sysProps.get(config).foreach { oldValue => + sysProps(config) = Utils.resolveURIs(oldValue) + } + } + + // Resolve and format python file paths properly before adding them to the PYTHONPATH. + // The resolving part is redundant in the case of --py-files, but necessary if the user + // explicitly sets `spark.submit.pyFiles` in his/her default properties file. + sysProps.get("spark.submit.pyFiles").foreach { pyFiles => + val resolvedPyFiles = Utils.resolveURIs(pyFiles) + val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",") + sysProps("spark.submit.pyFiles") = formattedPyFiles + } + (childArgs, childClasspath, sysProps, childMainClass) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 1cdf50d5c0..d8cd0ff2c9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -292,7 +292,7 @@ class SparkSubmitSuite extends FunSuite with Matchers { runSparkSubmit(args) } - test("spark submit includes jars passed in through --jar") { + test("includes jars passed in through --jars") { val unusedJar = TestUtils.createJarWithClasses(Seq.empty) val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA")) val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB")) @@ -306,6 +306,110 @@ class SparkSubmitSuite extends FunSuite with Matchers { runSparkSubmit(args) } + test("resolves command line argument paths correctly") { + val jars = "/jar1,/jar2" // --jars + val files = "hdfs:/file1,file2" // --files + val archives = "file:/archive1,archive2" // --archives + val pyFiles = "py-file1,py-file2" // --py-files + + // Test jars and files + val clArgs = Seq( + "--master", "local", + "--class", "org.SomeClass", + "--jars", jars, + "--files", files, + "thejar.jar") + val appArgs = new SparkSubmitArguments(clArgs) + val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3 + appArgs.jars should be (Utils.resolveURIs(jars)) + appArgs.files should be (Utils.resolveURIs(files)) + sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar")) + sysProps("spark.files") should be (Utils.resolveURIs(files)) + + // Test files and archives (Yarn) + val clArgs2 = Seq( + "--master", "yarn-client", + "--class", "org.SomeClass", + "--files", files, + "--archives", archives, + "thejar.jar" + ) + val appArgs2 = new SparkSubmitArguments(clArgs2) + val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3 + appArgs2.files should be (Utils.resolveURIs(files)) + appArgs2.archives should be (Utils.resolveURIs(archives)) + sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files)) + sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives)) + + // Test python files + val clArgs3 = Seq( + "--master", "local", + "--py-files", pyFiles, + "mister.py" + ) + val appArgs3 = new SparkSubmitArguments(clArgs3) + val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3 + appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles)) + sysProps3("spark.submit.pyFiles") should be ( + PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(",")) + } + + test("resolves config paths correctly") { + val jars = "/jar1,/jar2" // spark.jars + val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files + val archives = "file:/archive1,archive2" // spark.yarn.dist.archives + val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles + + // Test jars and files + val f1 = File.createTempFile("test-submit-jars-files", "") + val writer1 = new PrintWriter(f1) + writer1.println("spark.jars " + jars) + writer1.println("spark.files " + files) + writer1.close() + val clArgs = Seq( + "--master", "local", + "--class", "org.SomeClass", + "--properties-file", f1.getPath, + "thejar.jar" + ) + val appArgs = new SparkSubmitArguments(clArgs) + val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3 + sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar")) + sysProps("spark.files") should be(Utils.resolveURIs(files)) + + // Test files and archives (Yarn) + val f2 = File.createTempFile("test-submit-files-archives", "") + val writer2 = new PrintWriter(f2) + writer2.println("spark.yarn.dist.files " + files) + writer2.println("spark.yarn.dist.archives " + archives) + writer2.close() + val clArgs2 = Seq( + "--master", "yarn-client", + "--class", "org.SomeClass", + "--properties-file", f2.getPath, + "thejar.jar" + ) + val appArgs2 = new SparkSubmitArguments(clArgs2) + val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3 + sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files)) + sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives)) + + // Test python files + val f3 = File.createTempFile("test-submit-python-files", "") + val writer3 = new PrintWriter(f3) + writer3.println("spark.submit.pyFiles " + pyFiles) + writer3.close() + val clArgs3 = Seq( + "--master", "local", + "--properties-file", f3.getPath, + "mister.py" + ) + val appArgs3 = new SparkSubmitArguments(clArgs3) + val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3 + sysProps3("spark.submit.pyFiles") should be( + PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(",")) + } + test("SPARK_CONF_DIR overrides spark-defaults.conf") { forConfDir(Map("spark.executor.memory" -> "2.3g")) { path => val unusedJar = TestUtils.createJarWithClasses(Seq.empty) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 1c112334cc..8ffe3e2b13 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -217,9 +217,14 @@ class UtilsSuite extends FunSuite { test("resolveURI") { def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = { - assume(before.split(",").length == 1) - assert(Utils.resolveURI(before, testWindows) === new URI(after)) - assert(Utils.resolveURI(after, testWindows) === new URI(after)) + // This should test only single paths + assume(before.split(",").length === 1) + // Repeated invocations of resolveURI should yield the same result + def resolve(uri: String): String = Utils.resolveURI(uri, testWindows).toString + assert(resolve(after) === after) + assert(resolve(resolve(after)) === after) + assert(resolve(resolve(resolve(after))) === after) + // Also test resolveURIs with single paths assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after)) assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after)) } @@ -235,16 +240,27 @@ class UtilsSuite extends FunSuite { assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true) intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") } intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") } + } - // Test resolving comma-delimited paths - assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2") - assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2") - assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") === - s"hdfs:/jar1,file:/jar2,file:$cwd/jar3") - assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") === + test("resolveURIs with multiple paths") { + def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = { + assume(before.split(",").length > 1) + assert(Utils.resolveURIs(before, testWindows) === after) + assert(Utils.resolveURIs(after, testWindows) === after) + // Repeated invocations of resolveURIs should yield the same result + def resolve(uri: String): String = Utils.resolveURIs(uri, testWindows) + assert(resolve(after) === after) + assert(resolve(resolve(after)) === after) + assert(resolve(resolve(resolve(after))) === after) + } + val cwd = System.getProperty("user.dir") + assertResolves("jar1,jar2", s"file:$cwd/jar1,file:$cwd/jar2") + assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2") + assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3") + assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5") - assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) === - s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi") + assertResolves("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", + s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi", testWindows = true) } test("nonLocalPaths") {