spark-instrumented-optimizer/core/src/main/scala/spark/JavaSerializer.scala
Matei Zaharia 63051dd2bc Merge in engine improvements from the Spark Streaming project, developed
jointly with Tathagata Das and Haoyuan Li. This commit imports the changes
and ports them to Mesos 0.9, but does not yet pass unit tests due to
various classes not supporting a graceful stop() yet.
2012-06-07 12:45:38 -07:00

61 lines
1.8 KiB
Scala

package spark
import java.io._
import java.nio.ByteBuffer
/**
 * Serialization stream that writes objects with Java serialization by
 * wrapping the given output stream in an `ObjectOutputStream`.
 *
 * @param out underlying stream that receives the serialized bytes
 */
class JavaSerializationStream(out: OutputStream) extends SerializationStream {
  /** Object stream layered over `out`; every operation below delegates to it. */
  val objOut = new ObjectOutputStream(out)

  /** Writes `t` to the wrapped object stream. */
  def writeObject[T](t: T): Unit = {
    objOut.writeObject(t)
  }

  /** Flushes the wrapped object stream. */
  def flush(): Unit = {
    objOut.flush()
  }

  /** Closes the wrapped object stream. */
  def close(): Unit = {
    objOut.close()
  }
}
/**
 * Deserialization stream that reads objects with Java serialization,
 * looking up every class named in the stream through the supplied loader.
 *
 * @param in     stream to read serialized objects from
 * @param loader class loader used to resolve class names found in the stream
 */
class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
  extends DeserializationStream {

  /** Object stream over `in` whose class resolution goes through `loader`. */
  val objIn = new ObjectInputStream(in) {
    override def resolveClass(desc: ObjectStreamClass): Class[_] = {
      val name = desc.getName
      try {
        Class.forName(name, false, loader)
      } catch {
        // Class.forName cannot look up primitive types or void by name, so a
        // serialized primitive Class object (e.g. the class for `int`) would
        // otherwise be unreadable. Any other unknown name still fails with
        // the original exception.
        case e: ClassNotFoundException =>
          primitiveClass(name).getOrElse(throw e)
      }
    }
  }

  /** Reads the next object from the stream and casts it to `T` (unchecked). */
  def readObject[T](): T = objIn.readObject().asInstanceOf[T]

  /** Closes the wrapped object stream. */
  def close(): Unit = {
    objIn.close()
  }

  /** Maps a Java primitive type name (or "void") to its Class object, if it is one. */
  private def primitiveClass(name: String): Option[Class[_]] = name match {
    case "boolean" => Some(java.lang.Boolean.TYPE)
    case "byte"    => Some(java.lang.Byte.TYPE)
    case "char"    => Some(java.lang.Character.TYPE)
    case "short"   => Some(java.lang.Short.TYPE)
    case "int"     => Some(java.lang.Integer.TYPE)
    case "long"    => Some(java.lang.Long.TYPE)
    case "float"   => Some(java.lang.Float.TYPE)
    case "double"  => Some(java.lang.Double.TYPE)
    case "void"    => Some(java.lang.Void.TYPE)
    case _         => None
  }
}
/**
 * Serializer instance backed by Java serialization. Converts single objects
 * to and from `ByteBuffer`s and creates object streams over arbitrary
 * input/output streams.
 */
class JavaSerializerInstance extends SerializerInstance {

  /**
   * Serializes `t` into a freshly allocated buffer.
   *
   * @param t object to serialize
   * @return a buffer wrapping the serialized bytes
   */
  def serialize[T](t: T): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = serializeStream(bos)
    out.writeObject(t)
    // Close before reading the bytes so everything buffered by the stream is written out.
    out.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  /**
   * Deserializes one object from the remaining bytes of `bytes`, resolving
   * classes with the calling thread's context class loader.
   *
   * @param bytes buffer holding the serialized object; its position is not changed
   * @return the deserialized object, cast to `T` (unchecked)
   */
  def deserialize[T](bytes: ByteBuffer): T = {
    val in = deserializeStream(remainingBytesStream(bytes))
    // Pass T explicitly; with no expected type it would be inferred as Nothing.
    in.readObject[T]()
  }

  /**
   * Deserializes one object from the remaining bytes of `bytes`, resolving
   * classes with `loader`.
   *
   * @param bytes  buffer holding the serialized object; its position is not changed
   * @param loader class loader used to resolve classes named in the data
   * @return the deserialized object, cast to `T` (unchecked)
   */
  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
    val in = deserializeStream(remainingBytesStream(bytes), loader)
    // Pass T explicitly; with no expected type it would be inferred as Nothing.
    in.readObject[T]()
  }

  /** Creates a Java serialization stream that writes to `s`. */
  def serializeStream(s: OutputStream): SerializationStream = {
    new JavaSerializationStream(s)
  }

  /**
   * Creates a Java deserialization stream over `s` that resolves classes with
   * the calling thread's context class loader.
   */
  def deserializeStream(s: InputStream): DeserializationStream = {
    new JavaDeserializationStream(s, Thread.currentThread().getContextClassLoader)
  }

  /** Creates a Java deserialization stream over `s` that resolves classes with `loader`. */
  def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
    new JavaDeserializationStream(s, loader)
  }

  /**
   * Builds an input stream over exactly the bytes between the buffer's
   * position and limit. `ByteBuffer.array()` alone would expose the whole
   * backing array (ignoring offset, position and limit) and throws for direct
   * or read-only buffers, so those are copied out through a duplicate instead.
   */
  private def remainingBytesStream(bytes: ByteBuffer): InputStream = {
    if (bytes.hasArray) {
      new ByteArrayInputStream(
        bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    } else {
      val copy = new Array[Byte](bytes.remaining())
      // Read through a duplicate so the caller's position is left untouched.
      bytes.duplicate().get(copy)
      new ByteArrayInputStream(copy)
    }
  }
}
/**
 * Serializer implementation that hands out Java-serialization-based
 * serializer instances.
 */
class JavaSerializer extends Serializer {
  /** Returns a new `JavaSerializerInstance` on every call. */
  def newInstance(): SerializerInstance = {
    new JavaSerializerInstance()
  }
}