[SPARK-18099][YARN] Fail if same files added to distributed cache for --files and --archives
## What changes were proposed in this pull request? During spark-submit, if the YARN distributed cache is instructed to add the same file under both --files and --archives, this change ensures the Spark YARN distributed-cache behaviour is retained, i.e. it warns and fails if the same file is mentioned in both --files and --archives. ## How was this patch tested? Manually tested: 1. If the same jar is mentioned in --jars and --files, the job is still submitted — the existing [SPARK-14423] #12203 functionality is unchanged. 2. If the same file is mentioned in --files and --archives, job submission fails. Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request. Author: Kishor Patil <kpatil@yahoo-inc.com> Closes #15627 from kishorvpatil/spark18099.
This commit is contained in:
parent
16293311cd
commit
098e4ca9c7
|
@ -598,8 +598,16 @@ private[spark] class Client(
|
|||
).foreach { case (flist, resType, addToClasspath) =>
|
||||
flist.foreach { file =>
|
||||
val (_, localizedPath) = distribute(file, resType = resType)
|
||||
if (addToClasspath && localizedPath != null) {
|
||||
cachedSecondaryJarLinks += localizedPath
|
||||
// If addToClasspath, we ignore adding the jar multiple times to the distributed cache.
|
||||
if (addToClasspath) {
|
||||
if (localizedPath != null) {
|
||||
cachedSecondaryJarLinks += localizedPath
|
||||
}
|
||||
} else {
|
||||
if (localizedPath != null) {
|
||||
throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" +
|
||||
" to the distributed cache.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -282,6 +282,48 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
|
|||
}
|
||||
}
|
||||
|
||||
test("distribute archive multiple times") {
  // Verifies SPARK-18099: prepareLocalResources must reject attempts to add the
  // same local file to the distributed cache more than once, whether the duplicate
  // spans --files/--archives or occurs within a single option.
  val libs = Utils.createTempDir()
  // Create jars dir and RELEASE file to avoid IllegalStateException.
  val jarsDir = new File(libs, "jars")
  assert(jarsDir.mkdir())
  new FileOutputStream(new File(libs, "RELEASE")).close()

  // A single jar used as the duplicated entry in every case below.
  val userLib1 = Utils.createTempDir()
  val testJar = TestUtils.createJarWithFiles(Map(), userLib1)

  // Case 1: FILES_TO_DISTRIBUTE and ARCHIVES_TO_DISTRIBUTE can't have duplicate files.
  val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
    .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath))
    .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath))

  val client = createClient(sparkConf)
  val tempDir = Utils.createTempDir()
  intercept[IllegalArgumentException] {
    client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
  }

  // Case 2: FILES_TO_DISTRIBUTE can't have duplicate files.
  val sparkConfFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
    .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))

  val clientFiles = createClient(sparkConfFiles)
  val tempDirForFiles = Utils.createTempDir()
  intercept[IllegalArgumentException] {
    clientFiles.prepareLocalResources(new Path(tempDirForFiles.getAbsolutePath()), Nil)
  }

  // Case 3: ARCHIVES_TO_DISTRIBUTE can't have duplicate files.
  val sparkConfArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
    .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))

  val clientArchives = createClient(sparkConfArchives)
  val tempDirForArchives = Utils.createTempDir()
  intercept[IllegalArgumentException] {
    clientArchives.prepareLocalResources(new Path(tempDirForArchives.getAbsolutePath()), Nil)
  }
}
|
||||
|
||||
test("distribute local spark jars") {
|
||||
val temp = Utils.createTempDir()
|
||||
val jarsDir = new File(temp, "jars")
|
||||
|
|
Loading…
Reference in a new issue