[SPARK-14836][YARN] Zip all the jars before uploading to distributed cache
## What changes were proposed in this pull request? <copy form JIRA> Currently if neither `spark.yarn.jars` nor `spark.yarn.archive` is set (by default), Spark on yarn code will upload all the jars in the folder separately into distributed cache, this is quite time consuming, and very verbose, instead of upload jars separately into distributed cache, here changes to zip all the jars first, and then put into distributed cache. This will significantly improve the speed of starting time. ## How was this patch tested? Unit test and local integrated test is done. Verified with SparkPi both in spark cluster and client mode. Author: jerryshao <sshao@hortonworks.com> Closes #12597 from jerryshao/SPARK-14836.
This commit is contained in:
parent
4f4721a21c
commit
2398e3d69c
|
@ -496,11 +496,26 @@ private[spark] class Client(
|
|||
"to uploading libraries under SPARK_HOME.")
|
||||
val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
|
||||
sparkConf.getenv("SPARK_HOME")))
|
||||
val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
|
||||
new File(Utils.getLocalDir(sparkConf)))
|
||||
val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
|
||||
|
||||
try {
|
||||
jarsStream.setLevel(0)
|
||||
jarsDir.listFiles().foreach { f =>
|
||||
if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
|
||||
distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR))
|
||||
if (f.isFile && f.getName.toLowerCase().endsWith(".jar") && f.canRead) {
|
||||
jarsStream.putNextEntry(new ZipEntry(f.getName))
|
||||
Files.copy(f, jarsStream)
|
||||
jarsStream.closeEntry()
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
jarsStream.close()
|
||||
}
|
||||
|
||||
distribute(jarsArchive.toURI.getPath,
|
||||
resType = LocalResourceType.ARCHIVE,
|
||||
destName = Some(LOCALIZED_LIB_DIR))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -285,8 +285,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
|
|||
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
|
||||
val client = createClient(sparkConf)
|
||||
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
|
||||
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(),
|
||||
anyBoolean(), any())
|
||||
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
|
||||
}
|
||||
|
||||
|
@ -295,13 +293,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
|
|||
val jarsDir = new File(libs, "jars")
|
||||
assert(jarsDir.mkdir())
|
||||
new FileOutputStream(new File(libs, "RELEASE")).close()
|
||||
val userLibs = Utils.createTempDir()
|
||||
val userLib1 = Utils.createTempDir()
|
||||
val userLib2 = Utils.createTempDir()
|
||||
|
||||
val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
|
||||
val jar2 = TestUtils.createJarWithFiles(Map(), userLibs)
|
||||
val jar2 = TestUtils.createJarWithFiles(Map(), userLib1)
|
||||
// Copy jar2 to jar3 with same name
|
||||
val jar3 = {
|
||||
val target = new File(userLibs, new File(jar1.toURI).getName)
|
||||
val target = new File(userLib2, new File(jar2.toURI).getName)
|
||||
val input = new FileInputStream(jar2.getPath)
|
||||
val output = new FileOutputStream(target)
|
||||
Utils.copyStream(input, output, closeStreams = true)
|
||||
|
@ -315,7 +314,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
|
|||
val tempDir = Utils.createTempDir()
|
||||
client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
|
||||
|
||||
// Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be
|
||||
// Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar2 will be
|
||||
// ignored.
|
||||
sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue