Spark-939: allow user jars to take precedence over spark jars
I still need to do a small bit of re-factoring [mostly the one Java file I'll switch it back to a Scala file and use it in both the class loaders], but comments on other things I should do would be great. Author: Holden Karau <holden@pigscanfly.ca> Closes #217 from holdenk/spark-939-allow-user-jars-to-take-precedence-over-spark-jars and squashes the following commits: cf0cac9 [Holden Karau] Fix the executorclassloader 1955232 [Holden Karau] Fix long line in TestUtils 8f89965 [Holden Karau] Fix tests for new class name 7546549 [Holden Karau] CR feedback, merge some of the testutils methods down, rename the classloader 644719f [Holden Karau] Use the class generator for the repl class loader tests too f0b7114 [Holden Karau] Fix the core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala tests 204b199 [Holden Karau] Fix the generated classes 9f68f10 [Holden Karau] Start rewriting the ExecutorURLClassLoaderSuite to not use the hard coded classes 858aba2 [Holden Karau] Remove a bunch of test junk 261aaee [Holden Karau] simplify executorurlclassloader a bit 7a7bf5f [Holden Karau] CR feedback d4ae848 [Holden Karau] rewrite component into scala aa95083 [Holden Karau] CR feedback 7752594 [Holden Karau] re-add https comment a0ef85a [Holden Karau] Fix style issues 125ea7f [Holden Karau] Easier to just remove those files, we don't need them bb8d179 [Holden Karau] Fix issues with the repl class loader 241b03d [Holden Karau] fix my rat excludes a343350 [Holden Karau] Update rat-excludes and remove a useless file d90d217 [Holden Karau] Fix fall back with custom class loader and add a test for it 4919bf9 [Holden Karau] Fix parent calling class loader issue 8a67302 [Holden Karau] Tests are good 9e2d236 [Holden Karau] It works comrade 691ee00 [Holden Karau] It works ish dc4fe44 [Holden Karau] Does not depend on being in my home directory 47046ff [Holden Karau] Remove bad import 22d83cb [Holden Karau] Add a test suite for the executor url class loader suite 
7ef4628 [Holden Karau] Clean up 792d961 [Holden Karau] Almost works 16aecd1 [Holden Karau] Doesn't quite work 8d2241e [Holden Karau] Add a FakeClass for testing ClassLoader precedence options 648b559 [Holden Karau] Both class loaders compile. Now for testing e1d9f71 [Holden Karau] One loader works.
This commit is contained in:
parent
b9e0c937df
commit
fa0524fd02
|
@ -39,4 +39,4 @@ work
|
|||
.*\.q
|
||||
golden
|
||||
test.out/*
|
||||
.*iml
|
||||
.*iml
|
|
@ -26,7 +26,14 @@ import scala.collection.JavaConversions._
|
|||
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
|
||||
import com.google.common.io.Files
|
||||
|
||||
object TestUtils {
|
||||
/**
|
||||
* Utilities for tests. Included in main codebase since it's used by multiple
|
||||
* projects.
|
||||
*
|
||||
* TODO: See if we can move this to the test codebase by specifying
|
||||
* test dependencies between projects.
|
||||
*/
|
||||
private[spark] object TestUtils {
|
||||
|
||||
/**
|
||||
* Create a jar that defines classes with the given names.
|
||||
|
@ -34,13 +41,14 @@ object TestUtils {
|
|||
* Note: if this is used during class loader tests, class names should be unique
|
||||
* in order to avoid interference between tests.
|
||||
*/
|
||||
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
  // Use a fresh temp dir so jars created by different tests cannot collide.
  val tempDir = Files.createTempDir()
  // `value` is baked into each generated class's toString, letting class
  // loader tests tell which jar a loaded class actually came from.
  val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
  // Timestamp in the jar name keeps repeated calls from overwriting each other.
  val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
  createJar(files, jarFile)
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a jar file that contains this set of files. All files will be located at the root
|
||||
* of the jar.
|
||||
|
@ -80,9 +88,11 @@ object TestUtils {
|
|||
}
|
||||
|
||||
/** Creates a compiled class with the given name. Class file will be placed in destDir. */
|
||||
def createCompiledClass(className: String, destDir: File): File = {
|
||||
def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
|
||||
val compiler = ToolProvider.getSystemJavaCompiler
|
||||
val sourceFile = new JavaSourceFromString(className, s"public class $className {}")
|
||||
val sourceFile = new JavaSourceFromString(className,
|
||||
"public class " + className + " { @Override public String toString() { " +
|
||||
"return \"" + value + "\";}}")
|
||||
|
||||
// Calling this outputs a class file in pwd. It's easier to just rename the file than
|
||||
// build a custom FileManager that controls the output location.
|
|
@ -291,15 +291,19 @@ private[spark] class Executor(
|
|||
* Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
|
||||
* created by the interpreter to the search path
|
||||
*/
|
||||
private def createClassLoader(): MutableURLClassLoader = {
  // Start from the loader that loaded Spark itself rather than the thread
  // context loader, which may have been replaced before this point.
  val loader = this.getClass.getClassLoader

  // For each of the jars in the jarSet, add them to the class loader.
  // We assume each of the files has already been fetched.
  val urls = currentJars.keySet.map { uri =>
    new File(uri.split("/").last).toURI.toURL
  }.toArray
  // `match` on a Boolean is just an if/else; use the child-first loader only
  // when the user explicitly asked for their classes to take precedence.
  if (conf.getBoolean("spark.files.userClassPathFirst", false)) {
    new ChildExecutorURLClassLoader(urls, loader)
  } else {
    new ExecutorURLClassLoader(urls, loader)
  }
}
|
||||
|
||||
/**
|
||||
|
@ -310,11 +314,14 @@ private[spark] class Executor(
|
|||
val classUri = conf.get("spark.repl.class.uri", null)
|
||||
if (classUri != null) {
|
||||
logInfo("Using REPL class URI: " + classUri)
|
||||
val userClassPathFirst: java.lang.Boolean =
|
||||
conf.getBoolean("spark.files.userClassPathFirst", false)
|
||||
try {
|
||||
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
|
||||
.asInstanceOf[Class[_ <: ClassLoader]]
|
||||
val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader])
|
||||
constructor.newInstance(classUri, parent)
|
||||
val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader],
|
||||
classOf[Boolean])
|
||||
constructor.newInstance(classUri, parent, userClassPathFirst)
|
||||
} catch {
|
||||
case _: ClassNotFoundException =>
|
||||
logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
|
||||
|
|
|
@ -19,13 +19,56 @@ package org.apache.spark.executor
|
|||
|
||||
import java.net.{URLClassLoader, URL}
|
||||
|
||||
import org.apache.spark.util.ParentClassLoader
|
||||
|
||||
/**
|
||||
* The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
|
||||
* We also make changes so user classes can come before the default classes.
|
||||
*/
|
||||
|
||||
/**
 * A ClassLoader whose search path can be mutated at runtime.
 *
 * `URLClassLoader.addURL` is protected; implementations of this trait expose
 * an equivalent public method so callers can append new jar URLs.
 */
private[spark] trait MutableURLClassLoader extends ClassLoader {
  /** Append a URL (typically a jar) to this loader's search path. */
  def addURL(url: URL)
  /** The URLs currently on the search path. */
  def getURLs: Array[URL]
}
|
||||
|
||||
/**
 * A child-first (user-classpath-first) mutable URL class loader: classes are
 * resolved from `urls` before the `parent` loader is consulted.
 */
private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends MutableURLClassLoader {

  // Inner loader that owns the user's jars. Its parent is null, so lookups do
  // NOT delegate upward first — this is what gives user classes precedence.
  private object userClassLoader extends URLClassLoader(urls, null){
    // Widen access: URLClassLoader.addURL is protected.
    override def addURL(url: URL) {
      super.addURL(url)
    }
    // Widen access: URLClassLoader.findClass is protected.
    override def findClass(name: String): Class[_] = {
      super.findClass(name)
    }
  }

  // Wrapper exposing the parent's protected findClass/loadClass to this class.
  private val parentClassLoader = new ParentClassLoader(parent)

  // Try the user's jars first; fall back to the parent for anything not found.
  // NOTE(review): this calls the inner loader's findClass directly, which
  // defines the class anew each time it is invoked — presumably safe only
  // because the inherited loadClass checks findLoadedClass before calling
  // findClass; confirm repeated loads of the same name cannot trigger a
  // duplicate-definition LinkageError.
  override def findClass(name: String): Class[_] = {
    try {
      userClassLoader.findClass(name)
    } catch {
      case e: ClassNotFoundException => {
        parentClassLoader.loadClass(name)
      }
    }
  }

  // New URLs go to the child (user) loader so they also win over the parent.
  def addURL(url: URL) {
    userClassLoader.addURL(url)
  }

  def getURLs() = {
    userClassLoader.getURLs()
  }
}
|
||||
|
||||
/**
 * The default parent-first mutable URL class loader: standard URLClassLoader
 * delegation (parent wins), plus a public addURL.
 */
private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends URLClassLoader(urls, parent) with MutableURLClassLoader {

  // Widen access: URLClassLoader.addURL is protected.
  override def addURL(url: URL) {
    super.addURL(url)
  }
}
|
||||
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.util
|
||||
|
||||
/**
 * A class loader which makes `findClass` accessible to callers.
 *
 * `ClassLoader.findClass` is protected; wrapping a parent loader in this class
 * lets other loaders invoke it (and `loadClass`) explicitly.
 */
private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(parent) {

  // Widen access: ClassLoader.findClass is protected.
  override def findClass(name: String) = {
    super.findClass(name)
  }

  // NOTE(review): loadClass is already public on ClassLoader, so this override
  // only forwards to super; it appears to be kept for explicitness.
  override def loadClass(name: String): Class[_] = {
    super.loadClass(name)
  }
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.executor
|
||||
|
||||
import java.io.File
|
||||
import java.net.URLClassLoader
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.TestUtils
|
||||
|
||||
class ExecutorURLClassLoaderSuite extends FunSuite {

  // Two jars defining overlapping class names: classes in the child jar report
  // "1" from toString and those in the parent jar report "2", so each test can
  // tell which loader a class was actually served from.
  val childClassNames = List("FakeClass1", "FakeClass2")
  val parentClassNames = List("FakeClass1", "FakeClass2", "FakeClass3")
  val urls = Array(TestUtils.createJarWithClasses(childClassNames, "1"))
  val urls2 = Array(TestUtils.createJarWithClasses(parentClassNames, "2"))

  test("child first") {
    // Under child-first semantics a class present in both jars resolves to the
    // child (user) jar's copy.
    val parentLoader = new URLClassLoader(urls2, null)
    val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
    val instance = classLoader.loadClass("FakeClass2").newInstance()
    assert(instance.toString === "1")
  }

  test("parent first") {
    // Default delegation: the parent's copy wins even though the child jar
    // also defines the class.
    val parentLoader = new URLClassLoader(urls2, null)
    val classLoader = new ExecutorURLClassLoader(urls, parentLoader)
    val instance = classLoader.loadClass("FakeClass1").newInstance()
    assert(instance.toString === "2")
  }

  test("child first can fall back") {
    // A class only the parent defines is still reachable under child-first.
    val parentLoader = new URLClassLoader(urls2, null)
    val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
    val instance = classLoader.loadClass("FakeClass3").newInstance()
    assert(instance.toString === "2")
  }

  test("child first can fail") {
    // A class neither jar defines raises ClassNotFoundException.
    val parentLoader = new URLClassLoader(urls2, null)
    val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
    intercept[java.lang.ClassNotFoundException] {
      classLoader.loadClass("FakeClassDoesNotExist").newInstance()
    }
  }
}
|
|
@ -596,6 +596,15 @@ Apart from these, the following properties are also available, and may be useful
|
|||
the driver.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>spark.files.userClassPathFirst</td>
|
||||
<td>false</td>
|
||||
<td>
|
||||
(Experimental) Whether to give user-added jars precedence over Spark's own jars when
|
||||
loading classes in Executors. This feature can be used to mitigate conflicts between
|
||||
Spark's dependencies and user dependencies. It is currently an experimental feature.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>spark.authenticate</td>
|
||||
<td>false</td>
|
||||
|
|
|
@ -195,6 +195,7 @@ object SparkBuild extends Build {
|
|||
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
|
||||
|
||||
resolvers ++= Seq(
|
||||
// HTTPS is unavailable for Maven Central
|
||||
"Maven Repository" at "http://repo.maven.apache.org/maven2",
|
||||
"Apache Repository" at "https://repository.apache.org/content/repositories/releases",
|
||||
"JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/",
|
||||
|
|
|
@ -26,21 +26,23 @@ import org.apache.hadoop.fs.{FileSystem, Path}
|
|||
|
||||
import org.apache.spark.SparkEnv
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
import org.apache.spark.util.ParentClassLoader
|
||||
|
||||
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
|
||||
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
|
||||
|
||||
|
||||
/**
|
||||
* A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
|
||||
* used to load classes defined by the interpreter when the REPL is used
|
||||
*/
|
||||
class ExecutorClassLoader(classUri: String, parent: ClassLoader)
|
||||
extends ClassLoader(parent) {
|
||||
* used to load classes defined by the interpreter when the REPL is used.
|
||||
* Allows the user to specify if user class path should be first
|
||||
*/
|
||||
class ExecutorClassLoader(classUri: String, parent: ClassLoader,
|
||||
userClassPathFirst: Boolean) extends ClassLoader {
|
||||
val uri = new URI(classUri)
|
||||
val directory = uri.getPath
|
||||
|
||||
val parentLoader = new ParentClassLoader(parent)
|
||||
|
||||
// Hadoop FileSystem object for our URI, if it isn't using HTTP
|
||||
var fileSystem: FileSystem = {
|
||||
if (uri.getScheme() == "http") {
|
||||
|
@ -49,8 +51,27 @@ extends ClassLoader(parent) {
|
|||
FileSystem.get(uri, new Configuration())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
 * Resolve `name`, honoring the configured precedence: with userClassPathFirst
 * set, classes served from the REPL class URI win over the parent loader;
 * otherwise the parent is consulted first and the URI is only a fallback.
 */
override def findClass(name: String): Class[_] = {
  if (userClassPathFirst) {
    // Child first: try interpreter-generated classes, then the parent.
    findClassLocally(name).getOrElse(parentLoader.loadClass(name))
  } else {
    // Parent first: only look locally when the parent cannot find the class,
    // chaining the parent's failure as the cause if we also come up empty.
    try {
      parentLoader.loadClass(name)
    } catch {
      case e: ClassNotFoundException =>
        findClassLocally(name).getOrElse(throw new ClassNotFoundException(name, e))
    }
  }
}
|
||||
|
||||
def findClassLocally(name: String): Option[Class[_]] = {
|
||||
try {
|
||||
val pathInDirectory = name.replace('.', '/') + ".class"
|
||||
val inputStream = {
|
||||
|
@ -68,9 +89,9 @@ extends ClassLoader(parent) {
|
|||
}
|
||||
val bytes = readAndTransformClass(name, inputStream)
|
||||
inputStream.close()
|
||||
return defineClass(name, bytes, 0, bytes.length)
|
||||
Some(defineClass(name, bytes, 0, bytes.length))
|
||||
} catch {
|
||||
case e: Exception => throw new ClassNotFoundException(name, e)
|
||||
case e: Exception => None
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.repl
|
||||
|
||||
import java.io.File
|
||||
import java.net.URLClassLoader
|
||||
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import com.google.common.io.Files
|
||||
|
||||
import org.apache.spark.TestUtils
|
||||
|
||||
class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {

  // Overlapping class names compiled into two directories: classes under
  // tempDir1 (the "child"/URI side) report "1" from toString, those under
  // tempDir2 (the "parent" side) report "2".
  val childClassNames = List("ReplFakeClass1", "ReplFakeClass2")
  val parentClassNames = List("ReplFakeClass1", "ReplFakeClass2", "ReplFakeClass3")
  val tempDir1 = Files.createTempDir()
  val tempDir2 = Files.createTempDir()
  val url1 = "file://" + tempDir1
  val urls2 = Array(tempDir2.toURI.toURL)

  override def beforeAll() {
    // Compile the fake classes once for the whole suite.
    for (name <- childClassNames) {
      TestUtils.createCompiledClass(name, tempDir1, "1")
    }
    for (name <- parentClassNames) {
      TestUtils.createCompiledClass(name, tempDir2, "2")
    }
  }

  test("child first") {
    // userClassPathFirst = true: a class present on both sides resolves from
    // the class URI side.
    val parentLoader = new URLClassLoader(urls2, null)
    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
    val instance = classLoader.loadClass("ReplFakeClass2").newInstance()
    assert(instance.toString === "1")
  }

  test("parent first") {
    // userClassPathFirst = false: the parent's copy wins.
    val parentLoader = new URLClassLoader(urls2, null)
    val classLoader = new ExecutorClassLoader(url1, parentLoader, false)
    val instance = classLoader.loadClass("ReplFakeClass1").newInstance()
    assert(instance.toString === "2")
  }

  test("child first can fall back") {
    // A class only the parent defines is still reachable under child-first.
    val parentLoader = new URLClassLoader(urls2, null)
    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
    val instance = classLoader.loadClass("ReplFakeClass3").newInstance()
    assert(instance.toString === "2")
  }

  test("child first can fail") {
    // A class neither side defines raises ClassNotFoundException.
    val parentLoader = new URLClassLoader(urls2, null)
    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
    intercept[java.lang.ClassNotFoundException] {
      classLoader.loadClass("ReplFakeClassDoesNotExist").newInstance()
    }
  }
}
|
Loading…
Reference in a new issue