diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala index 0bd73e936b..d723ab7b1e 100644 --- a/core/src/main/scala/spark/KryoSerializer.scala +++ b/core/src/main/scala/spark/KryoSerializer.scala @@ -157,27 +157,34 @@ class KryoSerializer extends spark.serializer.Serializer with Logging { // Register maps with a special serializer since they have complex internal structure class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any]) - extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] { + extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] { + + //hack, look at https://groups.google.com/forum/#!msg/kryo-users/Eu5V4bxCfws/k-8UQ22y59AJ + private final val FAKE_REFERENCE = new Object() override def write( - kryo: Kryo, - output: KryoOutput, - obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) { + kryo: Kryo, + output: KryoOutput, + obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) { val map = obj.asInstanceOf[scala.collection.Map[Any, Any]] - kryo.writeObject(output, map.size.asInstanceOf[java.lang.Integer]) + output.writeInt(map.size) for ((k, v) <- map) { kryo.writeClassAndObject(output, k) kryo.writeClassAndObject(output, v) } } override def read ( - kryo: Kryo, - input: KryoInput, - cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]]) + kryo: Kryo, + input: KryoInput, + cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]]) : Array[(Any, Any)] => scala.collection.Map[Any, Any] = { - val size = kryo.readObject(input, classOf[java.lang.Integer]).intValue + kryo.reference(FAKE_REFERENCE) + val size = input.readInt() val elems = new Array[(Any, Any)](size) - for (i <- 0 until size) - elems(i) = (kryo.readClassAndObject(input), kryo.readClassAndObject(input)) + for (i <- 0 until size) { + val k = kryo.readClassAndObject(input) + val v = kryo.readClassAndObject(input) + elems(i)=(k,v) + } buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]] } } diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala index 06d446ea24..327e2ff848 100644 --- a/core/src/test/scala/spark/KryoSerializerSuite.scala +++ b/core/src/test/scala/spark/KryoSerializerSuite.scala @@ -82,6 +82,7 @@ class KryoSerializerSuite extends FunSuite { check(mutable.HashMap(1 -> "one", 2 -> "two")) check(mutable.HashMap("one" -> 1, "two" -> 2)) check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4)))) + check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three"))) } test("custom registrator") { diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5b241cc4ef..d44bf3b5e3 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -132,7 +132,7 @@ object SparkBuild extends Build { "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, "asm" % "asm-all" % "3.3.1", "com.google.protobuf" % "protobuf-java" % "2.4.1", - "de.javakaffee" % "kryo-serializers" % "0.20", + "de.javakaffee" % "kryo-serializers" % "0.22", "com.typesafe.akka" % "akka-actor" % "2.0.3", "com.typesafe.akka" % "akka-remote" % "2.0.3", "com.typesafe.akka" % "akka-slf4j" % "2.0.3",