Merge pull request #86 from ankurdave/vid-varenc

Finish work on #85
This commit is contained in:
Ankur Dave 2013-12-05 12:37:02 -08:00
commit e0bcaa0942

View file

@ -32,7 +32,7 @@ class IntVertexBroadcastMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Int]]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg.vid, optimizePositive = false)
writeInt(msg.data)
this
}
@ -40,7 +40,9 @@ class IntVertexBroadcastMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
new VertexBroadcastMsg[Int](0, readLong(), readInt()).asInstanceOf[T]
val a = readVarLong(optimizePositive = false)
val b = readInt()
new VertexBroadcastMsg[Int](0, a, b).asInstanceOf[T]
}
}
}
@ -53,7 +55,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Long]]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg.vid, optimizePositive = false)
writeLong(msg.data)
this
}
@ -61,7 +63,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
val a = readLong()
val a = readVarLong(optimizePositive = false)
val b = readLong()
new VertexBroadcastMsg[Long](0, a, b).asInstanceOf[T]
}
@ -76,7 +78,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Double]]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg.vid, optimizePositive = false)
writeDouble(msg.data)
this
}
@ -84,7 +86,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
def readObject[T](): T = {
val a = readLong()
val a = readVarLong(optimizePositive = false)
val b = readDouble()
new VertexBroadcastMsg[Double](0, a, b).asInstanceOf[T]
}
@ -107,7 +109,7 @@ class IntAggMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
val a = readLong()
val a = readVarLong(optimizePositive = false)
val b = readUnsignedVarInt()
(a, b).asInstanceOf[T]
}