Merge pull request #100 from ankurdave/mrTriplets-active-set

Support activeSet option in mapReduceTriplets
Joey 2013-12-16 17:19:25 -08:00
commit 0476c84c51
13 changed files with 429 additions and 247 deletions

View file

@ -45,20 +45,18 @@ class EdgeRDD[@specialized ED: ClassManifest](
def mapEdgePartitions[ED2: ClassManifest](f: EdgePartition[ED] => EdgePartition[ED2])
: EdgeRDD[ED2] = {
val cleanF = sparkContext.clean(f)
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, cleanF(ep)))
Iterator(Tuple2(pid, f(ep)))
}, preservesPartitioning = true))
}
def zipEdgePartitions[T: ClassManifest, U: ClassManifest]
(other: RDD[T])
(f: (EdgePartition[ED], Iterator[T]) => Iterator[U]): RDD[U] = {
val cleanF = sparkContext.clean(f)
partitionsRDD.zipPartitions(other, preservesPartitioning = true) { (ePartIter, otherIter) =>
val (_, edgePartition) = ePartIter.next()
cleanF(edgePartition, otherIter)
f(edgePartition, otherIter)
}
}

View file

@ -13,7 +13,7 @@ import org.apache.spark.graph.impl.VertexPartition
* tried specializing I got a warning about inheriting from a type
* that is not a trait.
*/
class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] {
class EdgeTriplet[VD, ED] extends Edge[ED] {
// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest,
// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] {
@ -28,8 +28,8 @@ class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] {
*/
var dstAttr: VD = _ //nullValue[VD]
def srcMask: Boolean = vPart.isDefined(srcId)
def dstMask: Boolean = vPart.isDefined(dstId)
var srcStale: Boolean = false
var dstStale: Boolean = false
/**
* Set the edge properties of this triplet.

View file

@ -227,6 +227,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* be commutative and associative and is used to combine the output
* of the map phase.
*
* @param activeSet optionally, a set of "active" vertices and a direction of edges to consider
* when running `mapFunc`. For example, if the direction is Out, `mapFunc` will only be run on
* edges originating from vertices in the active set. `activeSet` must have the same index as the
* graph's vertices.
*
* @example We can use this function to compute the inDegree of each
* vertex
* {{{
@ -244,7 +249,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
*/
def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
reduceFunc: (A, A) => A)
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
: VertexRDD[A]
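
Illustrative usage of the new activeSetOpt parameter, as a sketch with assumed names (a Graph[Long, _] named `graph` whose vertex attribute equals the vertex id), mirroring the mapReduceTriplets test added later in this change:

    // Vertices with even attributes form the active set.
    val active = graph.vertices.filter { case (vid, attr) => attr % 2 == 0 }
    // With EdgeDirection.In, mapFunc runs only on edges whose destination is active, so this
    // counts, for each source vertex, how many of its out-edges point at an active vertex.
    val countsToActive = graph.mapReduceTriplets[Int](
      et => Iterator((et.srcId, 1)),    // map: one message per qualifying edge
      (a, b) => a + b,                  // reduce: sum the messages per vertex
      Some((active, EdgeDirection.In)))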
/**
@ -280,8 +286,6 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
(mapFunc: (Vid, VD, Option[U]) => VD2)
: Graph[VD2, ED]
def deltaJoinVertices(changedVerts: VertexRDD[VD]): Graph[VD, ED]
// Save a copy of the GraphOps object so there is always one unique GraphOps object
// for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
val ops = new GraphOps(this)

View file

@ -2,7 +2,6 @@ package org.apache.spark.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.util.ClosureCleaner
import org.apache.spark.SparkException
@ -116,9 +115,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
dir: EdgeDirection)
: VertexRDD[A] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
// Define a new map function over edge triplets
val mf = (et: EdgeTriplet[VD,ED]) => {
// Compute the message to the dst vertex
@ -140,7 +136,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
}
}
ClosureCleaner.clean(mf)
graph.mapReduceTriplets(mf, reduceFunc)
} // end of aggregateNeighbors
@ -233,7 +228,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
*/
def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
: Graph[VD, ED] = {
ClosureCleaner.clean(mapFunc)
val uf = (id: Vid, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)

View file

@ -91,29 +91,22 @@ object Pregel {
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
def sendMsgFun(edge: EdgeTriplet[VD, ED]): Iterator[(Vid, A)] = {
if (edge.srcMask) {
sendMsg(edge)
} else {
Iterator.empty
}
}
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) )
// compute the messages
var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache()
var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
var activeMessages = messages.count()
// Loop
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// receive the messages
val changedVerts = g.vertices.deltaJoin(messages)(vprog).cache() // updating the vertices
// replicate the changed vertices
g = g.deltaJoinVertices(changedVerts)
// Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
// Update the graph with the new vertices.
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
val oldMessages = messages
// compute the messages
messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache()
// Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
// get to send messages.
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
activeMessages = messages.count()
// after counting we can unpersist the old messages
oldMessages.unpersist(blocking=false)
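
For readability, here is the new loop body after this change, consolidated into one sketch (names as in the hunk above; the increment of `i` is assumed from the loop condition, as it is not visible in the hunk):

    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
      // Update the graph with the new vertices.
      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
      val oldMessages = messages
      // Send new messages, restricting the map phase to out-edges of the vertices that
      // received messages this round (the active set).
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
      activeMessages = messages.count()
      // After counting we can unpersist the old messages.
      oldMessages.unpersist(blocking = false)
      i += 1  // assumed; not shown in the hunk
    }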

View file

@ -60,7 +60,7 @@ class VertexRDD[@specialized VD: ClassManifest](
/**
* Construct a new VertexRDD that is indexed by only the keys in the RDD.
* The resulting VertexSet will be based on a different index and can
* The resulting VertexRDD will be based on a different index and can
* no longer be quickly joined with this RDD.
*/
def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
@ -113,8 +113,7 @@ class VertexRDD[@specialized VD: ClassManifest](
*/
def mapVertexPartitions[VD2: ClassManifest](f: VertexPartition[VD] => VertexPartition[VD2])
: VertexRDD[VD2] = {
val cleanF = sparkContext.clean(f)
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(cleanF), preservesPartitioning = true)
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
new VertexRDD(newPartitionsRDD)
}
@ -122,16 +121,15 @@ class VertexRDD[@specialized VD: ClassManifest](
* Return a new VertexRDD by applying a function to corresponding
* VertexPartitions of this VertexRDD and another one.
*/
private def zipVertexPartitions[VD2: ClassManifest, VD3: ClassManifest]
def zipVertexPartitions[VD2: ClassManifest, VD3: ClassManifest]
(other: VertexRDD[VD2])
(f: (VertexPartition[VD], VertexPartition[VD2]) => VertexPartition[VD3]): VertexRDD[VD3] = {
val cleanF = sparkContext.clean(f)
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
val thisPart = thisIter.next()
val otherPart = otherIter.next()
Iterator(cleanF(thisPart, otherPart))
Iterator(f(thisPart, otherPart))
}
new VertexRDD(newPartitionsRDD)
}
@ -159,7 +157,7 @@ class VertexRDD[@specialized VD: ClassManifest](
*
* @param f the function applied to each value in the RDD
* @return a new VertexRDD with values obtained by applying `f` to
* each of the entries in the original VertexSet. The resulting
* each of the entries in the original VertexRDD. The resulting
* VertexRDD retains the same index.
*/
def mapValues[VD2: ClassManifest](f: VD => VD2): VertexRDD[VD2] =
@ -173,12 +171,16 @@ class VertexRDD[@specialized VD: ClassManifest](
*
* @param f the function applied to each value in the RDD
* @return a new VertexRDD with values obtained by applying `f` to
* each of the entries in the original VertexSet. The resulting
* each of the entries in the original VertexRDD. The resulting
* VertexRDD retains the same index.
*/
def mapValues[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
/**
* Hides vertices that are the same between this and other. For vertices that are different, keeps
* the values from `other`.
*/
def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.diff(otherPart)
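
A toy illustration of diff's semantics (assumed values, not from this change):

    // Given identical indexes,
    //   this:  (1L, "a"), (2L, "b")
    //   other: (1L, "a"), (2L, "c")
    // this.diff(other) exposes only the changed vertex, yielding (2L, "c").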
@ -199,24 +201,14 @@ class VertexRDD[@specialized VD: ClassManifest](
* this and the other vertex set to a new vertex attribute.
* @return a VertexRDD containing only the vertices in both this
* and the other VertexSet and with tuple attributes.
*
*/
def zipJoin[VD2: ClassManifest, VD3: ClassManifest]
(other: VertexRDD[VD2])(f: (Vid, VD, VD2) => VD3): VertexRDD[VD3] =
{
(other: VertexRDD[VD2])(f: (Vid, VD, VD2) => VD3): VertexRDD[VD3] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.join(otherPart)(f)
}
}
def deltaJoin[VD2: ClassManifest]
(other: VertexRDD[VD2])(f: (Vid, VD, VD2) => VD): VertexRDD[VD] =
{
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.deltaJoin(otherPart)(f)
}
}
/**
* Left join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
@ -236,31 +228,30 @@ class VertexRDD[@specialized VD: ClassManifest](
*
*/
def leftZipJoin[VD2: ClassManifest, VD3: ClassManifest]
(other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] =
{
(other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.leftJoin(otherPart)(f)
}
}
/**
* Left join this VertexSet with an RDD containing vertex attribute
* pairs. If the other RDD is backed by a VertexSet with the same
* Left join this VertexRDD with an RDD containing vertex attribute
* pairs. If the other RDD is backed by a VertexRDD with the same
* index then the efficient leftZipJoin implementation is used. The
* resulting vertex set contains an entry for each vertex in this
* set. If the other VertexSet is missing any vertex in this
* VertexSet then a `None` attribute is generated.
* set. If the other VertexRDD is missing any vertex in this
* VertexRDD then a `None` attribute is generated.
*
* If there are duplicates, the vertex is picked at random.
*
* @tparam VD2 the attribute type of the other VertexSet
* @tparam VD3 the attribute type of the resulting VertexSet
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
*
* @param other the other VertexSet with which to join.
* @param other the other VertexRDD with which to join.
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a new vertex attribute.
* @return a VertexRDD containing all the vertices in this
* VertexSet with the attribute emitted by f.
* VertexRDD with the attribute emitted by f.
*/
def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
(other: RDD[(Vid, VD2)])
@ -284,13 +275,47 @@ class VertexRDD[@specialized VD: ClassManifest](
}
}
/**
* Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other`
* must have the same index.
*/
def innerZipJoin[U: ClassManifest, VD2: ClassManifest](other: VertexRDD[U])
(f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.innerJoin(otherPart)(f)
}
}
/**
* Replace vertices with corresponding vertices in `other`, and drop vertices without a
* corresponding vertex in `other`.
*/
def innerJoin[U: ClassManifest, VD2: ClassManifest](other: RDD[(Vid, U)])
(f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient innerZipJoin
other match {
case other: VertexRDD[_] =>
innerZipJoin(other)(f)
case _ =>
new VertexRDD(
partitionsRDD.zipPartitions(
other.partitionBy(this.partitioner.get), preservesPartitioning = true)
{ (part, msgs) =>
val vertexPartition: VertexPartition[VD] = part.next()
Iterator(vertexPartition.innerJoin(msgs)(f))
}
)
}
}
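
A hedged usage sketch of the two joins introduced here; `ranks: VertexRDD[Double]`, `counts: VertexRDD[Int]`, and `msgs: RDD[(Vid, Double)]` are assumed names:

    // innerZipJoin requires both sides to share the same index, so no shuffle is needed.
    val avg = ranks.innerZipJoin(counts) { (vid, sum, n) => sum / n }
    // innerJoin accepts any RDD[(Vid, U)]. If the argument happens to be a VertexRDD it falls
    // back to innerZipJoin; otherwise the pairs are shuffled to this RDD's partitioner and
    // joined per partition. Vertices missing from msgs are dropped from the result.
    val rescaled = ranks.innerJoin(msgs) { (vid, rank, factor) => rank * factor }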
def aggregateUsingIndex[VD2: ClassManifest](
messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
{
val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
val vertextPartition: VertexPartition[VD] = thisIter.next()
Iterator(vertextPartition.aggregateUsingIndex(msgIter, reduceFunc))
val vertexPartition: VertexPartition[VD] = thisIter.next()
Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
}
new VertexRDD[VD2](parts)
}
@ -299,7 +324,7 @@ class VertexRDD[@specialized VD: ClassManifest](
/**
* The VertexRDD singleton is used to construct VertexSets
* The VertexRDD singleton is used to construct VertexRDDs
*/
object VertexRDD {

View file

@ -166,24 +166,29 @@ object PageRank extends Logging {
.mapVertices((vid, degree) => resetProb).cache()
var numDeltas: Long = ranks.count()
var prevDeltas: Option[VertexRDD[Double]] = None
var i = 0
val weight = (1.0 - resetProb)
while (numDeltas > 0) {
// Compute new deltas
// Compute new deltas. Only deltas that existed in the last round (i.e., were greater than
// `tol`) get to send messages; those that were less than `tol` would send messages less than
// `tol` as well.
val deltas = deltaGraph
.mapReduceTriplets[Double](
et => {
if (et.srcMask) Iterator((et.dstId, et.srcAttr * et.attr * weight))
else Iterator.empty
},
_ + _)
et => Iterator((et.dstId, et.srcAttr * et.attr * weight)),
_ + _,
prevDeltas.map((_, EdgeDirection.Out)))
.filter { case (vid, delta) => delta > tol }
.cache()
prevDeltas = Some(deltas)
numDeltas = deltas.count()
logInfo("Standalone PageRank: iter %d has %d deltas".format(i, numDeltas))
// Apply deltas. Sets the mask for each vertex to false if it does not appear in deltas.
deltaGraph = deltaGraph.deltaJoinVertices(deltas).cache()
// Update deltaGraph with the deltas
deltaGraph = deltaGraph.outerJoinVertices(deltas) { (vid, old, newOpt) =>
newOpt.getOrElse(old)
}.cache()
// Update ranks
ranks = ranks.leftZipJoin(deltas) { (vid, oldRank, deltaOpt) =>
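
Restated, the per-iteration update this hunk implements (a sketch inferred from the code above, with w(u, v) standing for the edge attribute et.attr):

    // For each vertex v, summing over in-neighbors u whose previous delta exceeded tol:
    //   newDelta(v) = (1 - resetProb) * sum_u prevDelta(u) * w(u, v)
    // Deltas no larger than tol are filtered out; the survivors replace the vertex values in
    // deltaGraph via outerJoinVertices and, per the truncated leftZipJoin above, are presumably
    // added into the running ranks.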

View file

@ -47,21 +47,20 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
this(vertices, edges, new VertexPlacement(edges, vertices))
}
/** Return a RDD that brings edges with its source and destination vertices together. */
/** Return a RDD that brings edges together with their source and destination vertices. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
val vdManifest = classManifest[VD]
val edManifest = classManifest[ED]
edges.zipEdgePartitions(vTableReplicated.bothAttrs) { (edgePartition, vTableReplicatedIter) =>
val (_, vPart) = vTableReplicatedIter.next()
new EdgeTripletIterator(vPart.index, vPart.values, edgePartition)(vdManifest, edManifest)
edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (ePart, vPartIter) =>
val (_, vPart) = vPartIter.next()
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest)
}
}
override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
vertices.persist(newLevel)
edges.persist(newLevel)
vertexPlacement.persist(newLevel)
this
}
@ -149,29 +148,38 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
println("\n\nvertexPlacement.bothAttrs -------------------------------")
traverseLineage(vertexPlacement.bothAttrs, " ", visited)
visited += (vertexPlacement.bothAttrs.id -> "vertexPlacement.bothAttrs")
println("\n\nvTableReplicated.bothAttrs ----------------")
traverseLineage(vTableReplicated.bothAttrs, " ", visited)
visited += (vTableReplicated.bothAttrs.id -> "vTableReplicated.bothAttrs")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
println(visited)
} // end of printLineage
override def reverse: Graph[VD, ED] =
new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vertexPlacement)
new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vertexPlacement, vTableReplicated)
override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] =
new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, vertexPlacement)
override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
if (classManifest[VD] equals classManifest[VD2]) {
// The map preserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f))
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newVTableReplicated = new VTableReplicated[VD2](
changedVerts, edges, vertexPlacement,
Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
} else {
// The map does not preserve type, so we must re-replicate all vertices
new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, vertexPlacement)
}
}
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] =
new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vertexPlacement)
new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vertexPlacement, vTableReplicated)
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
// manifest from GraphImpl (which would require serializing GraphImpl).
val vdManifest = classManifest[VD]
val newETable =
edges.zipEdgePartitions(vTableReplicated.bothAttrs) { (edgePartition, vTableReplicatedIter) =>
edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (edgePartition, vTableReplicatedIter) =>
val (pid, vPart) = vTableReplicatedIter.next()
val et = new EdgeTriplet[VD, ED]
val newEdgePartition = edgePartition.map { e =>
@ -182,7 +190,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
Iterator((pid, newEdgePartition))
}
new GraphImpl(vertices, new EdgeRDD(newETable), vertexPlacement)
new GraphImpl(vertices, new EdgeRDD(newETable), vertexPlacement, vTableReplicated)
}
override def subgraph(
@ -210,7 +218,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
ClosureCleaner.clean(merge)
val newETable = edges.mapEdgePartitions(_.groupEdges(merge))
new GraphImpl(vertices, newETable, vertexPlacement)
new GraphImpl(vertices, newETable, vertexPlacement, vTableReplicated)
}
//////////////////////////////////////////////////////////////////////////////////////////////////
@ -219,7 +227,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
reduceFunc: (A, A) => A): VertexRDD[A] = {
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
@ -228,23 +237,42 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// in the relevant position in an edge.
val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
val vs = vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr)
val vs = activeSetOpt match {
case Some((activeSet, _)) => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
case None => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr)
}
val activeDirectionOpt = activeSetOpt.map(_._2)
// Map and combine.
val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) =>
val (_, vertexPartition) = vTableReplicatedIter.next()
// Iterate over the partition
val et = new EdgeTriplet[VD, ED](vertexPartition)
val et = new EdgeTriplet[VD, ED]
val filteredEdges = edgePartition.iterator.flatMap { e =>
et.set(e)
if (mapUsesSrcAttr) {
et.srcAttr = vertexPartition(e.srcId)
// Ensure the edge is adjacent to a vertex in activeSet if necessary
val adjacent = activeDirectionOpt match {
case Some(EdgeDirection.In) =>
vertexPartition.isActive(e.dstId)
case Some(EdgeDirection.Out) =>
vertexPartition.isActive(e.srcId)
case Some(EdgeDirection.Both) =>
vertexPartition.isActive(e.srcId) && vertexPartition.isActive(e.dstId)
case None =>
true
}
if (mapUsesDstAttr) {
et.dstAttr = vertexPartition(e.dstId)
if (adjacent) {
et.set(e)
if (mapUsesSrcAttr) {
et.srcAttr = vertexPartition(e.srcId)
}
if (mapUsesDstAttr) {
et.dstAttr = vertexPartition(e.dstId)
}
mapFunc(et)
} else {
Iterator.empty
}
mapFunc(et)
}
// Note: This doesn't allow users to send messages to arbitrary vertices.
vertexPartition.aggregateUsingIndex(filteredEdges, reduceFunc).iterator
@ -255,22 +283,20 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
} // end of mapReduceTriplets
override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
(updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = {
ClosureCleaner.clean(updateF)
val newVTable = vertices.leftJoin(updates)(updateF)
new GraphImpl(newVTable, edges, vertexPlacement)
}
override def deltaJoinVertices(changedVerts: VertexRDD[VD]): Graph[VD, ED] = {
val newVerts = vertices.leftZipJoin(changedVerts) { (vid, oldAttr, newAttrOpt) =>
newAttrOpt match {
case Some(newAttr) => newAttr
case None => oldAttr
}
(updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = {
if (classManifest[VD] equals classManifest[VD2]) {
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(updates)(updateF)
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newVTableReplicated = new VTableReplicated[VD2](
changedVerts, edges, vertexPlacement,
Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(updates)(updateF)
new GraphImpl(newVerts, edges, vertexPlacement)
}
val newVTableReplicated = new VTableReplicated(
changedVerts, edges, vertexPlacement, Some(vTableReplicated))
new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
}
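
mapVertices and outerJoinVertices now share the same fast path when the vertex attribute type is preserved; a condensed sketch of that path, using names from the hunks above:

    // Only taken when classManifest[VD] equals classManifest[VD2]:
    //   val changedVerts = vertices.diff(newVerts)   // just the vertices whose values changed
    //   val newView = new VTableReplicated(changedVerts, edges, vertexPlacement, Some(prevView))
    // Only changedVerts are shipped to edge partitions; innerJoinKeepLeft (in VTableReplicated's
    // create) copies the previous values forward and overwrites only the shipped entries.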
private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {

View file

@ -7,26 +7,47 @@ import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
import org.apache.spark.graph._
/**
* Stores the vertex attribute values after they are replicated.
* A view of the vertices after they are shipped to the join sites specified in
* `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevVTableReplicated`
* is specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise,
* a fresh view is created.
*
* The view is always cached (i.e., once it is created, it remains materialized). This avoids
* constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
* example.
*/
private[impl]
class VTableReplicated[VD: ClassManifest](
vTable: VertexRDD[VD],
eTable: EdgeRDD[_],
updatedVerts: VertexRDD[VD],
edges: EdgeRDD[_],
vertexPlacement: VertexPlacement,
prevVTableReplicated: Option[VTableReplicated[VD]] = None) {
val bothAttrs: RDD[(Pid, VertexPartition[VD])] =
createVTableReplicated(vTable, eTable, vertexPlacement, true, true)
/**
* Within each edge partition, create a local map from vid to an index into the attribute
* array. Each map contains a superset of the vertices that it will receive, because it stores
* vids from both the source and destination of edges. It must always include both source and
* destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
*/
private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevVTableReplicated match {
case Some(prev) =>
prev.localVidMap
case None =>
edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
val vidToIndex = new VertexIdToIndexMap
epart.foreach { e =>
vidToIndex.add(e.srcId)
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
}, preservesPartitioning = true).cache().setName("VTableReplicated localVidMap")
}
val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] =
createVTableReplicated(vTable, eTable, vertexPlacement, true, false)
val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] =
createVTableReplicated(vTable, eTable, vertexPlacement, false, true)
val noAttrs: RDD[(Pid, VertexPartition[VD])] =
createVTableReplicated(vTable, eTable, vertexPlacement, false, false)
private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
private lazy val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(true, false)
private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true)
private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false)
def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
(includeSrc, includeDst) match {
@ -37,57 +58,60 @@ class VTableReplicated[VD: ClassManifest](
}
}
private def createVTableReplicated(
vTable: VertexRDD[VD],
eTable: EdgeRDD[_],
vertexPlacement: VertexPlacement,
includeSrcAttr: Boolean,
includeDstAttr: Boolean): RDD[(Pid, VertexPartition[VD])] = {
def get(
includeSrc: Boolean,
includeDst: Boolean,
actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = {
val placement = vertexPlacement.get(includeSrcAttr, includeDstAttr)
// Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
// includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
// shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
// also shipped there.
val shippedActives = vertexPlacement.get(true, true)
.zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _))
.partitionBy(edges.partitioner.get)
// Update vTableReplicated with shippedActives, setting activeness flags in the resulting
// VertexPartitions
get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
val (pid, vPart) = viewIter.next()
val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
Iterator((pid, newPart))
}
}
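
How the active-set information travels end to end in this change, as a descriptive sketch using names from the surrounding hunks:

    // 1. GraphImpl.mapReduceTriplets calls vTableReplicated.get(useSrc, useDst, activeSet).
    // 2. get(...) builds shippedActives with buildActiveBuffer, routing every active vid to each
    //    edge partition that references it (always via the bothAttrs routing table).
    // 3. Each edge partition's VertexPartition is rebuilt with replaceActives(...), which stores
    //    those vids in an activeSet alongside the existing index, values, and mask.
    // 4. While iterating edges, mapReduceTriplets checks isActive(e.srcId) / isActive(e.dstId)
    //    according to the requested EdgeDirection before invoking the user's mapFunc.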
private def create(includeSrc: Boolean, includeDst: Boolean)
: RDD[(Pid, VertexPartition[VD])] = {
val vdManifest = classManifest[VD]
// Send each edge partition the vertex attributes it wants, as specified in
// vertexPlacement
val msgsByPartition = placement.zipPartitions(vTable.partitionsRDD)(VTableReplicated.buildBuffer(_, _)(vdManifest))
.partitionBy(eTable.partitioner.get).cache()
// Ship vertex attributes to edge partitions according to vertexPlacement
val verts = updatedVerts.partitionsRDD
val shippedVerts = vertexPlacement.get(includeSrc, includeDst)
.zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest))
.partitionBy(edges.partitioner.get)
// TODO: Consider using a specialized shuffler.
prevVTableReplicated match {
case Some(vTableReplicated) =>
val prev: RDD[(Pid, VertexPartition[VD])] =
vTableReplicated.get(includeSrcAttr, includeDstAttr)
val prevView: RDD[(Pid, VertexPartition[VD])] =
vTableReplicated.get(includeSrc, includeDst)
prev.zipPartitions(msgsByPartition) { (vTableIter, msgsIter) =>
val (pid, vertexPartition) = vTableIter.next()
val newVPart = vertexPartition.updateUsingIndex(msgsIter.flatMap(_._2.iterator))(vdManifest)
// Update vTableReplicated with shippedVerts, setting staleness flags in the resulting
// VertexPartitions
prevView.zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) =>
val (pid, prevVPart) = prevViewIter.next()
val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
Iterator((pid, newVPart))
}.cache().setName("VTableReplicated delta %s %s".format(includeSrcAttr, includeDstAttr))
}.cache().setName("VTableReplicated delta %s %s".format(includeSrc, includeDst))
case None =>
// Within each edge partition, create a local map from vid to an index into
// the attribute array. Each map contains a superset of the vertices that it
// will receive, because it stores vids from both the source and destination
// of edges. It must always include both source and destination vids because
// some operations, such as GraphImpl.mapReduceTriplets, rely on this.
val localVidMap = eTable.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
val vidToIndex = new VertexIdToIndexMap
epart.foreach { e =>
vidToIndex.add(e.srcId)
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
}, preservesPartitioning = true).cache()
// Within each edge partition, place the vertex attributes received from
// msgsByPartition into the correct locations specified in localVidMap
localVidMap.zipPartitions(msgsByPartition) { (mapIter, msgsIter) =>
// Within each edge partition, place the shipped vertex attributes into the correct
// locations specified in localVidMap
localVidMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
val vertexArray = vdManifest.newArray(vidToIndex.capacity)
for ((_, block) <- msgsIter) {
for ((_, block) <- shippedVertsIter) {
for (i <- 0 until block.vids.size) {
val vid = block.vids(i)
val attr = block.attrs(i)
@ -95,17 +119,18 @@ class VTableReplicated[VD: ClassManifest](
vertexArray(ind) = attr
}
}
Iterator((pid, new VertexPartition(vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)))
}.cache().setName("VTableReplicated %s %s".format(includeSrcAttr, includeDstAttr))
val newVPart = new VertexPartition(
vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)
Iterator((pid, newVPart))
}.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst))
}
}
}
object VTableReplicated {
def buildBuffer[VD: ClassManifest](pid2vidIter: Iterator[Array[Array[Vid]]], vertexPartIter: Iterator[VertexPartition[VD]]) = {
protected def buildBuffer[VD: ClassManifest](
pid2vidIter: Iterator[Array[Array[Vid]]],
vertexPartIter: Iterator[VertexPartition[VD]]) = {
val pid2vid: Array[Array[Vid]] = pid2vidIter.next()
val vertexPart: VertexPartition[VD] = vertexPartIter.next()
@ -126,6 +151,29 @@ object VTableReplicated {
(pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
}
}
protected def buildActiveBuffer(
pid2vidIter: Iterator[Array[Array[Vid]]],
activePartIter: Iterator[VertexPartition[_]])
: Iterator[(Int, Array[Vid])] = {
val pid2vid: Array[Array[Vid]] = pid2vidIter.next()
val activePart: VertexPartition[_] = activePartIter.next()
Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length
val actives = new PrimitiveVector[Vid](vidsCandidate.size)
var i = 0
while (i < size) {
val vid = vidsCandidate(i)
if (activePart.isDefined(vid)) {
actives += vid
}
i += 1
}
(pid, actives.trim().array)
}
}
}
class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) {

View file

@ -32,7 +32,9 @@ private[graph]
class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
val index: VertexIdToIndexMap,
val values: Array[VD],
val mask: BitSet)
val mask: BitSet,
/** A set of vids of active vertices. May contain vids not in index due to join rewrite. */
private val activeSet: Option[VertexSet] = None)
extends Logging {
val capacity: Int = index.capacity
@ -47,6 +49,11 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
pos >= 0 && mask.get(pos)
}
/** Look up vid in activeSet, throwing an exception if it is None. */
def isActive(vid: Vid): Boolean = {
activeSet.get.contains(vid)
}
/**
* Pass each vertex attribute along with the vertex id through a map
* function and retain the original RDD's partitioning and index.
@ -57,7 +64,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
* attribute in the RDD
*
* @return a new VertexPartition with values obtained by applying `f` to
* each of the entries in the original VertexSet. The resulting
* each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index.
*/
def map[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexPartition[VD2] = {
@ -94,19 +101,25 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
new VertexPartition(index, values, newMask)
}
/**
* Hides vertices that are the same between this and other. For vertices that are different, keeps
* the values from `other`. The indices of `this` and `other` must be the same.
*/
def diff(other: VertexPartition[VD]): VertexPartition[VD] = {
assert(index == other.index)
val newMask = mask & other.mask
var i = newMask.nextSetBit(0)
while (i >= 0) {
if (values(i) == other.values(i)) {
newMask.unset(i)
if (index != other.index) {
logWarning("Diffing two VertexPartitions with different indexes is slow.")
diff(createUsingIndex(other.iterator))
} else {
val newMask = mask & other.mask
var i = newMask.nextSetBit(0)
while (i >= 0) {
if (values(i) == other.values(i)) {
newMask.unset(i)
}
i = newMask.nextSetBit(i + 1)
}
i = mask.nextSetBit(i + 1)
new VertexPartition(index, other.values, newMask)
}
new VertexPartition[VD](index, other.values, newMask)
}
/** Inner join another VertexPartition. */
@ -130,30 +143,6 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
}
}
/** Inner join another VertexPartition. */
def deltaJoin[VD2: ClassManifest, VD3: ClassManifest]
(other: VertexPartition[VD2])
(f: (Vid, VD, VD2) => VD3): VertexPartition[VD3] =
{
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
join(createUsingIndex(other.iterator))(f)
} else {
val newValues = new Array[VD3](capacity)
val newMask = mask & other.mask
var i = newMask.nextSetBit(0)
while (i >= 0) {
newValues(i) = f(index.getValue(i), values(i), other.values(i))
if (newValues(i) == values(i)) {
newMask.unset(i)
}
i = mask.nextSetBit(i + 1)
}
new VertexPartition(index, newValues, newMask)
}
}
/** Left outer join another VertexPartition. */
def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
(other: VertexPartition[VD2])
@ -181,6 +170,32 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
leftJoin(createUsingIndex(other))(f)
}
/** Inner join another VertexPartition. */
def innerJoin[U: ClassManifest, VD2: ClassManifest](other: VertexPartition[U])
(f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
innerJoin(createUsingIndex(other.iterator))(f)
}
val newMask = mask & other.mask
val newValues = new Array[VD2](capacity)
var i = newMask.nextSetBit(0)
while (i >= 0) {
newValues(i) = f(index.getValue(i), values(i), other.values(i))
i = newMask.nextSetBit(i + 1)
}
new VertexPartition(index, newValues, newMask)
}
/**
* Inner join an iterator of messages.
*/
def innerJoin[U: ClassManifest, VD2: ClassManifest]
(iter: Iterator[Product2[Vid, U]])
(f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
innerJoin(createUsingIndex(iter))(f)
}
/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
@ -196,17 +211,20 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
new VertexPartition[VD2](index, newValues, newMask)
}
def updateUsingIndex[VD2: ClassManifest](iter: Iterator[Product2[Vid, VD2]])
: VertexPartition[VD2] = {
/**
* Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
* the partition, hidden by the bitmask.
*/
def innerJoinKeepLeft(iter: Iterator[Product2[Vid, VD]]): VertexPartition[VD] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
val newValues = new Array[VD](capacity)
System.arraycopy(values, 0, newValues, 0, newValues.length)
iter.foreach { case (vid, vdata) =>
val pos = index.getPos(vid)
newMask.set(pos)
newValues(pos) = vdata
}
new VertexPartition[VD2](index, newValues, newMask)
new VertexPartition(index, newValues, newMask)
}
def aggregateUsingIndex[VD2: ClassManifest](
@ -228,6 +246,12 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
new VertexPartition[VD2](index, newValues, newMask)
}
def replaceActives(iter: Iterator[Vid]): VertexPartition[VD] = {
val newActiveSet = new VertexSet
iter.foreach(newActiveSet.add(_))
new VertexPartition(index, values, mask, Some(newActiveSet))
}
/**
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
@ -241,4 +265,6 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
}
def iterator: Iterator[(Vid, VD)] = mask.iterator.map(ind => (index.getValue(ind), values(ind)))
def vidIterator: Iterator[Vid] = mask.iterator.map(ind => index.getValue(ind))
}

View file

@ -7,8 +7,10 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.collection.PrimitiveVector
/**
* Stores the layout of replicated vertex attributes for GraphImpl. Tells each
* partition of the vertex data where it should go.
* Stores the locations of edge-partition join sites for each vertex attribute in `vTable`; that is,
* the routing information for shipping vertex attributes to edge partitions. This is always cached
* because it may be used multiple times in VTableReplicated -- once to ship the vertex attributes
* and (possibly) once to ship the active-set information.
*/
class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
@ -25,13 +27,6 @@ class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
case (false, false) => noAttrs
}
def persist(newLevel: StorageLevel) {
bothAttrs.persist(newLevel)
srcAttrOnly.persist(newLevel)
dstAttrOnly.persist(newLevel)
noAttrs.persist(newLevel)
}
private def createPid2Vid(
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.

View file

@ -35,12 +35,38 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
test("core operations") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 5
val star = Graph.fromEdgeTuples(
sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
// triplets
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
(1 to n).map(x => (0: Vid, x: Vid, "v", "v")).toSet)
// reverse
val reverseStar = star.reverse
assert(reverseStar.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet)
// outerJoinVertices
val reverseStarDegrees =
reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet
assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
// mapVertices preserving type
val mappedVAttrs = reverseStar.mapVertices((vid, attr) => attr + "2")
assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: Vid, "v2")).toSet)
// mapVertices changing type
val mappedVAttrs2 = reverseStar.mapVertices((vid, attr) => attr.length)
assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet)
}
}
test("mapEdges") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
val star = Graph.fromEdgeTuples(
sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))),
"defaultValue")
sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v")
val starWithEdgeAttrs = star.mapEdges(e => e.dstId)
// map(_.copy()) is a workaround for https://github.com/amplab/graphx/issues/25
@ -52,13 +78,42 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mapReduceTriplets") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
val n = 5
val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0)
val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
val neighborDegreeSums = starDeg.mapReduceTriplets(
edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
(a: Int, b: Int) => a + b)
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
// activeSetOpt
val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid)
val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
val vids = complete.mapVertices((vid, attr) => vid).cache()
val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
val numEvenNeighbors = vids.mapReduceTriplets(et => {
// Map function should only run on edges with destination in the active set
if (et.dstId % 2 != 0) {
throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
}
Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet)
// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0)
.mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
// Map function should only run on edges with source in the active set
if (et.srcId % 2 != 1) {
throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
}
Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet)
}
}
@ -151,34 +206,4 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
}
}
test("deltaJoinVertices") {
withSpark(new SparkContext("local", "test")) { sc =>
// Create a star graph of 10 vertices
val n = 10
val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v1").cache()
// Modify only vertices whose vids are even
val changedVerts = star.vertices.filter(_._1 % 2 == 0).mapValues((vid, attr) => "v2")
// Apply the modification to the graph
val changedStar = star.deltaJoinVertices(changedVerts)
val newVertices = star.vertices.leftZipJoin(changedVerts) { (vid, oldVd, newVdOpt) =>
newVdOpt match {
case Some(newVd) => newVd
case None => oldVd
}
}
// The graph's vertices should be correct
assert(changedStar.vertices.collect().toSet === newVertices.collect().toSet)
// Send the leaf attributes to the center
val sums = changedStar.mapReduceTriplets(
edge => Iterator((edge.srcId, Set(edge.dstAttr))),
(a: Set[String], b: Set[String]) => a ++ b)
assert(sums.collect().toSet === Set((0, Set("v1", "v2"))))
}
}
}

View file

@ -0,0 +1,43 @@
package org.apache.spark.graph
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.graph.LocalSparkContext._
import org.apache.spark.rdd._
class PregelSuite extends FunSuite with LocalSparkContext {
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
test("1 iteration") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 5
val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
val result = Pregel(star, 0)(
(vid, attr, msg) => attr,
et => Iterator.empty,
(a: Int, b: Int) => throw new Exception("mergeMsg run unexpectedly"))
assert(result.vertices.collect.toSet === star.vertices.collect.toSet)
}
}
test("chain propagation") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 5
val chain = Graph.fromEdgeTuples(
sc.parallelize((1 until n).map(x => (x: Vid, x + 1: Vid)), 3),
0).cache()
assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: Vid, 0)).toSet)
val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }
assert(chainWithSeed.vertices.collect.toSet === Set((1: Vid, 1)) ++ (2 to n).map(x => (x: Vid, 0)).toSet)
val result = Pregel(chainWithSeed, 0)(
(vid, attr, msg) => math.max(msg, attr),
et => Iterator((et.dstId, et.srcAttr)),
(a: Int, b: Int) => math.max(a, b))
assert(result.vertices.collect.toSet ===
chain.vertices.mapValues { (vid, attr) => attr + 1 }.collect.toSet)
}
}
}