Merge pull request #125 from velvia/2013-10/local-jar-uri

Add support for local:// URI scheme for addJars()

This PR adds support for a new URI scheme for SparkContext.addJars():  `local://file/path`.
The *local* scheme indicates that the `/file/path` exists on every worker node.    The reason for its existence is for big library JARs, which would be really expensive to serve using the standard HTTP fileserver distribution method, especially for big clusters.  Today the only inexpensive method (assuming such a file is on every host, via say NFS, rsync, etc.) of doing this is to add the JAR to the SPARK_CLASSPATH, but we want a method where the user does not need to modify the Spark configuration.

I would add something to the docs, but it's not obvious where to add it.

Oh, and it would be great if this could be merged in time for 0.8.1.
This commit is contained in:
Matei Zaharia 2013-10-30 12:03:44 -07:00
commit 618c1f6cf3
2 changed files with 21 additions and 1 deletions

View file

@ -683,7 +683,7 @@ class SparkContext(
/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI.
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
if (path == null) {
@ -696,6 +696,7 @@ class SparkContext(
} else {
val uri = new URI(path)
key = uri.getScheme match {
// A JAR file which exists only on the driver node
case null | "file" =>
if (env.hadoop.isYarnMode()) {
// In order for this to work on yarn the user must specify the --addjars option to
@ -713,6 +714,9 @@ class SparkContext(
} else {
env.httpFileServer.addJar(new File(uri.getPath))
}
// A JAR file which exists locally on every worker node
case "local" =>
"file:" + uri.getPath
case _ =>
path
}

View file

@ -120,4 +120,20 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
}
test ("Dynamically adding JARS on a standalone cluster using local: URL") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile.replace("file", "local"))
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
val fac = Thread.currentThread.getContextClassLoader()
.loadClass("org.uncommons.maths.Maths")
.getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
}
}