Merging local changes to @rxin graph branch.

This commit is contained in:
Joseph E. Gonzalez 2013-08-06 12:29:21 -07:00
commit 0704d85823
2 changed files with 398 additions and 400 deletions

View file

@ -1,15 +1,14 @@
package spark.graph package spark.graph
import spark._ import spark._
import spark.SparkContext._
// import breeze.linalg._
object Analytics extends Logging { object Analytics extends Logging {
def main(args: Array[String]) { // def main(args: Array[String]) {
//pregelPagerank() // //pregelPagerank()
} // }
// /** // /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD // * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
@ -41,387 +40,390 @@ object Analytics extends Logging {
/** /**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/ */
def pregelPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
numIter: Int,
resetProb: Double = 0.15) = {
// Compute the out degree of each vertex // Compute the out degree of each vertex
val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double)](graph.outDegrees, val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0) (vertex, deg) => (deg.getOrElse(0), 1.0)
) )
Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)( Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
(vertex, a: Double) => (vertex.data._1, (0.15 + 0.85 * a)), // apply (vertex, a: Double) => (vertex.data._1, (resetProb + (1.0 - resetProb) * a)), // apply
(me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather (me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather
(a: Double, b: Double) => a + b, // merge (a: Double, b: Double) => a + b, // merge
1.0, 1.0,
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => r } numIter).mapVertices{ case Vertex(id, (outDeg, r)) => r }
} }
// /** /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
// */ */
// def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
// tol: Float, maxIter: Int = 10) = { tol: Float,
// // Compute the out degree of each vertex maxIter: Int = Integer.MAX_VALUE,
// val pagerankGraph = graph.updateVertices[Int, (Int, Float, Float)](graph.outDegrees, resetProb: Double = 0.15) = {
// (vertex, degIter) => (degIter.sum, 1.0F, 1.0F) // Compute the out degree of each vertex
// ) val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double, Double)](graph.outDegrees,
(vertex, degIter) => (degIter.sum, 1.0, 1.0)
)
// // Run PageRank // Run PageRank
// GraphLab.iterateGAS(pagerankGraph)( GraphLab.iterate(pagerankGraph)(
// (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
// (a: Float, b: Float) => a + b, (a: Double, b: Double) => a + b,
// (vertex, a: Option[Float]) => (vertex, a: Option[Double]) =>
// (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F)), vertex.data._2), // apply (vertex.data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), vertex.data._2), // apply
// (me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter (me_id, edge) => math.abs(edge.src.data._3 - edge.src.data._2) > tol, // scatter
// maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) } maxIter).mapVertices { case Vertex(vid, data) => data._2 }
// } }
// /**
// * Compute the connected component membership of each vertex
// * and return an RDD with the vertex value containing the
// * lowest vertex id in the connected component containing
// * that vertex.
// */
// def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) }
// GraphLab.iterateGA[Int, ED, Int](ccGraph)(
// (me_id, edge) => edge.otherVertex(me_id).data, // gather
// (a: Int, b: Int) => math.min(a, b), // merge
// (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply
// numIter,
// gatherDirection = EdgeDirection.Both)
// }
// /**
// * Compute the shortest path to a set of markers
// */
// def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = {
// val sourceSet = sources.toSet
// val spGraph = graph.mapVertices {
// case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
// }
// GraphLab.iterateGA[Float, Float, Float](spGraph)(
// (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
// (a: Float, b: Float) => math.min(a, b), // merge
// (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply
// numIter,
// gatherDirection = EdgeDirection.In)
// }
// // /**
// // * Compute the connected component membership of each vertex
// // * and return an RDD with the vertex value containing the
// // * lowest vertex id in the connected component containing
// // * that vertex.
// // */
// // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
// // numIter: Int = Int.MaxValue) = {
// // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) })
// // val edges = graph.edges // .mapValues(v => None)
// // val ccGraph = new Graph(vertices, edges)
// // ccGraph.iterateDynamic(
// // (me_id, edge) => edge.otherVertex(me_id).data, // gather
// // (a: Int, b: Int) => math.min(a, b), // merge
// // Integer.MAX_VALUE,
// // (v, a: Int) => math.min(v.data, a), // apply
// // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter
// // numIter,
// // gatherEdges = EdgeDirection.Both,
// // scatterEdges = EdgeDirection.Both).vertices
// // //
// // // graph_ret.vertices.collect.foreach(println)
// // // graph_ret.edges.take(10).foreach(println)
// // }
/**
* Compute the connected component membership of each vertex
* and return an RDD with the vertex value containing the
* lowest vertex id in the connected component containing
* that vertex.
*/
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]) = {
val ccGraph = graph.mapVertices { case Vertex(vid, _) => vid }
GraphLab.iterate[Int, ED, Int](ccGraph)(
(me_id, edge) => edge.otherVertex(me_id).data, // gather
(a: Int, b: Int) => math.min(a, b), // merge
(v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply
(me_id, edge) => (edge.vertex(me_id).data < edge.otherVertex(me_id).data), // scatter
gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both
)
}
// /**
// * Compute the shortest path to a set of markers
// */
// def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = {
// val sourceSet = sources.toSet
// val spGraph = graph.mapVertices {
// case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
// }
// GraphLab.iterateGA[Float, Float, Float](spGraph)(
// (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
// (a: Float, b: Float) => math.min(a, b), // merge
// (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply
// numIter,
// gatherDirection = EdgeDirection.In)
// }
// // /**
// // * Compute the connected component membership of each vertex
// // * and return an RDD with the vertex value containing the
// // * lowest vertex id in the connected component containing
// // * that vertex.
// // */
// // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
// // numIter: Int = Int.MaxValue) = {
// // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) })
// // val edges = graph.edges // .mapValues(v => None)
// // val ccGraph = new Graph(vertices, edges)
// // ccGraph.iterateDynamic(
// // (me_id, edge) => edge.otherVertex(me_id).data, // gather
// // (a: Int, b: Int) => math.min(a, b), // merge
// // Integer.MAX_VALUE,
// // (v, a: Int) => math.min(v.data, a), // apply
// // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter
// // numIter,
// // gatherEdges = EdgeDirection.Both,
// // scatterEdges = EdgeDirection.Both).vertices
// // //
// // // graph_ret.vertices.collect.foreach(println)
// // // graph_ret.edges.take(10).foreach(println)
// // }
// // /** // // /**
// // * Compute the shortest path to a set of markers // // * Compute the shortest path to a set of markers
// // */ // // */
// // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float], // // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float],
// // sources: List[Int], numIter: Int) = { // // sources: List[Int], numIter: Int) = {
// // val sourceSet = sources.toSet // // val sourceSet = sources.toSet
// // val vertices = graph.vertices.mapPartitions( // // val vertices = graph.vertices.mapPartitions(
// // iter => iter.map { // // iter => iter.map {
// // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) ) // // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) )
// // }); // // });
// // val edges = graph.edges // .mapValues(v => None) // // val edges = graph.edges // .mapValues(v => None)
// // val spGraph = new Graph(vertices, edges) // // val spGraph = new Graph(vertices, edges)
// // val niterations = Int.MaxValue // // val niterations = Int.MaxValue
// // spGraph.iterateDynamic( // // spGraph.iterateDynamic(
// // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather // // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
// // (a: Float, b: Float) => math.min(a, b), // merge // // (a: Float, b: Float) => math.min(a, b), // merge
// // Float.MaxValue, // // Float.MaxValue,
// // (v, a: Float) => math.min(v.data, a), // apply // // (v, a: Float) => math.min(v.data, a), // apply
// // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter // // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter
// // numIter, // // numIter,
// // gatherEdges = EdgeDirection.In, // // gatherEdges = EdgeDirection.In,
// // scatterEdges = EdgeDirection.Out).vertices // // scatterEdges = EdgeDirection.Out).vertices
// // } // // }
// // /**
// // *
// // */
// // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double],
// // latentK: Int, lambda: Double, numIter: Int) = {
// // val vertices = graph.vertices.mapPartitions( _.map {
// // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } )
// // }).cache
// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// // val edges = graph.edges // .mapValues(v => None)
// // val alsGraph = new Graph(vertices, edges)
// // alsGraph.numVPart = graph.numVPart
// // alsGraph.numEPart = graph.numEPart
// // val niterations = Int.MaxValue
// // alsGraph.iterateDynamic[(Array[Double], Array[Double])](
// // (me_id, edge) => { // gather
// // val X = edge.otherVertex(me_id).data
// // val y = edge.data
// // val Xy = X.map(_ * y)
// // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray
// // (Xy, XtX)
// // },
// // (a, b) => {
// // // The difference between the while loop and the zip is a FACTOR OF TWO in overall
// // // runtime
// // var i = 0
// // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 }
// // i = 0
// // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 }
// // a
// // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r })
// // },
// // (Array.empty[Double], Array.empty[Double]), // default value is empty
// // (vertex, accum) => { // apply
// // val XyArray = accum._1
// // val XtXArray = accum._2
// // if(XyArray.isEmpty) vertex.data // no neighbors
// // else {
// // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) =>
// // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) +
// // (if(i == j) lambda else 1.0F) //regularization
// // }
// // val Xy = DenseMatrix.create(latentK,1,XyArray)
// // val w = XtX \ Xy
// // w.data
// // }
// // },
// // (me_id, edge) => true,
// // numIter,
// // gatherEdges = EdgeDirection.Both,
// // scatterEdges = EdgeDirection.Both,
// // vertex => vertex.id < maxUser).vertices
// // }
// def main(args: Array[String]) = {
// val host = args(0)
// val taskType = args(1)
// val fname = args(2)
// val options = args.drop(3).map { arg =>
// arg.dropWhile(_ == '-').split('=') match {
// case Array(opt, v) => (opt -> v)
// case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
// }
// }
// System.setProperty("spark.serializer", "spark.KryoSerializer")
// //System.setProperty("spark.shuffle.compress", "false")
// System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
// taskType match {
// case "pagerank" => {
// var numIter = Int.MaxValue
// var isDynamic = false
// var tol:Float = 0.001F
// var outFname = ""
// var numVPart = 4
// var numEPart = 4
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case ("tol", v) => tol = v.toFloat
// case ("output", v) => outFname = v
// case ("numVPart", v) => numVPart = v.toInt
// case ("numEPart", v) => numEPart = v.toInt
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// println("======================================")
// println("| PageRank |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// if(isDynamic) println(" \t |-> Tolerance: " + tol)
// println(" \tNumIter: " + numIter)
// println("======================================")
// val sc = new SparkContext(host, "PageRank(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
// val startTime = System.currentTimeMillis
// logInfo("GRAPHX: starting tasks")
// logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
// logInfo("GRAPHX: Number of edges " + graph.edges.count)
// val pr = Analytics.pagerank(graph, numIter)
// // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// // else Analytics.pagerank(graph, numIter)
// logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
// if (!outFname.isEmpty) {
// println("Saving pageranks of pages to " + outFname)
// pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
// }
// logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
// sc.stop()
// }
// case "cc" => {
// var numIter = Int.MaxValue
// var isDynamic = false
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// println("======================================")
// println("| Connected Components |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// println(" \tNumIter: " + numIter)
// println("======================================")
// val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => 1.0F)
// val cc = Analytics.connectedComponents(graph, numIter)
// // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// // else Analytics.connectedComponents(graph, numIter)
// println("Components: " + cc.vertices.map(_.data).distinct())
// sc.stop()
// }
// case "shortestpath" => {
// var numIter = Int.MaxValue
// var isDynamic = true
// var sources: List[Int] = List.empty
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case ("source", v) => sources ++= List(v.toInt)
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// // /** // if(!isDynamic && numIter == Int.MaxValue) {
// // * // println("Set number of iterations!")
// // */ // sys.exit(1)
// // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double], // }
// // latentK: Int, lambda: Double, numIter: Int) = {
// // val vertices = graph.vertices.mapPartitions( _.map {
// // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } )
// // }).cache
// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// // val edges = graph.edges // .mapValues(v => None)
// // val alsGraph = new Graph(vertices, edges)
// // alsGraph.numVPart = graph.numVPart
// // alsGraph.numEPart = graph.numEPart
// // val niterations = Int.MaxValue // if(sources.isEmpty) {
// // alsGraph.iterateDynamic[(Array[Double], Array[Double])]( // println("No sources provided!")
// // (me_id, edge) => { // gather // sys.exit(1)
// // val X = edge.otherVertex(me_id).data // }
// // val y = edge.data
// // val Xy = X.map(_ * y)
// // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray
// // (Xy, XtX)
// // },
// // (a, b) => {
// // // The difference between the while loop and the zip is a FACTOR OF TWO in overall
// // // runtime
// // var i = 0
// // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 }
// // i = 0
// // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 }
// // a
// // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r })
// // },
// // (Array.empty[Double], Array.empty[Double]), // default value is empty
// // (vertex, accum) => { // apply
// // val XyArray = accum._1
// // val XtXArray = accum._2
// // if(XyArray.isEmpty) vertex.data // no neighbors
// // else {
// // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) =>
// // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) +
// // (if(i == j) lambda else 1.0F) //regularization
// // }
// // val Xy = DenseMatrix.create(latentK,1,XyArray)
// // val w = XtX \ Xy
// // w.data
// // }
// // },
// // (me_id, edge) => true,
// // numIter,
// // gatherEdges = EdgeDirection.Both,
// // scatterEdges = EdgeDirection.Both,
// // vertex => vertex.id < maxUser).vertices
// // }
// def main(args: Array[String]) = { // println("======================================")
// val host = args(0) // println("| Shortest Path |")
// val taskType = args(1) // println("--------------------------------------")
// val fname = args(2) // println(" Using parameters:")
// val options = args.drop(3).map { arg => // println(" \tDynamic: " + isDynamic)
// arg.dropWhile(_ == '-').split('=') match { // println(" \tNumIter: " + numIter)
// case Array(opt, v) => (opt -> v) // println(" \tSources: [" + sources.mkString(", ") + "]")
// case _ => throw new IllegalArgumentException("Invalid argument: " + arg) // println("======================================")
// }
// }
// System.setProperty("spark.serializer", "spark.KryoSerializer") // val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
// //System.setProperty("spark.shuffle.compress", "false") // val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
// System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator") // val sp = Analytics.shortestPath(graph, sources, numIter)
// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// // else Analytics.shortestPath(graph, sources, numIter)
// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
// taskType match { // sc.stop()
// case "pagerank" => { // }
// var numIter = Int.MaxValue
// var isDynamic = false
// var tol:Float = 0.001F
// var outFname = ""
// var numVPart = 4
// var numEPart = 4
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case ("tol", v) => tol = v.toFloat
// case ("output", v) => outFname = v
// case ("numVPart", v) => numVPart = v.toInt
// case ("numEPart", v) => numEPart = v.toInt
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// println("======================================")
// println("| PageRank |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// if(isDynamic) println(" \t |-> Tolerance: " + tol)
// println(" \tNumIter: " + numIter)
// println("======================================")
// val sc = new SparkContext(host, "PageRank(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
// val startTime = System.currentTimeMillis
// logInfo("GRAPHX: starting tasks")
// logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
// logInfo("GRAPHX: Number of edges " + graph.edges.count)
// val pr = Analytics.pagerank(graph, numIter)
// // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// // else Analytics.pagerank(graph, numIter)
// logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
// if (!outFname.isEmpty) {
// println("Saving pageranks of pages to " + outFname)
// pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
// }
// logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
// sc.stop()
// }
// case "cc" => {
// var numIter = Int.MaxValue
// var isDynamic = false
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// println("======================================")
// println("| Connected Components |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// println(" \tNumIter: " + numIter)
// println("======================================")
// val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => 1.0F)
// val cc = Analytics.connectedComponents(graph, numIter)
// // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// // else Analytics.connectedComponents(graph, numIter)
// println("Components: " + cc.vertices.map(_.data).distinct())
// sc.stop()
// }
// case "shortestpath" => {
// var numIter = Int.MaxValue
// var isDynamic = true
// var sources: List[Int] = List.empty
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case ("source", v) => sources ++= List(v.toInt)
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) { // // case "als" => {
// println("Set number of iterations!")
// sys.exit(1)
// }
// if(sources.isEmpty) { // // var numIter = 5
// println("No sources provided!") // // var lambda = 0.01
// sys.exit(1) // // var latentK = 10
// } // // var usersFname = "usersFactors.tsv"
// // var moviesFname = "moviesFname.tsv"
// // var numVPart = 4
// // var numEPart = 4
// println("======================================") // // options.foreach{
// println("| Shortest Path |") // // case ("numIter", v) => numIter = v.toInt
// println("--------------------------------------") // // case ("lambda", v) => lambda = v.toDouble
// println(" Using parameters:") // // case ("latentK", v) => latentK = v.toInt
// println(" \tDynamic: " + isDynamic) // // case ("usersFname", v) => usersFname = v
// println(" \tNumIter: " + numIter) // // case ("moviesFname", v) => moviesFname = v
// println(" \tSources: [" + sources.mkString(", ") + "]") // // case ("numVPart", v) => numVPart = v.toInt
// println("======================================") // // case ("numEPart", v) => numEPart = v.toInt
// // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// // }
// val sc = new SparkContext(host, "ShortestPath(" + fname + ")") // // println("======================================")
// val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) // // println("| Alternating Least Squares |")
// val sp = Analytics.shortestPath(graph, sources, numIter) // // println("--------------------------------------")
// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) // // println(" Using parameters:")
// // else Analytics.shortestPath(graph, sources, numIter) // // println(" \tNumIter: " + numIter)
// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) // // println(" \tLambda: " + lambda)
// // println(" \tLatentK: " + latentK)
// // println(" \tusersFname: " + usersFname)
// // println(" \tmoviesFname: " + moviesFname)
// // println("======================================")
// sc.stop() // // val sc = new SparkContext(host, "ALS(" + fname + ")")
// } // // val graph = Graph.textFile(sc, fname, a => a(0).toDouble )
// // graph.numVPart = numVPart
// // graph.numEPart = numEPart
// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
// // assert(maxUser < minMovie)
// // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
// // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
// // .saveAsTextFile(usersFname)
// // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
// // .saveAsTextFile(moviesFname)
// // sc.stop()
// // }
// // case "als" => { // case _ => {
// println("Invalid task type.")
// // var numIter = 5 // }
// // var lambda = 0.01 // }
// // var latentK = 10 // }
// // var usersFname = "usersFactors.tsv"
// // var moviesFname = "moviesFname.tsv"
// // var numVPart = 4
// // var numEPart = 4
// // options.foreach{
// // case ("numIter", v) => numIter = v.toInt
// // case ("lambda", v) => lambda = v.toDouble
// // case ("latentK", v) => latentK = v.toInt
// // case ("usersFname", v) => usersFname = v
// // case ("moviesFname", v) => moviesFname = v
// // case ("numVPart", v) => numVPart = v.toInt
// // case ("numEPart", v) => numEPart = v.toInt
// // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// // }
// // println("======================================")
// // println("| Alternating Least Squares |")
// // println("--------------------------------------")
// // println(" Using parameters:")
// // println(" \tNumIter: " + numIter)
// // println(" \tLambda: " + lambda)
// // println(" \tLatentK: " + latentK)
// // println(" \tusersFname: " + usersFname)
// // println(" \tmoviesFname: " + moviesFname)
// // println("======================================")
// // val sc = new SparkContext(host, "ALS(" + fname + ")")
// // val graph = Graph.textFile(sc, fname, a => a(0).toDouble )
// // graph.numVPart = numVPart
// // graph.numEPart = numEPart
// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
// // assert(maxUser < minMovie)
// // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
// // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
// // .saveAsTextFile(usersFname)
// // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
// // .saveAsTextFile(moviesFname)
// // sc.stop()
// // }
// case _ => {
// println("Invalid task type.")
// }
// }
// }
// /** // /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD // * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
@ -503,8 +505,6 @@ object Analytics extends Logging {
// } // }
// /** // /**
// * Compute the shortest path to a set of markers // * Compute the shortest path to a set of markers
// */ // */
@ -532,8 +532,6 @@ object Analytics extends Logging {
// } // }
// /** // /**
// * // *
// */ // */

View file

@ -33,12 +33,12 @@ object GraphLab {
* @tparam A The type accumulated during the gather phase * @tparam A The type accumulated during the gather phase
* @return the resulting graph after the algorithm converges * @return the resulting graph after the algorithm converges
*/ */
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A, gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
mergeFunc: (A, A) => A, mergeFunc: (A, A) => A,
applyFunc: (Vertex[VD], Option[A]) => VD, applyFunc: (Vertex[VD], Option[A]) => VD,
scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean, scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
numIter: Int, numIter: Int = Integer.MAX_VALUE,
gatherDirection: EdgeDirection = EdgeDirection.In, gatherDirection: EdgeDirection = EdgeDirection.In,
scatterDirection: EdgeDirection = EdgeDirection.Out): Graph[VD, ED] = { scatterDirection: EdgeDirection = EdgeDirection.Out): Graph[VD, ED] = {