In static Pregel, replicate only changed vertices
This commit is contained in:
parent
3fc4534d19
commit
15374fed97
|
@ -278,8 +278,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
||||||
(mapFunc: (Vid, VD, Option[U]) => VD2)
|
(mapFunc: (Vid, VD, Option[U]) => VD2)
|
||||||
: Graph[VD2, ED]
|
: Graph[VD2, ED]
|
||||||
|
|
||||||
def deltaJoin[VD2: ClassManifest]
|
def deltaJoinVertices(newVerts: VertexRDD[VD], changedVerts: VertexRDD[VD]): Graph[VD, ED]
|
||||||
(updates: VertexRDD[VD2])(updateF: (Vid, VD, VD2) => VD): Graph[VD, ED]
|
|
||||||
|
|
||||||
// Save a copy of the GraphOps object so there is always one unique GraphOps object
|
// Save a copy of the GraphOps object so there is always one unique GraphOps object
|
||||||
// for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
|
// for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
|
||||||
|
|
|
@ -103,7 +103,12 @@ object Pregel {
|
||||||
// compute the messages
|
// compute the messages
|
||||||
val messages = g.mapReduceTriplets(sendMsg, mergeMsg) // broadcast & aggregation
|
val messages = g.mapReduceTriplets(sendMsg, mergeMsg) // broadcast & aggregation
|
||||||
// receive the messages
|
// receive the messages
|
||||||
g = g.deltaJoin(messages)(vprog).cache() // updating the graph
|
val newVerts = g.vertices.zipJoin(messages)(vprog).cache() // updating the vertices
|
||||||
|
val changedVerts = g.vertices.diff(newVerts)
|
||||||
|
println("Replicating %d changed vertices instead of %d total vertices".format(
|
||||||
|
changedVerts.count, newVerts.count))
|
||||||
|
// replicate the changed vertices
|
||||||
|
g = graph.deltaJoinVertices(newVerts, changedVerts)
|
||||||
// count the iteration
|
// count the iteration
|
||||||
i += 1
|
i += 1
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,6 +177,12 @@ class VertexRDD[@specialized VD: ClassManifest](
|
||||||
def mapValues[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexRDD[VD2] =
|
def mapValues[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexRDD[VD2] =
|
||||||
this.mapVertexPartitions(_.map(f))
|
this.mapVertexPartitions(_.map(f))
|
||||||
|
|
||||||
|
def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
|
||||||
|
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
|
||||||
|
thisPart.diff(otherPart)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Inner join this VertexSet with another VertexSet which has the
|
* Inner join this VertexSet with another VertexSet which has the
|
||||||
* same Index. This function will fail if both VertexSets do not
|
* same Index. This function will fail if both VertexSets do not
|
||||||
|
@ -268,13 +274,6 @@ class VertexRDD[@specialized VD: ClassManifest](
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def deltaJoin[VD2: ClassManifest](other: VertexRDD[VD2])(f: (Vid, VD, VD2) => VD): VertexRDD[VD] =
|
|
||||||
{
|
|
||||||
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
|
|
||||||
thisPart.deltaJoin(otherPart)(f)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def aggregateUsingIndex[VD2: ClassManifest](
|
def aggregateUsingIndex[VD2: ClassManifest](
|
||||||
messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
|
messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
|
||||||
{
|
{
|
||||||
|
|
|
@ -28,25 +28,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
@transient val vertices: VertexRDD[VD],
|
@transient val vertices: VertexRDD[VD],
|
||||||
@transient val edges: EdgeRDD[ED],
|
@transient val edges: EdgeRDD[ED],
|
||||||
@transient val vertexPlacement: VertexPlacement,
|
@transient val vertexPlacement: VertexPlacement,
|
||||||
@transient val prevVTableReplicated: Option[VTableReplicated[VD]] = None)
|
@transient val vTableReplicated: VTableReplicated[VD])
|
||||||
extends Graph[VD, ED] {
|
extends Graph[VD, ED] {
|
||||||
|
|
||||||
//def this() = this(null, null, null, null)
|
|
||||||
|
|
||||||
def this(
|
def this(
|
||||||
vertices: RDD[VertexPartition[VD]],
|
vertices: VertexRDD[VD],
|
||||||
edges: RDD[(Pid, EdgePartition[ED])],
|
edges: EdgeRDD[ED],
|
||||||
vertexPlacement: VertexPlacement) = {
|
vertexPlacement: VertexPlacement) = {
|
||||||
this(new VertexRDD(vertices), new EdgeRDD(edges), vertexPlacement)
|
this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement))
|
||||||
}
|
}
|
||||||
|
|
||||||
@transient private val vTableReplicated: VTableReplicated[VD] =
|
|
||||||
new VTableReplicated(vertices, edges, vertexPlacement, prevVTableReplicated)
|
|
||||||
|
|
||||||
/** Return a RDD of edges. */
|
|
||||||
// @transient override val edges: RDD[Edge[ED]] =
|
|
||||||
// edges.mapPartitions(_.next()._2.iterator, true)
|
|
||||||
|
|
||||||
/** Return a RDD that brings edges with its source and destination vertices together. */
|
/** Return a RDD that brings edges with its source and destination vertices together. */
|
||||||
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
|
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
|
||||||
val vdManifest = classManifest[VD]
|
val vdManifest = classManifest[VD]
|
||||||
|
@ -269,11 +260,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
new GraphImpl(newVTable, edges, vertexPlacement)
|
new GraphImpl(newVTable, edges, vertexPlacement)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def deltaJoin[VD2: ClassManifest]
|
override def deltaJoinVertices(
|
||||||
(updates: VertexRDD[VD2])(updateF: (Vid, VD, VD2) => VD): Graph[VD, ED] =
|
newVerts: VertexRDD[VD],
|
||||||
{
|
changedVerts: VertexRDD[VD]): Graph[VD, ED] = {
|
||||||
val newVTable = vertices.deltaJoin(updates)(updateF)
|
val newVTableReplicated = new VTableReplicated(
|
||||||
new GraphImpl(newVTable, edges, vertexPlacement, Some(vTableReplicated))
|
changedVerts, edges, vertexPlacement, Some(vTableReplicated))
|
||||||
|
new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
|
||||||
}
|
}
|
||||||
} // end of class GraphImpl
|
} // end of class GraphImpl
|
||||||
|
|
||||||
|
|
|
@ -89,6 +89,21 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
|
||||||
new VertexPartition(index, values, newMask)
|
new VertexPartition(index, values, newMask)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def diff(other: VertexPartition[VD]): VertexPartition[VD] = {
|
||||||
|
assert(index == other.index)
|
||||||
|
|
||||||
|
val newMask = mask & other.mask
|
||||||
|
|
||||||
|
var i = newMask.nextSetBit(0)
|
||||||
|
while (i >= 0) {
|
||||||
|
if (values(i) == other.values(i)) {
|
||||||
|
newMask.unset(i)
|
||||||
|
}
|
||||||
|
i = mask.nextSetBit(i + 1)
|
||||||
|
}
|
||||||
|
new VertexPartition[VD](index, other.values, newMask)
|
||||||
|
}
|
||||||
|
|
||||||
/** Inner join another VertexPartition. */
|
/** Inner join another VertexPartition. */
|
||||||
def join[VD2: ClassManifest, VD3: ClassManifest]
|
def join[VD2: ClassManifest, VD3: ClassManifest]
|
||||||
(other: VertexPartition[VD2])
|
(other: VertexPartition[VD2])
|
||||||
|
@ -110,29 +125,6 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Inner join another VertexPartition, only keeping values that change. */
|
|
||||||
def deltaJoin[VD2: ClassManifest]
|
|
||||||
(other: VertexPartition[VD2])
|
|
||||||
(f: (Vid, VD, VD2) => VD): VertexPartition[VD] =
|
|
||||||
{
|
|
||||||
assert(index == other.index)
|
|
||||||
|
|
||||||
val newValues = new Array[VD](capacity)
|
|
||||||
val newMask = mask & other.mask
|
|
||||||
|
|
||||||
var i = newMask.nextSetBit(0)
|
|
||||||
while (i >= 0) {
|
|
||||||
newValues(i) = f(index.getValue(i), values(i), other.values(i))
|
|
||||||
// Only set the mask if the value changes (we are using precise comparison here).
|
|
||||||
// TODO: Use delta comparison for double type.
|
|
||||||
if (newValues(i) == values(i)) {
|
|
||||||
newMask.unset(i)
|
|
||||||
}
|
|
||||||
i = mask.nextSetBit(i + 1)
|
|
||||||
}
|
|
||||||
new VertexPartition[VD](index, newValues, newMask)
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Left outer join another VertexPartition. */
|
/** Left outer join another VertexPartition. */
|
||||||
def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
|
def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
|
||||||
(other: VertexPartition[VD2])
|
(other: VertexPartition[VD2])
|
||||||
|
|
Loading…
Reference in a new issue