Merge pull request #84 from amatsukawa/graphlab_enhancements
GraphLab bug fix & set start vertex
This commit is contained in:
commit
8a56c1ff67
|
@ -24,6 +24,8 @@ object GraphLab {
|
||||||
* @param scatterFunc Executed after the apply function, the scatter function takes
|
* @param scatterFunc Executed after the apply function, the scatter function takes
|
||||||
* a triplet and signals whether the neighboring vertex program
|
* a triplet and signals whether the neighboring vertex program
|
||||||
* must be recomputed.
|
* must be recomputed.
|
||||||
|
* @param startVertices Predicate to determine which vertices to start the computation on.
|
||||||
|
* These will be the active vertices in the first iteration.
|
||||||
* @param numIter The maximum number of iterations to run.
|
* @param numIter The maximum number of iterations to run.
|
||||||
* @param gatherDirection The direction of edges to consider during the gather phase
|
* @param gatherDirection The direction of edges to consider during the gather phase
|
||||||
* @param scatterDirection The direction of edges to consider during the scatter phase
|
* @param scatterDirection The direction of edges to consider during the scatter phase
|
||||||
|
@ -40,12 +42,13 @@ object GraphLab {
|
||||||
(gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
|
(gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
|
||||||
mergeFunc: (A, A) => A,
|
mergeFunc: (A, A) => A,
|
||||||
applyFunc: (Vid, VD, Option[A]) => VD,
|
applyFunc: (Vid, VD, Option[A]) => VD,
|
||||||
scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean): Graph[VD, ED] = {
|
scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
|
||||||
|
startVertices: (Vid, VD) => Boolean = (vid: Vid, data: VD) => true): Graph[VD, ED] = {
|
||||||
|
|
||||||
|
|
||||||
// Add an active attribute to all vertices to track convergence.
|
// Add an active attribute to all vertices to track convergence.
|
||||||
var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
|
var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
|
||||||
case (id, data) => (true, data)
|
case (id, data) => (startVertices(id, data), data)
|
||||||
}.cache()
|
}.cache()
|
||||||
|
|
||||||
// The gather function wrapper strips the active attribute and
|
// The gather function wrapper strips the active attribute and
|
||||||
|
@ -86,9 +89,9 @@ object GraphLab {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used to set the active status of vertices for the next round
|
// Used to set the active status of vertices for the next round
|
||||||
def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = {
|
def applyActive(vid: Vid, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
|
||||||
val (prevActive, vData) = data
|
val (prevActive, vData) = data
|
||||||
(newActive, vData)
|
(newActiveOpt.getOrElse(false), vData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Main Loop ---------------------------------------------------------------------
|
// Main Loop ---------------------------------------------------------------------
|
||||||
|
@ -110,7 +113,7 @@ object GraphLab {
|
||||||
val scattered: RDD[(Vid, Boolean)] =
|
val scattered: RDD[(Vid, Boolean)] =
|
||||||
activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
|
activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
|
||||||
|
|
||||||
activeGraph = activeGraph.joinVertices(scattered)(applyActive).cache()
|
activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache()
|
||||||
|
|
||||||
// Calculate the number of active vertices
|
// Calculate the number of active vertices
|
||||||
numActive = activeGraph.vertices.map{
|
numActive = activeGraph.vertices.map{
|
||||||
|
|
Loading…
Reference in a new issue