Always write Vids using variable encoding

Also, autoformat Serializers.scala.
This commit is contained in:
Ankur Dave 2013-12-04 18:20:10 -08:00
parent e0347ba6c7
commit a3bb98b88a

View file

@ -6,7 +6,6 @@ import java.nio.ByteBuffer
import org.apache.spark.graph._
import org.apache.spark.serializer._
class VidMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
@ -26,7 +25,6 @@ class VidMsgSerializer extends Serializer {
}
}
/** A special shuffle serializer for VertexBroadcastMessage[Int]. */
class IntVertexBroadcastMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
@ -34,7 +32,7 @@ class IntVertexBroadcastMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Int]]
writeLong(msg.vid)
writeVarLong(msg._1, optimizePositive = false)
writeInt(msg.data)
this
}
@ -55,7 +53,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Long]]
writeLong(msg.vid)
writeVarLong(msg._1, optimizePositive = false)
writeLong(msg.data)
this
}
@ -78,7 +76,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Double]]
writeLong(msg.vid)
writeVarLong(msg._1, optimizePositive = false)
writeDouble(msg.data)
this
}
@ -94,7 +92,6 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer {
}
}
/** A special shuffle serializer for AggregationMessage[Int]. */
class IntAggMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
@ -102,7 +99,7 @@ class IntAggMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[(Vid, Int)]
writeLong(msg._1)
writeVarLong(msg._1, optimizePositive = false)
writeUnsignedVarInt(msg._2)
this
}
@ -141,7 +138,6 @@ class LongAggMsgSerializer extends Serializer {
}
}
/** A special shuffle serializer for AggregationMessage[Double]. */
class DoubleAggMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
@ -282,7 +278,6 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial
override def close(): Unit = s.close()
}
sealed abstract class ShuffleDeserializationStream(s: InputStream) extends DeserializationStream {
// The implementation should override this one.
def readObject[T](): T
@ -350,7 +345,6 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
override def close(): Unit = s.close()
}
sealed trait ShuffleSerializerInstance extends SerializerInstance {
override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException