Use mask for dynamic Pregel
This commit is contained in:
parent
7457abe282
commit
39b0256668
|
@ -1,5 +1,7 @@
|
|||
package org.apache.spark.graph
|
||||
|
||||
import org.apache.spark.graph.impl.VertexPartition
|
||||
|
||||
/**
|
||||
* An edge triplet represents two vertices and edge along with their
|
||||
* attributes.
|
||||
|
@ -11,7 +13,7 @@ package org.apache.spark.graph
|
|||
* tried specializing I got a warning about inherenting from a type
|
||||
* that is not a trait.
|
||||
*/
|
||||
class EdgeTriplet[VD, ED] extends Edge[ED] {
|
||||
class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] {
|
||||
// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest,
|
||||
// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] {
|
||||
|
||||
|
@ -19,12 +21,15 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
|
|||
/**
|
||||
* The source vertex attribute
|
||||
*/
|
||||
var srcAttr: VD = _ //nullValue[VD]
|
||||
var srcAttr: VD = _ //nullValue[VD]
|
||||
|
||||
/**
|
||||
* The destination vertex attribute
|
||||
*/
|
||||
var dstAttr: VD = _ //nullValue[VD]
|
||||
var dstAttr: VD = _ //nullValue[VD]
|
||||
|
||||
def srcMask: Boolean = vPart.isDefined(srcId)
|
||||
def dstMask: Boolean = vPart.isDefined(dstId)
|
||||
|
||||
/**
|
||||
* Set the edge properties of this triplet.
|
||||
|
|
|
@ -124,12 +124,3 @@ object GraphLab {
|
|||
activeGraph.mapVertices{case (vid, data) => data._2 }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -169,36 +169,29 @@ object Pregel {
|
|||
mergeMsg: (A, A) => A)
|
||||
: Graph[VD, ED] = {
|
||||
|
||||
def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = {
|
||||
msgOpt match {
|
||||
case Some(msg) => (vprog(id, attr._1, msg), true)
|
||||
case None => (attr._1, false)
|
||||
}
|
||||
}
|
||||
|
||||
def sendMsgFun(edge: EdgeTriplet[(VD,Boolean), ED]): Iterator[(Vid, A)] = {
|
||||
if(edge.srcAttr._2) {
|
||||
val et = new EdgeTriplet[VD, ED]
|
||||
et.srcId = edge.srcId
|
||||
et.srcAttr = edge.srcAttr._1
|
||||
et.dstId = edge.dstId
|
||||
et.dstAttr = edge.dstAttr._1
|
||||
et.attr = edge.attr
|
||||
sendMsg(et)
|
||||
def sendMsgFun(edge: EdgeTriplet[VD, ED]): Iterator[(Vid, A)] = {
|
||||
if (edge.srcMask) {
|
||||
sendMsg(edge)
|
||||
} else {
|
||||
Iterator.empty
|
||||
}
|
||||
}
|
||||
|
||||
var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
|
||||
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) )
|
||||
// compute the messages
|
||||
var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
|
||||
var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache()
|
||||
var activeMessages = messages.count
|
||||
// Loop
|
||||
var i = 0
|
||||
while (activeMessages > 0) {
|
||||
// receive the messages
|
||||
g = g.outerJoinVertices(messages)(vprogFun)
|
||||
val newVerts = g.vertices.zipJoin(messages)(vprog).cache() // updating the vertices
|
||||
val changedVerts = g.vertices.diff(newVerts)
|
||||
println("Replicating %d changed vertices instead of %d total vertices".format(
|
||||
changedVerts.count, newVerts.count))
|
||||
// replicate the changed vertices
|
||||
g = graph.deltaJoinVertices(newVerts, changedVerts)
|
||||
|
||||
val oldMessages = messages
|
||||
// compute the messages
|
||||
messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
|
||||
|
@ -208,8 +201,8 @@ object Pregel {
|
|||
// count the iteration
|
||||
i += 1
|
||||
}
|
||||
// Return the final graph
|
||||
g.mapVertices((id, attr) => attr._1)
|
||||
|
||||
g
|
||||
} // end of apply
|
||||
|
||||
} // end of class Pregel
|
||||
|
|
|
@ -23,6 +23,9 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
|
|||
* destinations. `vertexPlacement` specifies where each vertex will be
|
||||
* replicated. `vTableReplicated` stores the replicated vertex attributes, which
|
||||
* are co-partitioned with the relevant edges.
|
||||
*
|
||||
* mask in vertices means filter
|
||||
* mask in vTableReplicated means active
|
||||
*/
|
||||
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||
@transient val vertices: VertexRDD[VD],
|
||||
|
@ -44,8 +47,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
val edManifest = classManifest[ED]
|
||||
|
||||
edges.zipEdgePartitions(vTableReplicated.bothAttrs) { (edgePartition, vTableReplicatedIter) =>
|
||||
val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
|
||||
new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)(vdManifest, edManifest)
|
||||
val (_, vPart) = vTableReplicatedIter.next()
|
||||
new EdgeTripletIterator(vPart.index, vPart.values, edgePartition)(vdManifest, edManifest)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,14 +144,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
val vdManifest = classManifest[VD]
|
||||
val newETable =
|
||||
edges.zipEdgePartitions(vTableReplicated.bothAttrs) { (edgePartition, vTableReplicatedIter) =>
|
||||
val (pid, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
|
||||
val (pid, vPart) = vTableReplicatedIter.next()
|
||||
val et = new EdgeTriplet[VD, ED]
|
||||
val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](
|
||||
vidToIndex, vertexArray)(classManifest[Vid], vdManifest)
|
||||
val newEdgePartition = edgePartition.map { e =>
|
||||
et.set(e)
|
||||
et.srcAttr = vmap(e.srcId)
|
||||
et.dstAttr = vmap(e.dstId)
|
||||
et.srcAttr = vPart(e.srcId)
|
||||
et.dstAttr = vPart(e.dstId)
|
||||
f(et)
|
||||
}
|
||||
Iterator((pid, newEdgePartition))
|
||||
|
@ -209,44 +210,22 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
|
||||
// Map and combine.
|
||||
val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) =>
|
||||
val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
|
||||
assert(vidToIndex.capacity == vertexArray.size)
|
||||
val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)(
|
||||
classManifest[Vid], vdManifest)
|
||||
val (_, vertexPartition) = vTableReplicatedIter.next()
|
||||
|
||||
// Note: This doesn't allow users to send messages to arbitrary vertices.
|
||||
val msgArray = new Array[A](vertexArray.size)
|
||||
val msgBS = new BitSet(vertexArray.size)
|
||||
// Iterate over the partition
|
||||
val et = new EdgeTriplet[VD, ED]
|
||||
|
||||
edgePartition.foreach { e =>
|
||||
val et = new EdgeTriplet[VD, ED](vertexPartition)
|
||||
val filteredEdges = edgePartition.iterator.flatMap { e =>
|
||||
et.set(e)
|
||||
if (mapUsesSrcAttr) {
|
||||
et.srcAttr = vmap(e.srcId)
|
||||
et.srcAttr = vertexPartition(e.srcId)
|
||||
}
|
||||
if (mapUsesDstAttr) {
|
||||
et.dstAttr = vmap(e.dstId)
|
||||
}
|
||||
// TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
|
||||
// Also given we are only allowing zero, one, or two messages, we can completely unroll
|
||||
// the for loop.
|
||||
mapFunc(et).foreach { case (vid, msg) =>
|
||||
// verify that the vid is valid
|
||||
assert(vid == et.srcId || vid == et.dstId)
|
||||
// Get the index of the key
|
||||
val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
|
||||
// Populate the aggregator map
|
||||
if (msgBS.get(ind)) {
|
||||
msgArray(ind) = reduceFunc(msgArray(ind), msg)
|
||||
} else {
|
||||
msgArray(ind) = msg
|
||||
msgBS.set(ind)
|
||||
}
|
||||
et.dstAttr = vertexPartition(e.dstId)
|
||||
}
|
||||
mapFunc(et)
|
||||
}
|
||||
// construct an iterator of tuples. Iterator[(Vid, A)]
|
||||
msgBS.iterator.map { ind => (vidToIndex.getValue(ind), msgArray(ind)) }
|
||||
// Note: This doesn't allow users to send messages to arbitrary vertices.
|
||||
vertexPartition.aggregateUsingIndex(filteredEdges, reduceFunc).iterator
|
||||
}
|
||||
|
||||
// do the final reduction reusing the index map
|
||||
|
|
|
@ -16,19 +16,19 @@ class VTableReplicated[VD: ClassManifest](
|
|||
vertexPlacement: VertexPlacement,
|
||||
prevVTableReplicated: Option[VTableReplicated[VD]] = None) {
|
||||
|
||||
val bothAttrs: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
|
||||
val bothAttrs: RDD[(Pid, VertexPartition[VD])] =
|
||||
createVTableReplicated(vTable, eTable, vertexPlacement, true, true)
|
||||
|
||||
val srcAttrOnly: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
|
||||
val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] =
|
||||
createVTableReplicated(vTable, eTable, vertexPlacement, true, false)
|
||||
|
||||
val dstAttrOnly: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
|
||||
val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] =
|
||||
createVTableReplicated(vTable, eTable, vertexPlacement, false, true)
|
||||
|
||||
val noAttrs: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
|
||||
val noAttrs: RDD[(Pid, VertexPartition[VD])] =
|
||||
createVTableReplicated(vTable, eTable, vertexPlacement, false, false)
|
||||
|
||||
def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] = {
|
||||
def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
|
||||
(includeSrc, includeDst) match {
|
||||
case (true, true) => bothAttrs
|
||||
case (true, false) => srcAttrOnly
|
||||
|
@ -42,7 +42,7 @@ class VTableReplicated[VD: ClassManifest](
|
|||
eTable: EdgeRDD[_],
|
||||
vertexPlacement: VertexPlacement,
|
||||
includeSrcAttr: Boolean,
|
||||
includeDstAttr: Boolean): RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] = {
|
||||
includeDstAttr: Boolean): RDD[(Pid, VertexPartition[VD])] = {
|
||||
|
||||
val placement = vertexPlacement.get(includeSrcAttr, includeDstAttr)
|
||||
val vdManifest = classManifest[VD]
|
||||
|
@ -55,25 +55,14 @@ class VTableReplicated[VD: ClassManifest](
|
|||
|
||||
prevVTableReplicated match {
|
||||
case Some(vTableReplicated) =>
|
||||
val prev: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
|
||||
val prev: RDD[(Pid, VertexPartition[VD])] =
|
||||
vTableReplicated.get(includeSrcAttr, includeDstAttr)
|
||||
|
||||
prev.zipPartitions(msgsByPartition) { (vTableIter, msgsIter) =>
|
||||
val (pid, (vidToIndex, oldVertexArray)) = vTableIter.next()
|
||||
|
||||
val vertexArray = vdManifest.newArray(oldVertexArray.length)
|
||||
System.arraycopy(oldVertexArray, 0, vertexArray, 0, vertexArray.length)
|
||||
|
||||
for ((_, block) <- msgsIter) {
|
||||
for (i <- 0 until block.vids.size) {
|
||||
val vid = block.vids(i)
|
||||
val attr = block.attrs(i)
|
||||
val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
|
||||
vertexArray(ind) = attr
|
||||
}
|
||||
}
|
||||
|
||||
Iterator((pid, (vidToIndex, vertexArray)))
|
||||
val (pid, vertexPartition) = vTableIter.next()
|
||||
val (_, block) = msgsIter.next()
|
||||
val newVPart = vertexPartition.updateUsingIndex(block.iterator)(vdManifest)
|
||||
Iterator((pid, newVPart))
|
||||
}.cache()
|
||||
|
||||
case None =>
|
||||
|
@ -107,7 +96,7 @@ class VTableReplicated[VD: ClassManifest](
|
|||
vertexArray(ind) = attr
|
||||
}
|
||||
}
|
||||
Iterator((pid, (vidToIndex, vertexArray)))
|
||||
Iterator((pid, new VertexPartition(vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)))
|
||||
}.cache()
|
||||
}
|
||||
}
|
||||
|
@ -131,4 +120,6 @@ object VTableReplicated {
|
|||
}
|
||||
}
|
||||
|
||||
class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD])
|
||||
class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) {
|
||||
def iterator: Iterator[(Vid, VD)] = (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
|
||||
}
|
||||
|
|
|
@ -42,6 +42,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
|
|||
/** Return the vertex attribute for the given vertex ID. */
|
||||
def apply(vid: Vid): VD = values(index.getPos(vid))
|
||||
|
||||
def isDefined(vid: Vid): Boolean = mask.get(index.getPos(vid))
|
||||
|
||||
/**
|
||||
* Pass each vertex attribute along with the vertex id through a map
|
||||
* function and retain the original RDD's partitioning and index.
|
||||
|
@ -167,6 +169,19 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
|
|||
new VertexPartition[VD2](index, newValues, newMask)
|
||||
}
|
||||
|
||||
def updateUsingIndex[VD2: ClassManifest](iter: Iterator[Product2[Vid, VD2]])
|
||||
: VertexPartition[VD2] = {
|
||||
val newMask = new BitSet(capacity)
|
||||
val newValues = new Array[VD2](capacity)
|
||||
System.arraycopy(values, 0, newValues, 0, newValues.length)
|
||||
iter.foreach { case (vid, vdata) =>
|
||||
val pos = index.getPos(vid)
|
||||
newMask.set(pos)
|
||||
newValues(pos) = vdata
|
||||
}
|
||||
new VertexPartition[VD2](index, newValues, newMask)
|
||||
}
|
||||
|
||||
def aggregateUsingIndex[VD2: ClassManifest](
|
||||
iter: Iterator[Product2[Vid, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] =
|
||||
{
|
||||
|
|
Loading…
Reference in a new issue