[SPARK-2270] Kryo cannot serialize results returned by asJavaIterable
(and thus groupBy/cogroup are broken in Java APIs when Kryo is used). @pwendell this should be merged into 1.0.1. Thanks @sorenmacbeth for reporting this & helping out with the fix. Author: Reynold Xin <rxin@apache.org> Closes #1206 from rxin/kryo-iterable-2270 and squashes the following commits: 09da0aa [Reynold Xin] Updated the comment. 009bf64 [Reynold Xin] [SPARK-2270] Kryo cannot serialize results returned by asJavaIterable (and thus groupBy/cogroup are broken in Java APIs when Kryo is used).
This commit is contained in:
parent
9aa603296c
commit
7ff2c754f3
|
@ -64,6 +64,9 @@ class KryoSerializer(conf: SparkConf)
|
||||||
kryo.register(cls)
|
kryo.register(cls)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
|
||||||
|
kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)
|
||||||
|
|
||||||
// Allow sending SerializableWritable
|
// Allow sending SerializableWritable
|
||||||
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
|
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
|
||||||
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
|
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
|
||||||
|
@ -183,3 +186,50 @@ private[serializer] object KryoSerializer {
|
||||||
classOf[Array[Byte]]
|
classOf[Array[Byte]]
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Kryo serializer for serializing results returned by asJavaIterable.
|
||||||
|
*
|
||||||
|
* The underlying object is scala.collection.convert.Wrappers$IterableWrapper.
|
||||||
|
* Kryo deserializes this into an AbstractCollection, which unfortunately doesn't work.
|
||||||
|
*/
|
||||||
|
private class JavaIterableWrapperSerializer
|
||||||
|
extends com.esotericsoftware.kryo.Serializer[java.lang.Iterable[_]] {
|
||||||
|
|
||||||
|
import JavaIterableWrapperSerializer._
|
||||||
|
|
||||||
|
override def write(kryo: Kryo, out: KryoOutput, obj: java.lang.Iterable[_]): Unit = {
|
||||||
|
// If the object is the wrapper, simply serialize the underlying Scala Iterable object.
|
||||||
|
// Otherwise, serialize the object itself.
|
||||||
|
if (obj.getClass == wrapperClass && underlyingMethodOpt.isDefined) {
|
||||||
|
kryo.writeClassAndObject(out, underlyingMethodOpt.get.invoke(obj))
|
||||||
|
} else {
|
||||||
|
kryo.writeClassAndObject(out, obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]])
|
||||||
|
: java.lang.Iterable[_] = {
|
||||||
|
kryo.readClassAndObject(in) match {
|
||||||
|
case scalaIterable: Iterable[_] =>
|
||||||
|
scala.collection.JavaConversions.asJavaIterable(scalaIterable)
|
||||||
|
case javaIterable: java.lang.Iterable[_] =>
|
||||||
|
javaIterable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private object JavaIterableWrapperSerializer extends Logging {
|
||||||
|
// The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper).
|
||||||
|
val wrapperClass =
|
||||||
|
scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
|
||||||
|
|
||||||
|
// Get the underlying method so we can use it to get the Scala collection for serialization.
|
||||||
|
private val underlyingMethodOpt = {
|
||||||
|
try Some(wrapperClass.getDeclaredMethod("underlying")) catch {
|
||||||
|
case e: Exception =>
|
||||||
|
logError("Failed to find the underlying field in " + wrapperClass, e)
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -128,6 +128,21 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
|
||||||
check(1.0 until 1000000.0 by 2.0)
|
check(1.0 until 1000000.0 by 2.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("asJavaIterable") {
|
||||||
|
// Serialize a collection wrapped by asJavaIterable
|
||||||
|
val ser = new KryoSerializer(conf).newInstance()
|
||||||
|
val a = ser.serialize(scala.collection.convert.WrapAsJava.asJavaIterable(Seq(12345)))
|
||||||
|
val b = ser.deserialize[java.lang.Iterable[Int]](a)
|
||||||
|
assert(b.iterator().next() === 12345)
|
||||||
|
|
||||||
|
// Serialize a normal Java collection
|
||||||
|
val col = new java.util.ArrayList[Int]
|
||||||
|
col.add(54321)
|
||||||
|
val c = ser.serialize(col)
|
||||||
|
val d = ser.deserialize[java.lang.Iterable[Int]](c)
|
||||||
|
assert(d.iterator().next() === 54321) // check d, the deserialized ArrayList, not b
|
||||||
|
}
|
||||||
|
|
||||||
test("custom registrator") {
|
test("custom registrator") {
|
||||||
val ser = new KryoSerializer(conf).newInstance()
|
val ser = new KryoSerializer(conf).newInstance()
|
||||||
def check[T: ClassTag](t: T) {
|
def check[T: ClassTag](t: T) {
|
||||||
|
|
Loading…
Reference in a new issue