Merge pull request #149 from dennybritz/serfix
Instantiating custom serializer using user's classpath
This commit is contained in:
commit
d1759c0290
|
@ -38,16 +38,16 @@ class Executor extends org.apache.mesos.Executor with Logging {
|
||||||
// Make sure an appropriate class loader is set for remote actors
|
// Make sure an appropriate class loader is set for remote actors
|
||||||
RemoteActor.classLoader = getClass.getClassLoader
|
RemoteActor.classLoader = getClass.getClassLoader
|
||||||
|
|
||||||
|
// Create our ClassLoader (using spark properties) and set it on this thread
|
||||||
|
classLoader = createClassLoader()
|
||||||
|
Thread.currentThread.setContextClassLoader(classLoader)
|
||||||
|
|
||||||
// Initialize Spark environment (using system properties read above)
|
// Initialize Spark environment (using system properties read above)
|
||||||
env = SparkEnv.createFromSystemProperties(false)
|
env = SparkEnv.createFromSystemProperties(false)
|
||||||
SparkEnv.set(env)
|
SparkEnv.set(env)
|
||||||
// Old stuff that isn't yet using env
|
// Old stuff that isn't yet using env
|
||||||
Broadcast.initialize(false)
|
Broadcast.initialize(false)
|
||||||
|
|
||||||
// Create our ClassLoader (using spark properties) and set it on this thread
|
|
||||||
classLoader = createClassLoader()
|
|
||||||
Thread.currentThread.setContextClassLoader(classLoader)
|
|
||||||
|
|
||||||
// Start worker thread pool
|
// Start worker thread pool
|
||||||
threadPool = new ThreadPoolExecutor(
|
threadPool = new ThreadPoolExecutor(
|
||||||
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
|
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
|
||||||
|
|
|
@ -26,7 +26,7 @@ object SparkEnv {
|
||||||
val cache = Class.forName(cacheClass).newInstance().asInstanceOf[Cache]
|
val cache = Class.forName(cacheClass).newInstance().asInstanceOf[Cache]
|
||||||
|
|
||||||
val serializerClass = System.getProperty("spark.serializer", "spark.JavaSerializer")
|
val serializerClass = System.getProperty("spark.serializer", "spark.JavaSerializer")
|
||||||
val serializer = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer]
|
val serializer = Class.forName(serializerClass, true, Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[Serializer]
|
||||||
|
|
||||||
val closureSerializerClass =
|
val closureSerializerClass =
|
||||||
System.getProperty("spark.closure.serializer", "spark.JavaSerializer")
|
System.getProperty("spark.closure.serializer", "spark.JavaSerializer")
|
||||||
|
|
Loading…
Reference in a new issue