Revert unnecessary changes to core
While benchmarking, we accidentally committed some unnecessary changes to core such as adding logging. These changes make it more difficult to merge from Spark upstream, so this commit reverts them.
This commit is contained in:
parent
8bd5f89662
commit
971f824014
|
@ -156,7 +156,6 @@ object SparkEnv extends Logging {
|
||||||
|
|
||||||
val serializer = serializerManager.setDefault(
|
val serializer = serializerManager.setDefault(
|
||||||
System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
|
System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
|
||||||
logInfo("spark.serializer is " + System.getProperty("spark.serializer"))
|
|
||||||
|
|
||||||
val closureSerializer = serializerManager.get(
|
val closureSerializer = serializerManager.get(
|
||||||
System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
|
System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
|
||||||
|
|
|
@ -265,9 +265,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
|
||||||
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
|
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
|
||||||
* partition the output RDD.
|
* partition the output RDD.
|
||||||
*/
|
*/
|
||||||
|
def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
|
||||||
def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
|
|
||||||
RDD[(K, (V, Option[W]))] = {
|
|
||||||
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
|
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
|
||||||
if (ws.isEmpty) {
|
if (ws.isEmpty) {
|
||||||
vs.iterator.map(v => (v, None))
|
vs.iterator.map(v => (v, None))
|
||||||
|
|
|
@ -63,11 +63,15 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
|
||||||
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
|
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
|
||||||
|
|
||||||
// Allow the user to register their own classes by setting spark.kryo.registrator
|
// Allow the user to register their own classes by setting spark.kryo.registrator
|
||||||
|
try {
|
||||||
Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
|
Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
|
||||||
logDebug("Running user registrator: " + regCls)
|
logDebug("Running user registrator: " + regCls)
|
||||||
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
|
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
|
||||||
reg.registerClasses(kryo)
|
reg.registerClasses(kryo)
|
||||||
}
|
}
|
||||||
|
} catch {
|
||||||
|
case _: Exception => println("Failed to register spark.kryo.registrator")
|
||||||
|
}
|
||||||
|
|
||||||
// Register Chill's classes; we do this after our ranges and the user's own classes to let
|
// Register Chill's classes; we do this after our ranges and the user's own classes to let
|
||||||
// our code override the generic serializers in Chill for things like Seq
|
// our code override the generic serializers in Chill for things like Seq
|
||||||
|
@ -118,7 +122,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance with Logging {
|
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
|
||||||
val kryo = ks.newKryo()
|
val kryo = ks.newKryo()
|
||||||
|
|
||||||
// Make these lazy vals to avoid creating a buffer unless we use them
|
// Make these lazy vals to avoid creating a buffer unless we use them
|
||||||
|
|
|
@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
|
||||||
* instance of the serializer object has been created, the get method returns that instead of
|
* instance of the serializer object has been created, the get method returns that instead of
|
||||||
* creating a new one.
|
* creating a new one.
|
||||||
*/
|
*/
|
||||||
private[spark] class SerializerManager extends org.apache.spark.Logging {
|
private[spark] class SerializerManager {
|
||||||
|
|
||||||
private val serializers = new ConcurrentHashMap[String, Serializer]
|
private val serializers = new ConcurrentHashMap[String, Serializer]
|
||||||
private var _default: Serializer = _
|
private var _default: Serializer = _
|
||||||
|
|
Loading…
Reference in a new issue