diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala index 1f92233df9..28efb5d1c1 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala @@ -13,7 +13,7 @@ import org.apache.spark.graph.impl.VertexPartition * tried specializing I got a warning about inherenting from a type * that is not a trait. */ -class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] { +class EdgeTriplet[VD, ED] extends Edge[ED] { // class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest, // @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] { @@ -28,9 +28,6 @@ class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] { */ var dstAttr: VD = _ //nullValue[VD] - def srcMask: Boolean = vPart.isDefined(srcId) - def dstMask: Boolean = vPart.isDefined(dstId) - /** * Set the edge properties of this triplet. */ diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala index 4a1c10283f..6a267d8c28 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -91,17 +91,9 @@ object Pregel { mergeMsg: (A, A) => A) : Graph[VD, ED] = { - def sendMsgFun(edge: EdgeTriplet[VD, ED]): Iterator[(Vid, A)] = { - if (edge.srcMask) { - sendMsg(edge) - } else { - Iterator.empty - } - } - var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ) // compute the messages - var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache() + var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache() var activeMessages = messages.count() // Loop var i = 0 @@ -113,7 +105,7 @@ object Pregel { val oldMessages = messages // compute the messages - messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache() + messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache() activeMessages = messages.count() // after counting we can unpersist the old messages oldMessages.unpersist(blocking=false)