From 971f8240147b309f98edb5904657bf3bbd018dd8 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Fri, 18 Oct 2013 16:03:13 -0700
Subject: [PATCH] Revert unnecessary changes to core

While benchmarking, we accidentally committed some unnecessary changes
to core such as adding logging. These changes make it more difficult to
merge from Spark upstream, so this commit reverts them.
---
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  1 -
 .../org/apache/spark/rdd/PairRDDFunctions.scala    |  6 ++----
 .../apache/spark/serializer/KryoSerializer.scala   | 14 +++++++++-----
 .../spark/serializer/SerializerManager.scala       |  2 +-
 4 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 9b8384bcbb..29968c273c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,7 +156,6 @@ object SparkEnv extends Logging {
 
     val serializer = serializerManager.setDefault(
       System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
-    logInfo("spark.serializer is " + System.getProperty("spark.serializer"))
 
     val closureSerializer = serializerManager.get(
       System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 8a66297f6f..81bf867188 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -245,7 +245,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
     if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
-    new ShuffledRDD[K, V, (K, V)](self, partitioner) 
+    new ShuffledRDD[K, V, (K, V)](self, partitioner)
   }
 
   /**
@@ -265,9 +265,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
    * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
    * partition the output RDD.
   */
-
-  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
-      RDD[(K, (V, Option[W]))] = {
+  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
       if (ws.isEmpty) {
         vs.iterator.map(v => (v, None))
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 263ff59ba6..55b25f145a 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -63,10 +63,14 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
     kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
 
     // Allow the user to register their own classes by setting spark.kryo.registrator
-    Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
-      logDebug("Running user registrator: " + regCls)
-      val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
-      reg.registerClasses(kryo)
-    }
+    try {
+      Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+        logDebug("Running user registrator: " + regCls)
+        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+        reg.registerClasses(kryo)
+      }
+    } catch {
+      case _: Exception => println("Failed to register spark.kryo.registrator")
+    }
 
     // Register Chill's classes; we do this after our ranges and the user's own classes to let
@@ -118,7 +122,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance with Logging {
+private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
   val kryo = ks.newKryo()
 
   // Make these lazy vals to avoid creating a buffer unless we use them
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 5082730ae3..2955986fec 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
  * instance of the serializer object has been created, the get method returns that instead of
  * creating a new one.
  */
-private[spark] class SerializerManager extends org.apache.spark.Logging {
+private[spark] class SerializerManager {
 
   private val serializers = new ConcurrentHashMap[String, Serializer]
   private var _default: Serializer = _