Better reuse of buffers in Kryo serialization

This commit is contained in:
Matei Zaharia 2011-03-08 12:36:36 -08:00
parent 7408230bfa
commit 7febdfbe29

View file

@ -58,10 +58,8 @@ object ZigZag {
}
}
class KryoSerializationStream(kryo: Kryo, out: OutputStream)
class KryoSerializationStream(kryo: Kryo, buf: ByteBuffer, out: OutputStream)
extends SerializationStream {
val buf = ByteBuffer.allocateDirect(1024*1024)
def writeObject[T](t: T) {
kryo.writeClassAndObject(buf, t)
ZigZag.writeInt(buf.position(), out)
@ -74,10 +72,8 @@ extends SerializationStream {
def close() { out.close() }
}
class KryoDeserializationStream(kryo: Kryo, in: InputStream)
class KryoDeserializationStream(buf: ObjectBuffer, in: InputStream)
extends DeserializationStream {
val buf = new ObjectBuffer(kryo, 1024*1024)
def readObject[T](): T = {
val len = ZigZag.readInt(in)
buf.readClassAndObject(in, len).asInstanceOf[T]
@ -86,8 +82,8 @@ extends DeserializationStream {
def close() { in.close() }
}
class KryoSerializer(kryo: Kryo) extends Serializer {
val buf = new ObjectBuffer(kryo, 1024*1024)
class KryoSerializer(strat: KryoSerialization) extends Serializer {
val buf = strat.threadBuf.get()
def serialize[T](t: T): Array[Byte] = {
buf.writeClassAndObject(t)
@ -98,11 +94,11 @@ class KryoSerializer(kryo: Kryo) extends Serializer {
}
def outputStream(s: OutputStream): SerializationStream = {
new KryoSerializationStream(kryo, s)
new KryoSerializationStream(strat.kryo, strat.threadByteBuf.get(), s)
}
def inputStream(s: InputStream): DeserializationStream = {
new KryoDeserializationStream(kryo, s)
new KryoDeserializationStream(buf, s)
}
}
@ -114,6 +110,14 @@ trait KryoRegistrator {
class KryoSerialization extends SerializationStrategy with Logging {
val kryo = createKryo()
val threadBuf = new ThreadLocal[ObjectBuffer] {
override def initialValue = new ObjectBuffer(kryo, 128*1024*1024)
}
val threadByteBuf = new ThreadLocal[ByteBuffer] {
override def initialValue = ByteBuffer.allocate(128*1024*1024)
}
def createKryo(): Kryo = {
val kryo = new Kryo()
@ -158,5 +162,5 @@ class KryoSerialization extends SerializationStrategy with Logging {
kryo
}
def newSerializer(): Serializer = new KryoSerializer(kryo)
def newSerializer(): Serializer = new KryoSerializer(this)
}