Completed the port of several analytics packages, as well as the Analytics main

This commit is contained in:
Joseph E. Gonzalez 2013-04-03 23:04:29 -07:00
parent 39cac0ae65
commit 91e1227edb
3 changed files with 61 additions and 114 deletions

View file

@ -26,25 +26,19 @@ object Analytics {
/** /**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/ */
def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], maxIter: Int) = { def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// Compute the out degree of each vertex // Compute the out degree of each vertex
val outDegree = graph.edges.map { val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
case Edge(src, target, data) => (src, 1)
}.reduceByKey(_ + _)
val pagerankGraph = graph.updateVertices[Int, (Int, Float)](outDegree,
(vertex, degIter) => (degIter.sum, 1.0F) (vertex, degIter) => (degIter.sum, 1.0F)
) )
GraphLab.iterateGAS[Float, (Int, Float), ED](pagerankGraph, GraphLab.iterateGAS[(Int, Float), ED, Float](pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.dst.data._1, // gather (me_id, edge) => edge.src.data._2 / edge.dst.data._1, // gather
(a: Float, b: Float) => a + b, // merge (a: Float, b: Float) => a + b, // merge
0F, 0F,
(vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply
maxIter).vertices.map { case Vertex(id, (degree, rank)) => (id, rank) } numIter).vertices.map{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
// println("Computed graph: #edges: " + graph_ret.numEdges + " #vertices" + graph_ret.numVertices)
// graph_ret.vertices.take(10).foreach(println)
} }
/*
/** /**
* Compute the connected component membership of each vertex * Compute the connected component membership of each vertex
@ -53,41 +47,26 @@ object Analytics {
* that vertex. * that vertex.
*/ */
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) }
val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) }) GraphLab.iterateGAS[Int, ED, Int](ccGraph)(
val edges = graph.edges // .mapValues(v => None)
val ccGraph = new Graph(vertices, edges)
ccGraph.iterateStatic(
(me_id, edge) => edge.otherVertex(me_id).data, // gather (me_id, edge) => edge.otherVertex(me_id).data, // gather
(a: Int, b: Int) => math.min(a, b), // merge (a: Int, b: Int) => math.min(a, b), // merge
Integer.MAX_VALUE, Integer.MAX_VALUE,
(v, a: Int) => math.min(v.data, a), // apply (v, a: Int) => math.min(v.data, a), // apply
numIter, numIter,
gatherEdges = EdgeDirection.Both).vertices gatherEdges = EdgeDirection.Both).vertices
//
// graph_ret.vertices.`.foreach(println)
// graph_ret.edges.take(10).foreach(println)
} }
/** /**
* Compute the shortest path to a set of markers * Compute the shortest path to a set of markers
*/ */
def shortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float], def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = {
sources: List[Int], numIter: Int) = {
val sourceSet = sources.toSet val sourceSet = sources.toSet
val vertices = graph.vertices.mapPartitions( val spGraph = graph.mapVertices {
iter => iter.map { case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) ) }
}); GraphLab.iterateGAS[Float, Float, Float](spGraph)(
val edges = graph.edges // .mapValues(v => None)
val spGraph = new Graph(vertices, edges)
val niterations = Int.MaxValue
spGraph.iterateStatic(
(me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
(a: Float, b: Float) => math.min(a, b), // merge (a: Float, b: Float) => math.min(a, b), // merge
Float.MaxValue, Float.MaxValue,
@ -96,7 +75,7 @@ object Analytics {
gatherEdges = EdgeDirection.In).vertices gatherEdges = EdgeDirection.In).vertices
} }
*/
// /** // /**
@ -266,7 +245,7 @@ object Analytics {
/*
taskType match { taskType match {
case "pagerank" => { case "pagerank" => {
@ -289,7 +268,7 @@ object Analytics {
if(!isDynamic && numIter == Int.MaxValue) { if(!isDynamic && numIter == Int.MaxValue) {
println("Set number of iterations!") println("Set number of iterations!")
exit(1) sys.exit(1)
} }
println("======================================") println("======================================")
println("| PageRank |") println("| PageRank |")
@ -306,13 +285,13 @@ object Analytics {
graph.numEdgePartitions = numEPart graph.numEdgePartitions = numEPart
val startTime = System.currentTimeMillis val startTime = System.currentTimeMillis
Analytics.pageRank(graph, numIter) val pr = Analytics.pagerank(graph, numIter)
// val pr = if(isDynamic) Analytics.dynamicPageRank(graph, tol, numIter) // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// else Analytics.pageRank(graph, numIter) // else Analytics.pagerank(graph, numIter)
println("Total rank: " + pr.map(_._2).reduce(_+_)) println("Total rank: " + pr.map{ case Vertex(id,r) => r }.reduce(_+_) )
if(!outFname.isEmpty) { if(!outFname.isEmpty) {
println("Saving pageranks of pages to " + outFname) println("Saving pageranks of pages to " + outFname)
pr.map(p => p._1 + "\t" + p._2).saveAsTextFile(outFname) pr.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
} }
println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
sc.stop() sc.stop()
@ -331,7 +310,7 @@ object Analytics {
if(!isDynamic && numIter == Int.MaxValue) { if(!isDynamic && numIter == Int.MaxValue) {
println("Set number of iterations!") println("Set number of iterations!")
exit(1) sys.exit(1)
} }
println("======================================") println("======================================")
println("| Connected Components |") println("| Connected Components |")
@ -343,55 +322,56 @@ object Analytics {
val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
val graph = Graph.textFile(sc, fname, a => 1.0F) val graph = Graph.textFile(sc, fname, a => 1.0F)
Analytics.connectedComponents(graph, numIter) val cc = Analytics.connectedComponents(graph, numIter)
// val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// else Analytics.connectedComponents(graph, numIter) // else Analytics.connectedComponents(graph, numIter)
println("Components: " + cc.map(_._2).distinct()) println("Components: " + cc.map(_.data).distinct())
sc.stop() sc.stop()
} }
// case "shortestpath" => { case "shortestpath" => {
// var numIter = Int.MaxValue var numIter = Int.MaxValue
// var isDynamic = true var isDynamic = true
// var sources: List[Int] = List.empty var sources: List[Int] = List.empty
// options.foreach{ options.foreach{
// case ("numIter", v) => numIter = v.toInt case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean case ("dynamic", v) => isDynamic = v.toBoolean
// case ("source", v) => sources ++= List(v.toInt) case ("source", v) => sources ++= List(v.toInt)
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// } }
// if(!isDynamic && numIter == Int.MaxValue) { if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!") println("Set number of iterations!")
// exit(1) sys.exit(1)
// } }
// if(sources.isEmpty) { if(sources.isEmpty) {
// println("No sources provided!") println("No sources provided!")
// exit(1) sys.exit(1)
// } }
// println("======================================") println("======================================")
// println("| Shortest Path |") println("| Shortest Path |")
// println("--------------------------------------") println("--------------------------------------")
// println(" Using parameters:") println(" Using parameters:")
// println(" \tDynamic: " + isDynamic) println(" \tDynamic: " + isDynamic)
// println(" \tNumIter: " + numIter) println(" \tNumIter: " + numIter)
// println(" \tSources: [" + sources.mkString(", ") + "]") println(" \tSources: [" + sources.mkString(", ") + "]")
// println("======================================") println("======================================")
// val sc = new SparkContext(host, "ShortestPath(" + fname + ")") val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
val sp = Analytics.shortestPath(graph, sources, numIter)
// val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// else Analytics.shortestPath(graph, sources, numIter) // else Analytics.shortestPath(graph, sources, numIter)
// println("Longest Path: " + cc.map(_._2).reduce(math.max(_,_))) println("Longest Path: " + sp.map(_.data).reduce(math.max(_,_)))
// sc.stop() sc.stop()
// } }
// case "als" => { // case "als" => {
@ -449,7 +429,7 @@ object Analytics {
println("Invalid task type.") println("Invalid task type.")
} }
} }
*/
} }

