Don't expose {src,dst}Mask in EdgeTriplet
Exposing this information is not necessary due to the addition of skipStale.
This commit is contained in:
parent
87f2909561
commit
6bf51a5e15
|
@ -13,7 +13,7 @@ import org.apache.spark.graph.impl.VertexPartition
|
||||||
* tried specializing I got a warning about inherenting from a type
|
* tried specializing I got a warning about inherenting from a type
|
||||||
* that is not a trait.
|
* that is not a trait.
|
||||||
*/
|
*/
|
||||||
class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] {
|
class EdgeTriplet[VD, ED] extends Edge[ED] {
|
||||||
// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest,
|
// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest,
|
||||||
// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] {
|
// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] {
|
||||||
|
|
||||||
|
@ -28,9 +28,6 @@ class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] {
|
||||||
*/
|
*/
|
||||||
var dstAttr: VD = _ //nullValue[VD]
|
var dstAttr: VD = _ //nullValue[VD]
|
||||||
|
|
||||||
def srcMask: Boolean = vPart.isDefined(srcId)
|
|
||||||
def dstMask: Boolean = vPart.isDefined(dstId)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the edge properties of this triplet.
|
* Set the edge properties of this triplet.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -91,17 +91,9 @@ object Pregel {
|
||||||
mergeMsg: (A, A) => A)
|
mergeMsg: (A, A) => A)
|
||||||
: Graph[VD, ED] = {
|
: Graph[VD, ED] = {
|
||||||
|
|
||||||
def sendMsgFun(edge: EdgeTriplet[VD, ED]): Iterator[(Vid, A)] = {
|
|
||||||
if (edge.srcMask) {
|
|
||||||
sendMsg(edge)
|
|
||||||
} else {
|
|
||||||
Iterator.empty
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) )
|
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) )
|
||||||
// compute the messages
|
// compute the messages
|
||||||
var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache()
|
var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
|
||||||
var activeMessages = messages.count()
|
var activeMessages = messages.count()
|
||||||
// Loop
|
// Loop
|
||||||
var i = 0
|
var i = 0
|
||||||
|
@ -113,7 +105,7 @@ object Pregel {
|
||||||
|
|
||||||
val oldMessages = messages
|
val oldMessages = messages
|
||||||
// compute the messages
|
// compute the messages
|
||||||
messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache()
|
messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
|
||||||
activeMessages = messages.count()
|
activeMessages = messages.count()
|
||||||
// after counting we can unpersist the old messages
|
// after counting we can unpersist the old messages
|
||||||
oldMessages.unpersist(blocking=false)
|
oldMessages.unpersist(blocking=false)
|
||||||
|
|
Loading…
Reference in a new issue