[SPARK-3319] [SPARK-3338] Resolve Spark submit config paths
The bulk of this PR is comprised of tests. All changes in functionality are made in `SparkSubmit.scala` (~20 lines). **SPARK-3319.** There is currently a divergence in behavior when the user passes in additional jars through `--jars` and through setting `spark.jars` in the default properties file. The former will happily resolve the paths (e.g. convert `my.jar` to `file:/absolute/path/to/my.jar`), while the latter does not. We should resolve paths consistently in both cases. This also applies to the following pairs of command line arguments and Spark configs: - `--jars` ~ `spark.jars` - `--files` ~ `spark.files` / `spark.yarn.dist.files` - `--archives` ~ `spark.yarn.dist.archives` - `--py-files` ~ `spark.submit.pyFiles` **SPARK-3338.** This PR also fixes the following bug: if the user sets `spark.submit.pyFiles` in his/her properties file, it does not actually get picked up even if `--py-files` is not set. This is simply because the config is overridden by an empty string. Author: Andrew Or <andrewor14@gmail.com> Author: Andrew Or <andrew@databricks.com> Closes #2232 from andrewor14/resolve-config-paths and squashes the following commits: fff2869 [Andrew Or] Add spark.yarn.jar da3a1c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths f0fae64 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths 05e03d6 [Andrew Or] Add tests for resolving both command line and config paths 460117e [Andrew Or] Resolve config paths properly fe039d3 [Andrew Or] Beef up tests to test fixed-pointed-ness of Utils.resolveURI(s)
This commit is contained in:
parent
9142c9b80b
commit
24c5129257
|
@ -158,8 +158,9 @@ object SparkSubmit {
|
||||||
args.files = mergeFileLists(args.files, args.primaryResource)
|
args.files = mergeFileLists(args.files, args.primaryResource)
|
||||||
}
|
}
|
||||||
args.files = mergeFileLists(args.files, args.pyFiles)
|
args.files = mergeFileLists(args.files, args.pyFiles)
|
||||||
// Format python file paths properly before adding them to the PYTHONPATH
|
if (args.pyFiles != null) {
|
||||||
sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
|
sysProps("spark.submit.pyFiles") = args.pyFiles
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Special flag to avoid deprecation warnings at the client
|
// Special flag to avoid deprecation warnings at the client
|
||||||
|
@ -284,6 +285,29 @@ object SparkSubmit {
|
||||||
sysProps.getOrElseUpdate(k, v)
|
sysProps.getOrElseUpdate(k, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Resolve paths in certain spark properties
|
||||||
|
val pathConfigs = Seq(
|
||||||
|
"spark.jars",
|
||||||
|
"spark.files",
|
||||||
|
"spark.yarn.jar",
|
||||||
|
"spark.yarn.dist.files",
|
||||||
|
"spark.yarn.dist.archives")
|
||||||
|
pathConfigs.foreach { config =>
|
||||||
|
// Replace old URIs with resolved URIs, if they exist
|
||||||
|
sysProps.get(config).foreach { oldValue =>
|
||||||
|
sysProps(config) = Utils.resolveURIs(oldValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve and format python file paths properly before adding them to the PYTHONPATH.
|
||||||
|
// The resolving part is redundant in the case of --py-files, but necessary if the user
|
||||||
|
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
|
||||||
|
sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
|
||||||
|
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
|
||||||
|
val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
|
||||||
|
sysProps("spark.submit.pyFiles") = formattedPyFiles
|
||||||
|
}
|
||||||
|
|
||||||
(childArgs, childClasspath, sysProps, childMainClass)
|
(childArgs, childClasspath, sysProps, childMainClass)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -292,7 +292,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
|
||||||
runSparkSubmit(args)
|
runSparkSubmit(args)
|
||||||
}
|
}
|
||||||
|
|
||||||
test("spark submit includes jars passed in through --jar") {
|
test("includes jars passed in through --jars") {
|
||||||
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
|
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
|
||||||
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
|
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
|
||||||
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
|
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
|
||||||
|
@ -306,6 +306,110 @@ class SparkSubmitSuite extends FunSuite with Matchers {
|
||||||
runSparkSubmit(args)
|
runSparkSubmit(args)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("resolves command line argument paths correctly") {
|
||||||
|
val jars = "/jar1,/jar2" // --jars
|
||||||
|
val files = "hdfs:/file1,file2" // --files
|
||||||
|
val archives = "file:/archive1,archive2" // --archives
|
||||||
|
val pyFiles = "py-file1,py-file2" // --py-files
|
||||||
|
|
||||||
|
// Test jars and files
|
||||||
|
val clArgs = Seq(
|
||||||
|
"--master", "local",
|
||||||
|
"--class", "org.SomeClass",
|
||||||
|
"--jars", jars,
|
||||||
|
"--files", files,
|
||||||
|
"thejar.jar")
|
||||||
|
val appArgs = new SparkSubmitArguments(clArgs)
|
||||||
|
val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
|
||||||
|
appArgs.jars should be (Utils.resolveURIs(jars))
|
||||||
|
appArgs.files should be (Utils.resolveURIs(files))
|
||||||
|
sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
|
||||||
|
sysProps("spark.files") should be (Utils.resolveURIs(files))
|
||||||
|
|
||||||
|
// Test files and archives (Yarn)
|
||||||
|
val clArgs2 = Seq(
|
||||||
|
"--master", "yarn-client",
|
||||||
|
"--class", "org.SomeClass",
|
||||||
|
"--files", files,
|
||||||
|
"--archives", archives,
|
||||||
|
"thejar.jar"
|
||||||
|
)
|
||||||
|
val appArgs2 = new SparkSubmitArguments(clArgs2)
|
||||||
|
val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
|
||||||
|
appArgs2.files should be (Utils.resolveURIs(files))
|
||||||
|
appArgs2.archives should be (Utils.resolveURIs(archives))
|
||||||
|
sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
|
||||||
|
sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
|
||||||
|
|
||||||
|
// Test python files
|
||||||
|
val clArgs3 = Seq(
|
||||||
|
"--master", "local",
|
||||||
|
"--py-files", pyFiles,
|
||||||
|
"mister.py"
|
||||||
|
)
|
||||||
|
val appArgs3 = new SparkSubmitArguments(clArgs3)
|
||||||
|
val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
|
||||||
|
appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
|
||||||
|
sysProps3("spark.submit.pyFiles") should be (
|
||||||
|
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
|
||||||
|
}
|
||||||
|
|
||||||
|
test("resolves config paths correctly") {
|
||||||
|
val jars = "/jar1,/jar2" // spark.jars
|
||||||
|
val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files
|
||||||
|
val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
|
||||||
|
val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
|
||||||
|
|
||||||
|
// Test jars and files
|
||||||
|
val f1 = File.createTempFile("test-submit-jars-files", "")
|
||||||
|
val writer1 = new PrintWriter(f1)
|
||||||
|
writer1.println("spark.jars " + jars)
|
||||||
|
writer1.println("spark.files " + files)
|
||||||
|
writer1.close()
|
||||||
|
val clArgs = Seq(
|
||||||
|
"--master", "local",
|
||||||
|
"--class", "org.SomeClass",
|
||||||
|
"--properties-file", f1.getPath,
|
||||||
|
"thejar.jar"
|
||||||
|
)
|
||||||
|
val appArgs = new SparkSubmitArguments(clArgs)
|
||||||
|
val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
|
||||||
|
sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
|
||||||
|
sysProps("spark.files") should be(Utils.resolveURIs(files))
|
||||||
|
|
||||||
|
// Test files and archives (Yarn)
|
||||||
|
val f2 = File.createTempFile("test-submit-files-archives", "")
|
||||||
|
val writer2 = new PrintWriter(f2)
|
||||||
|
writer2.println("spark.yarn.dist.files " + files)
|
||||||
|
writer2.println("spark.yarn.dist.archives " + archives)
|
||||||
|
writer2.close()
|
||||||
|
val clArgs2 = Seq(
|
||||||
|
"--master", "yarn-client",
|
||||||
|
"--class", "org.SomeClass",
|
||||||
|
"--properties-file", f2.getPath,
|
||||||
|
"thejar.jar"
|
||||||
|
)
|
||||||
|
val appArgs2 = new SparkSubmitArguments(clArgs2)
|
||||||
|
val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
|
||||||
|
sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
|
||||||
|
sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
|
||||||
|
|
||||||
|
// Test python files
|
||||||
|
val f3 = File.createTempFile("test-submit-python-files", "")
|
||||||
|
val writer3 = new PrintWriter(f3)
|
||||||
|
writer3.println("spark.submit.pyFiles " + pyFiles)
|
||||||
|
writer3.close()
|
||||||
|
val clArgs3 = Seq(
|
||||||
|
"--master", "local",
|
||||||
|
"--properties-file", f3.getPath,
|
||||||
|
"mister.py"
|
||||||
|
)
|
||||||
|
val appArgs3 = new SparkSubmitArguments(clArgs3)
|
||||||
|
val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
|
||||||
|
sysProps3("spark.submit.pyFiles") should be(
|
||||||
|
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
|
||||||
|
}
|
||||||
|
|
||||||
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
|
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
|
||||||
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
|
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
|
||||||
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
|
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
|
||||||
|
|
|
@ -217,9 +217,14 @@ class UtilsSuite extends FunSuite {
|
||||||
|
|
||||||
test("resolveURI") {
|
test("resolveURI") {
|
||||||
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
|
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
|
||||||
assume(before.split(",").length == 1)
|
// This should test only single paths
|
||||||
assert(Utils.resolveURI(before, testWindows) === new URI(after))
|
assume(before.split(",").length === 1)
|
||||||
assert(Utils.resolveURI(after, testWindows) === new URI(after))
|
// Repeated invocations of resolveURI should yield the same result
|
||||||
|
def resolve(uri: String): String = Utils.resolveURI(uri, testWindows).toString
|
||||||
|
assert(resolve(after) === after)
|
||||||
|
assert(resolve(resolve(after)) === after)
|
||||||
|
assert(resolve(resolve(resolve(after))) === after)
|
||||||
|
// Also test resolveURIs with single paths
|
||||||
assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
|
assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
|
||||||
assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
|
assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
|
||||||
}
|
}
|
||||||
|
@ -235,16 +240,27 @@ class UtilsSuite extends FunSuite {
|
||||||
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
|
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
|
||||||
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
|
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
|
||||||
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
|
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
|
||||||
|
}
|
||||||
|
|
||||||
// Test resolving comma-delimited paths
|
test("resolveURIs with multiple paths") {
|
||||||
assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
|
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
|
||||||
assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
|
assume(before.split(",").length > 1)
|
||||||
assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
|
assert(Utils.resolveURIs(before, testWindows) === after)
|
||||||
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
|
assert(Utils.resolveURIs(after, testWindows) === after)
|
||||||
assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
|
// Repeated invocations of resolveURIs should yield the same result
|
||||||
|
def resolve(uri: String): String = Utils.resolveURIs(uri, testWindows)
|
||||||
|
assert(resolve(after) === after)
|
||||||
|
assert(resolve(resolve(after)) === after)
|
||||||
|
assert(resolve(resolve(resolve(after))) === after)
|
||||||
|
}
|
||||||
|
val cwd = System.getProperty("user.dir")
|
||||||
|
assertResolves("jar1,jar2", s"file:$cwd/jar1,file:$cwd/jar2")
|
||||||
|
assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
|
||||||
|
assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
|
||||||
|
assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5",
|
||||||
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
|
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
|
||||||
assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) ===
|
assertResolves("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi",
|
||||||
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
|
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi", testWindows = true)
|
||||||
}
|
}
|
||||||
|
|
||||||
test("nonLocalPaths") {
|
test("nonLocalPaths") {
|
||||||
|
|
Loading…
Reference in a new issue