View file

@ -287,39 +287,6 @@ object Graph {
}, preservesPartitioning = true) }, preservesPartitioning = true)
} }
/**
* Load a graph from a text file.
*/
def textFile[ED: Manifest](sc: SparkContext,
fname: String, edgeParser: Array[String] => ED) = {
// Parse the edge data table
val edges = sc.textFile(fname).map(
line => {
val lineArray = line.split("\\s+")
if(lineArray.length < 2) {
println("Invalid line: " + line)
assert(false)
}
val source = lineArray(0)
val target = lineArray(1)
val tail = lineArray.drop(2)
val edata = edgeParser(tail)
Edge(source.trim.toInt, target.trim.toInt, edata)
}).cache
// Parse the vertex data table
val vertices = edges.flatMap {
case Edge(source, target, _) => List((source, 1), (target, 1))
}.reduceByKey(_ + _).map(pair => Vertex(pair._1, pair._2))
val graph = new Graph[Int, ED](vertices, edges)
println("Loaded graph:" +
"\n\t#edges: " + graph.numEdges +
"\n\t#vertices: " + graph.numVertices)
graph
}
} }

View file

@ -6,14 +6,14 @@ import spark.RDD
object GraphLab { object GraphLab {
def iterateGAS[A: ClassManifest, VD: ClassManifest, ED: ClassManifest]( def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
graph: Graph[VD, ED],
gather: (Vid, EdgeWithVertices[VD, ED]) => A, gather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A, merge: (A, A) => A,
default: A, default: A,
apply: (Vertex[VD], A) => VD, apply: (Vertex[VD], A) => VD,
numIter: Int, numIter: Int,
gatherEdges: EdgeDirection.EdgeDirection = EdgeDirection.In) = { gatherEdges: EdgeDirection.EdgeDirection = EdgeDirection.In) :
Graph[VD, ED] = {
var g = graph.mapVertices(v => Vertex(v.id, VDataWithAcc(v.data, default))).cache() var g = graph.mapVertices(v => Vertex(v.id, VDataWithAcc(v.data, default))).cache()