diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 9b8384bcbb..29968c273c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,7 +156,6 @@ object SparkEnv extends Logging {
 
     val serializer = serializerManager.setDefault(
       System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
-    logInfo("spark.serializer is " + System.getProperty("spark.serializer"))
 
     val closureSerializer = serializerManager.get(
       System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 8a66297f6f..81bf867188 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -245,7 +245,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
     if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
-    new ShuffledRDD[K, V, (K, V)](self, partitioner)
+    new ShuffledRDD[K, V, (K, V)](self, partitioner)
   }
 
   /**
@@ -265,9 +265,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
    * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
    * partition the output RDD.
    */
-
-  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
-      RDD[(K, (V, Option[W]))] = {
+  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
       if (ws.isEmpty) {
         vs.iterator.map(v => (v, None))
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 263ff59ba6..55b25f145a 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -63,10 +63,14 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
     kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
 
     // Allow the user to register their own classes by setting spark.kryo.registrator
-    Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
-      logDebug("Running user registrator: " + regCls)
-      val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
-      reg.registerClasses(kryo)
+    try {
+      Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+        logDebug("Running user registrator: " + regCls)
+        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+        reg.registerClasses(kryo)
+      }
+    } catch {
+      case _: Exception => println("Failed to register spark.kryo.registrator")
     }
 
     // Register Chill's classes; we do this after our ranges and the user's own classes to let
@@ -118,7 +122,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance with Logging {
+private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
   val kryo = ks.newKryo()
 
   // Make these lazy vals to avoid creating a buffer unless we use them
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 5082730ae3..2955986fec 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
  * instance of the serializer object has been created, the get method returns that instead of
  * creating a new one.
  */
-private[spark] class SerializerManager extends org.apache.spark.Logging {
+private[spark] class SerializerManager {
  private val serializers = new ConcurrentHashMap[String, Serializer]
  private var _default: Serializer = _
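Note on the PairRDDFunctions hunks: the leftOuterJoin change only joins the split signature onto one line; the method still cogroups with the given partitioner and emits (v, None) for keys missing from `other`. A small usage sketch against that signature, assuming a local SparkContext; the sample data and the HashPartitioner(2) argument are illustrative only, not part of the patch:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions
import org.apache.spark.HashPartitioner

object LeftOuterJoinExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "LeftOuterJoinExample")
    val visits = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val labels = sc.parallelize(Seq(("a", "alpha")))

    // Keys present only on the left come back as (value, None).
    val joined = visits.leftOuterJoin(labels, new HashPartitioner(2))
    joined.collect().foreach(println)
    // expected output (order may vary): (a,(1,Some(alpha))), (a,(3,Some(alpha))), (b,(2,None))

    sc.stop()
  }
}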
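Note on the KryoSerializer hunk: the new try/catch guards the reflective load of the class named by spark.kryo.registrator, so a missing or broken registrator is reported (via println) instead of aborting newKryo(). A minimal sketch of a user registrator that this code path would pick up; the class name MyKryoRegistrator and the registered types are assumptions for illustration, not part of the patch:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical user-provided registrator: whatever fully qualified class name is set
// in spark.kryo.registrator is loaded via Class.forName and cast to KryoRegistrator.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Register application classes so Kryo writes compact ids instead of full class names.
    kryo.register(classOf[Array[Double]])
    kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
  }
}

// Wired up through system properties before the SparkContext is created:
//   System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//   System.setProperty("spark.kryo.registrator", "MyKryoRegistrator")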