Now with style. Addressing most of Reynolds comments.

Joseph E. Gonzalez 2013-11-18 22:44:23 -08:00
parent 2093a17ff3
commit 983810ad69
6 changed files with 93 additions and 60 deletions


@@ -205,7 +205,17 @@ object Analytics extends Logging {
   /**
    * Compute the number of triangles passing through each vertex.
    *
-   * @param graph
+   * The algorithm is relatively straightforward and can be computed in
+   * three steps:
+   *
+   * 1) Compute the set of neighbors for each vertex
+   * 2) For each edge compute the intersection of the sets and send the
+   *    count to both vertices.
+   * 3) Compute the sum at each vertex and divide by two since each
+   *    triangle is counted twice.
+   *
+   *
+   * @param graph a graph with `sourceId` less than `destId`
    * @tparam VD
    * @tparam ED
    * @return
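
The three numbered steps above map directly onto the code that follows: step 1 builds the neighbor sets, step 2 is the per-edge intersection in edgeFunc, and step 3 is the final outerJoinVertices. As a minimal sketch of step 2 in plain Scala, using ordinary Sets in place of the optimized VertexSet (the names here are illustrative, not part of the commit):

    // Triangles through one edge = neighbors shared by its two endpoints.
    // Iterating over the smaller set mirrors edgeFunc below.
    def commonNeighbors(srcNbrs: Set[Long], dstNbrs: Set[Long]): Int = {
      val (small, large) =
        if (srcNbrs.size < dstNbrs.size) (srcNbrs, dstNbrs) else (dstNbrs, srcNbrs)
      small.count(large.contains)
    }
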
@@ -218,11 +228,13 @@ object Analytics extends Logging {
     // Construct set representations of the neighborhoods
     val nbrSets: VertexSetRDD[VertexSet] =
       graph.collectNeighborIds(EdgeDirection.Both).mapValuesWithKeys { (vid, nbrs) =>
-        val set = new VertexSet//(math.ceil(nbrs.size/0.7).toInt)
+        val set = new VertexSet
         var i = 0
         while (i < nbrs.size) {
           // prevent self cycle
-          if(nbrs(i) != vid) set.add(nbrs(i))
+          if(nbrs(i) != vid) {
+            set.add(nbrs(i))
+          }
           i += 1
         }
         set
@@ -235,9 +247,11 @@ object Analytics extends Logging {
     def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Array[(Vid, Int)] = {
       assert(et.srcAttr != null)
       assert(et.dstAttr != null)
-      val (smallSet, largeSet) =
-        if (et.srcAttr.size < et.dstAttr.size) { (et.srcAttr, et.dstAttr) }
-        else { (et.dstAttr, et.srcAttr) }
+      val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
+        (et.srcAttr, et.dstAttr)
+      } else {
+        (et.dstAttr, et.srcAttr)
+      }
       val iter = smallSet.iterator()
       var counter: Int = 0
       while (iter.hasNext) {
@@ -247,14 +261,14 @@ object Analytics extends Logging {
       Array((et.srcId, counter), (et.dstId, counter))
     }
     // compute the intersection along edges
-    val counters: VertexSetRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _+_)
+    val counters: VertexSetRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
     // Merge counters with the graph and divide by two since each triangle is counted twice
     graph.outerJoinVertices(counters) {
       (vid, _, optCounter: Option[Int]) =>
         val dblCount = optCounter.getOrElse(0)
         // double count should be even (divisible by two)
-        assert((dblCount & 1) == 0 )
-        dblCount/2
+        assert((dblCount & 1) == 0)
+        dblCount / 2
     }
   } // end of TriangleCount
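
A hedged usage sketch of the full pipeline, assuming the Graph and Analytics APIs as they appear in this commit and an existing SparkContext sc:

    // Build a graph from vertex-id pairs and count triangles per vertex.
    // Each vertex of the triangle 0-1-2 should report a count of 1.
    val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L))
    val graph = Graph(rawEdges, true).cache
    val counts = Analytics.triangleCount(graph).vertices
    counts.collect.foreach { case (vid, count) => println(vid + ": " + count) }
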


@@ -61,7 +61,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
   * type Color = Int
   * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv")
   * val numInvalid = graph.edgesWithVertices()
-  *   .map(e => if(e.src.data == e.dst.data) 1 else 0).sum
+  *   .map(e => if (e.src.data == e.dst.data) 1 else 0).sum
   * }}}
   *
   * @see edges() If only the edge data and adjacent vertex ids are
@@ -110,7 +110,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
   * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
   * val root = 42
   * var bfsGraph = rawGraph
-  *   .mapVertices[Int]((vid, data) => if(vid == root) 0 else Math.MaxValue)
+  *   .mapVertices[Int]((vid, data) => if (vid == root) 0 else Math.MaxValue)
   * }}}
   *
   */
@@ -349,16 +349,15 @@ object Graph {
   }
 
   /**
-   * Construct a graph from a collection of edges encoded as vertex id
-   * pairs.
+   * Construct a graph from a collection of edges encoded as vertex id pairs.
    *
    * @param rawEdges the RDD containing the set of edges in the graph
    *
-   * @return a graph with edge attributes containing the count of
-   * duplicate edges.
+   * @return a graph with edge attributes containing the count of duplicate edges.
    */
-  def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD):
-    Graph[VD, Int] = { Graph(rawEdges, defaultValue, false) }
+  def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD): Graph[VD, Int] = {
+    Graph(rawEdges, defaultValue, false)
+  }
 
   /**
    * Construct a graph from a collection of edges encoded as vertex id
@@ -377,7 +376,7 @@ object Graph {
   def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD, uniqueEdges: Boolean):
     Graph[VD, Int] = {
     val graph = GraphImpl(rawEdges.map(p => Edge(p._1, p._2, 1)), defaultValue)
-    if(uniqueEdges) {
+    if (uniqueEdges) {
       graph.groupEdges((a,b) => a+b)
     } else {
       graph
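
The uniqueEdges flag decides whether duplicate vertex-id pairs are merged: when true, groupEdges((a,b) => a+b) sums the per-edge counts, so each edge attribute ends up carrying its multiplicity. A small sketch under the same assumptions (sc is an existing SparkContext):

    // With uniqueEdges = true the two (1L, 2L) pairs collapse into one edge
    // whose Int attribute is 2, while (2L, 3L) keeps an attribute of 1.
    val pairs = sc.parallelize(Array((1L, 2L), (1L, 2L), (2L, 3L)))
    val g = Graph(pairs, 0, true)
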


@@ -158,13 +158,18 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
     VertexSetRDD[Array[Vid]] = {
     val nbrs =
       if (edgeDirection == EdgeDirection.Both) {
-        graph.mapReduceTriplets[Array[Vid]] (
-          et => Array( (et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))), _ ++ _
+        graph.mapReduceTriplets[Array[Vid]](
+          mapFunc = et => Array((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
+          reduceFunc = _ ++ _
         )
       } else if (edgeDirection == EdgeDirection.Out) {
-        graph.mapReduceTriplets[Array[Vid]](et => Array((et.srcId, Array(et.dstId))), _ ++ _)
+        graph.mapReduceTriplets[Array[Vid]](
+          mapFunc = et => Array((et.srcId, Array(et.dstId))),
+          reduceFunc = _ ++ _)
       } else if (edgeDirection == EdgeDirection.In) {
-        graph.mapReduceTriplets[Array[Vid]](et => Array((et.dstId, Array(et.srcId))), _ ++ _)
+        graph.mapReduceTriplets[Array[Vid]](
+          mapFunc = et => Array((et.dstId, Array(et.srcId))),
+          reduceFunc = _ ++ _)
       } else {
         throw new SparkException("It doesn't make sense to collect neighbor ids without a direction.")
       }
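
A hedged usage sketch of collectNeighborIds after this cleanup; graph is any existing Graph and the remaining names are illustrative:

    // For each vertex, gather the ids of neighbors over both in- and out-edges.
    val nbrIds: VertexSetRDD[Array[Vid]] =
      graph.collectNeighborIds(EdgeDirection.Both)
    nbrIds.collect.foreach { case (vid, nbrs) =>
      println(vid + " -> " + nbrs.mkString(", "))
    }
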


@@ -171,7 +171,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
       var ind = bs.nextSetBit(0)
       while(ind >= 0) {
         val k = index.getValueSafe(ind)
-        if( cleanPred( (k, oldValues(ind)) ) ) {
+        if (cleanPred((k, oldValues(ind)))) {
           newBS.set(ind)
         }
         ind = bs.nextSetBit(ind+1)
@@ -278,7 +278,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
   def zipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, W) => Z):
     VertexSetRDD[Z] = {
     val cleanF = index.rdd.context.clean(f)
-    if(index != other.index) {
+    if (index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
     val newValuesRDD: RDD[ (Array[Z], BitSet) ] =
@@ -315,7 +315,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
   def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]):
     RDD[Z] = {
     val cleanF = index.rdd.context.clean(f)
-    if(index != other.index) {
+    if (index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
     index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
@@ -351,7 +351,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
    */
   def leftZipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, Option[W]) => Z):
     VertexSetRDD[Z] = {
-    if(index != other.index) {
+    if (index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
     val cleanF = index.rdd.context.clean(f)
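
All three joins above share the same contract: the two VertexSetRDDs must be built over the same index, otherwise a SparkException is thrown, and the join then proceeds position-by-position with no shuffle. A sketch of zipJoin under that assumption, borrowing the pagerank graphs from the test suite below:

    // prGraph1 and prGraph2 derive from the same graph, so their vertex
    // sets share an index and can be zip-joined directly.
    val diffs = prGraph1.vertices.zipJoin(prGraph2.vertices) {
      (vid, pr1, pr2) => math.abs(pr1 - pr2)
    }
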


@@ -12,30 +12,30 @@ import org.apache.spark.graph.util.GraphGenerators
 object GridPageRank {
   def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
     val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
     val outDegree = Array.fill(nRows * nCols)(0)
     // Convert row column address into vertex ids (row major order)
     def sub2ind(r: Int, c: Int): Int = r * nCols + c
     // Make the grid graph
-    for(r <- 0 until nRows; c <- 0 until nCols){
+    for (r <- 0 until nRows; c <- 0 until nCols) {
       val ind = sub2ind(r,c)
-      if(r+1 < nRows) {
+      if (r+1 < nRows) {
         outDegree(ind) += 1
         inNbrs(sub2ind(r+1,c)) += ind
       }
-      if(c+1 < nCols) {
+      if (c+1 < nCols) {
         outDegree(ind) += 1
         inNbrs(sub2ind(r,c+1)) += ind
       }
     }
     // compute the pagerank
     var pr = Array.fill(nRows * nCols)(resetProb)
-    for(iter <- 0 until nIter) {
+    for (iter <- 0 until nIter) {
       val oldPr = pr
       pr = new Array[Double](nRows * nCols)
-      for(ind <- 0 until (nRows * nCols)) {
+      for (ind <- 0 until (nRows * nCols)) {
         pr(ind) = resetProb + (1.0 - resetProb) *
           inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
       }
     }
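
GridPageRank is the reference implementation the suite checks Analytics.pagerank against: each iteration sets pr(i) = resetProb + (1 - resetProb) * sum over in-neighbors j of oldPr(j) / outDegree(j). A hedged usage sketch, assuming apply returns the per-vertex ranks in row-major order:

    // Reference ranks for a 10x10 grid after 50 PageRank iterations.
    val expected = GridPageRank(10, 10, 50, 0.15)
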
@@ -58,13 +58,13 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       val resetProb = 0.15
       val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
       val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
       val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices) { (vid, pr1, pr2) =>
         if (pr1 != pr2) { 1 } else { 0 }
       }.map { case (vid, test) => test }.sum
       assert(notMatching === 0)
       prGraph2.vertices.foreach(println(_))
-      val errors = prGraph2.vertices.map{ case (vid, pr) =>
+      val errors = prGraph2.vertices.map { case (vid, pr) =>
         val correct = (vid > 0 && pr == resetProb) ||
           (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
         if ( !correct ) { 1 } else { 0 }
@@ -141,9 +141,12 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       }
       val ccMap = vertices.toMap
       println(ccMap)
-      for( id <- 0 until 20 ) {
-        if(id < 10) { assert(ccMap(id) === 0) }
-        else { assert(ccMap(id) === 10) }
+      for (id <- 0 until 20) {
+        if (id < 10) {
+          assert(ccMap(id) === 0)
+        } else {
+          assert(ccMap(id) === 10)
+        }
       }
     }
   } // end of chain connected components
@@ -157,14 +160,20 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       val ccGraph = Analytics.connectedComponents(twoChains).cache()
       val vertices = ccGraph.vertices.collect
       for ( (id, cc) <- vertices ) {
-        if(id < 10) { assert(cc === 0) }
-        else { assert(cc === 10) }
+        if (id < 10) {
+          assert(cc === 0)
+        } else {
+          assert(cc === 10)
+        }
       }
       val ccMap = vertices.toMap
       println(ccMap)
-      for( id <- 0 until 20 ) {
-        if(id < 10) { assert(ccMap(id) === 0) }
-        else { assert(ccMap(id) === 10) }
+      for ( id <- 0 until 20 ) {
+        if (id < 10) {
+          assert(ccMap(id) === 0)
+        } else {
+          assert(ccMap(id) === 10)
+        }
       }
     }
   } // end of reverse chain connected components
@@ -181,15 +190,18 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   test("Count two triangles") {
     withSpark(new SparkContext("local", "test")) { sc =>
-      val triangles = Array( 0L -> 1L, 1L -> 2L, 2L -> 0L ) ++
-        Array( 0L -> -1L, -1L -> -2L, -2L -> 0L )
+      val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
       val rawEdges = sc.parallelize(triangles, 2)
       val graph = Graph(rawEdges, true).cache
       val triangleCount = Analytics.triangleCount(graph)
       val verts = triangleCount.vertices
       verts.collect.foreach { case (vid, count) =>
-        if(vid == 0) { assert(count === 2) }
-        else { assert(count === 1) }
+        if (vid == 0) {
+          assert(count === 2)
+        } else {
+          assert(count === 1)
+        }
       }
     }
   }
@@ -197,24 +209,27 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   test("Count two triangles with bi-directed edges") {
     withSpark(new SparkContext("local", "test")) { sc =>
       val triangles =
-        Array( 0L -> 1L, 1L -> 2L, 2L -> 0L ) ++
-          Array( 0L -> -1L, -1L -> -2L, -2L -> 0L )
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+          Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
       val revTriangles = triangles.map { case (a,b) => (b,a) }
       val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
       val graph = Graph(rawEdges, true).cache
       val triangleCount = Analytics.triangleCount(graph)
       val verts = triangleCount.vertices
       verts.collect.foreach { case (vid, count) =>
-        if(vid == 0) { assert(count === 4) }
-        else { assert(count === 2) }
+        if (vid == 0) {
+          assert(count === 4)
+        } else {
+          assert(count === 2)
+        }
       }
     }
   }
 
   test("Count a single triangle with duplicate edges") {
     withSpark(new SparkContext("local", "test")) { sc =>
-      val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ) ++ Array( 0L->1L, 1L->2L, 2L->0L ), 2)
+      val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
       val graph = Graph(rawEdges, true).cache
       val triangleCount = Analytics.triangleCount(graph)
       val verts = triangleCount.vertices


@@ -16,7 +16,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L)
       val edges = sc.parallelize(rawEdges)
       val graph = Graph(edges, 1.0F)
-      assert( graph.edges.count() === rawEdges.size )
+      assert(graph.edges.count() === rawEdges.size)
     }
   }
@@ -29,8 +29,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert( graph.edges.count() === rawEdges.size )
       assert( graph.vertices.count() === 100)
       graph.triplets.map { et =>
-        assert( (et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr) )
-        assert( (et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr) )
+        assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
+        assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
       }
     }
   }
@@ -111,7 +111,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       nbrs.collect.foreach { case (vid, nbrs) =>
         val s = nbrs.toSet
         assert(s.contains((vid + 1) % 100))
-        assert(s.contains(if (vid > 0) { vid - 1 } else { 99 }))
+        assert(s.contains(if (vid > 0) vid - 1 else 99))
       }
     }
} }