Refactored the Graph API for discussion.

Reynold Xin 2013-05-05 17:12:30 -07:00
parent f54bc544c5
commit 70ba4d1740
15 changed files with 1053 additions and 930 deletions


@@ -7,32 +7,32 @@ import spark.SparkContext._
object Analytics extends Logging {
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
// /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
// */
// // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// // // Compute the out degree of each vertex
// // val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
// // (vertex, deg) => (deg.getOrElse(0), 1.0F)
// // )
// // GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)(
// // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
// // (a: Float, b: Float) => a + b, // merge
// // (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply
// // numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
// // }
// def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// // Compute the out degree of each vertex
// val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
// (vertex, deg) => (deg.getOrElse(0), 1.0F)
// )
// GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)(
// GraphLab.iterateGA2[(Int, Float), ED, Float](pagerankGraph)(
// (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
// (a: Float, b: Float) => a + b, // merge
// (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply
// 0.0F, // default
// (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply
// numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
// }
def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0F)
)
GraphLab.iterateGA2[(Int, Float), ED, Float](pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
(a: Float, b: Float) => a + b, // merge
0.0F, // default
(vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
}
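For illustration, a minimal hedged sketch of invoking the refactored pagerank above on an edge-list file (it assumes the Graph.textFile loader used in main() below; the context string and path are hypothetical):

  val sc = new SparkContext("local[2]", "PageRankExample")
  val graph = Graph.textFile(sc, "/tmp/edges.tsv", a => 1.0F).cache()
  val ranks = Analytics.pagerank(graph, numIter = 10)
  // each vertex now carries its rank
  ranks.vertices.map { case Vertex(id, r) => id + "\t" + r }.take(5).foreach(println)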
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
@@ -50,57 +50,25 @@ object Analytics extends Logging {
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
}
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
tol: Float, maxIter: Int = 10) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.updateVertices[Int, (Int, Float, Float)](graph.outDegrees,
(vertex, degIter) => (degIter.sum, 1.0F, 1.0F)
)
// /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
// */
// def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
// tol: Float, maxIter: Int = 10) = {
// // Compute the out degree of each vertex
// val pagerankGraph = graph.updateVertices[Int, (Int, Float, Float)](graph.outDegrees,
// (vertex, degIter) => (degIter.sum, 1.0F, 1.0F)
// )
// Run PageRank
GraphLab.iterateGAS(pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
(a: Float, b: Float) => a + b,
(vertex, a: Option[Float]) =>
(vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F)), vertex.data._2), // apply
(me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter
maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) }
}
/**
* Compute the connected component membership of each vertex
* and return an RDD with the vertex value containing the
* lowest vertex id in the connected component containing
* that vertex.
*/
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) }
GraphLab.iterateGA[Int, ED, Int](ccGraph)(
(me_id, edge) => edge.otherVertex(me_id).data, // gather
(a: Int, b: Int) => math.min(a, b), // merge
(v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply
numIter,
gatherDirection = EdgeDirection.Both)
}
/**
* Compute the shortest path to a set of markers
*/
def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = {
val sourceSet = sources.toSet
val spGraph = graph.mapVertices {
case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
}
GraphLab.iterateGA[Float, Float, Float](spGraph)(
(me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
(a: Float, b: Float) => math.min(a, b), // merge
(v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply
numIter,
gatherDirection = EdgeDirection.In)
}
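As a quick illustration of the two algorithms above, a hedged sketch assuming an already-loaded graph with Float edge weights (iteration count and source ids are illustrative):

  val cc = Analytics.connectedComponents(graph, numIter = 20)
  println("Components: " + cc.vertices.map(_.data).distinct().count)
  val sp = Analytics.shortestPath(graph, sources = List(1, 2), numIter = 20)
  println("Longest path: " + sp.vertices.map(_.data).reduce(math.max(_, _)))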
// // Run PageRank
// GraphLab.iterateGAS(pagerankGraph)(
// (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
// (a: Float, b: Float) => a + b,
// (vertex, a: Option[Float]) =>
// (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F)), vertex.data._2), // apply
// (me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter
// maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) }
// }
// /**
// * Compute the connected component membership of each vertex
@ -108,314 +76,346 @@ object Analytics extends Logging {
// * lowest vertex id in the connected component containing
// * that vertex.
// */
// def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
// numIter: Int = Int.MaxValue) = {
// val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) })
// val edges = graph.edges // .mapValues(v => None)
// val ccGraph = new Graph(vertices, edges)
// ccGraph.iterateDynamic(
// def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) }
// GraphLab.iterateGA[Int, ED, Int](ccGraph)(
// (me_id, edge) => edge.otherVertex(me_id).data, // gather
// (a: Int, b: Int) => math.min(a, b), // merge
// Integer.MAX_VALUE,
// (v, a: Int) => math.min(v.data, a), // apply
// (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter
// (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply
// numIter,
// gatherEdges = EdgeDirection.Both,
// scatterEdges = EdgeDirection.Both).vertices
// //
// // graph_ret.vertices.collect.foreach(println)
// // graph_ret.edges.take(10).foreach(println)
// gatherDirection = EdgeDirection.Both)
// }
// /**
// * Compute the shortest path to a set of markers
// */
// def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float],
// sources: List[Int], numIter: Int) = {
// def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = {
// val sourceSet = sources.toSet
// val vertices = graph.vertices.mapPartitions(
// iter => iter.map {
// case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) )
// });
// val edges = graph.edges // .mapValues(v => None)
// val spGraph = new Graph(vertices, edges)
// val niterations = Int.MaxValue
// spGraph.iterateDynamic(
// val spGraph = graph.mapVertices {
// case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
// }
// GraphLab.iterateGA[Float, Float, Float](spGraph)(
// (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
// (a: Float, b: Float) => math.min(a, b), // merge
// Float.MaxValue,
// (v, a: Float) => math.min(v.data, a), // apply
// (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter
// (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply
// numIter,
// gatherEdges = EdgeDirection.In,
// scatterEdges = EdgeDirection.Out).vertices
// gatherDirection = EdgeDirection.In)
// }
// // /**
// // * Compute the connected component membership of each vertex
// // * and return an RDD with the vertex value containing the
// // * lowest vertex id in the connected component containing
// // * that vertex.
// // */
// // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
// // numIter: Int = Int.MaxValue) = {
// // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) })
// // val edges = graph.edges // .mapValues(v => None)
// // val ccGraph = new Graph(vertices, edges)
// // ccGraph.iterateDynamic(
// // (me_id, edge) => edge.otherVertex(me_id).data, // gather
// // (a: Int, b: Int) => math.min(a, b), // merge
// // Integer.MAX_VALUE,
// // (v, a: Int) => math.min(v.data, a), // apply
// // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter
// // numIter,
// // gatherEdges = EdgeDirection.Both,
// // scatterEdges = EdgeDirection.Both).vertices
// // //
// // // graph_ret.vertices.collect.foreach(println)
// // // graph_ret.edges.take(10).foreach(println)
// // }
// /**
// *
// */
// def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double],
// latentK: Int, lambda: Double, numIter: Int) = {
// val vertices = graph.vertices.mapPartitions( _.map {
// case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } )
// }).cache
// val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// val edges = graph.edges // .mapValues(v => None)
// val alsGraph = new Graph(vertices, edges)
// alsGraph.numVPart = graph.numVPart
// alsGraph.numEPart = graph.numEPart
// val niterations = Int.MaxValue
// alsGraph.iterateDynamic[(Array[Double], Array[Double])](
// (me_id, edge) => { // gather
// val X = edge.otherVertex(me_id).data
// val y = edge.data
// val Xy = X.map(_ * y)
// val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray
// (Xy, XtX)
// },
// (a, b) => {
// // The difference between the while loop and the zip is a FACTOR OF TWO in overall
// // runtime
// var i = 0
// while(i < a._1.length) { a._1(i) += b._1(i); i += 1 }
// i = 0
// while(i < a._2.length) { a._2(i) += b._2(i); i += 1 }
// a
// // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r })
// },
// (Array.empty[Double], Array.empty[Double]), // default value is empty
// (vertex, accum) => { // apply
// val XyArray = accum._1
// val XtXArray = accum._2
// if(XyArray.isEmpty) vertex.data // no neighbors
// else {
// val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) =>
// (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) +
// (if(i == j) lambda else 1.0F) //regularization
// // /**
// // * Compute the shortest path to a set of markers
// // */
// // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float],
// // sources: List[Int], numIter: Int) = {
// // val sourceSet = sources.toSet
// // val vertices = graph.vertices.mapPartitions(
// // iter => iter.map {
// // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) )
// // });
// // val edges = graph.edges // .mapValues(v => None)
// // val spGraph = new Graph(vertices, edges)
// // val niterations = Int.MaxValue
// // spGraph.iterateDynamic(
// // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
// // (a: Float, b: Float) => math.min(a, b), // merge
// // Float.MaxValue,
// // (v, a: Float) => math.min(v.data, a), // apply
// // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter
// // numIter,
// // gatherEdges = EdgeDirection.In,
// // scatterEdges = EdgeDirection.Out).vertices
// // }
// // /**
// // *
// // */
// // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double],
// // latentK: Int, lambda: Double, numIter: Int) = {
// // val vertices = graph.vertices.mapPartitions( _.map {
// // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } )
// // }).cache
// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// // val edges = graph.edges // .mapValues(v => None)
// // val alsGraph = new Graph(vertices, edges)
// // alsGraph.numVPart = graph.numVPart
// // alsGraph.numEPart = graph.numEPart
// // val niterations = Int.MaxValue
// // alsGraph.iterateDynamic[(Array[Double], Array[Double])](
// // (me_id, edge) => { // gather
// // val X = edge.otherVertex(me_id).data
// // val y = edge.data
// // val Xy = X.map(_ * y)
// // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray
// // (Xy, XtX)
// // },
// // (a, b) => {
// // // The difference between the while loop and the zip is a FACTOR OF TWO in overall
// // // runtime
// // var i = 0
// // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 }
// // i = 0
// // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 }
// // a
// // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r })
// // },
// // (Array.empty[Double], Array.empty[Double]), // default value is empty
// // (vertex, accum) => { // apply
// // val XyArray = accum._1
// // val XtXArray = accum._2
// // if(XyArray.isEmpty) vertex.data // no neighbors
// // else {
// // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) =>
// // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) +
// // (if(i == j) lambda else 1.0F) //regularization
// // }
// // val Xy = DenseMatrix.create(latentK,1,XyArray)
// // val w = XtX \ Xy
// // w.data
// // }
// // },
// // (me_id, edge) => true,
// // numIter,
// // gatherEdges = EdgeDirection.Both,
// // scatterEdges = EdgeDirection.Both,
// // vertex => vertex.id < maxUser).vertices
// // }
// def main(args: Array[String]) = {
// val host = args(0)
// val taskType = args(1)
// val fname = args(2)
// val options = args.drop(3).map { arg =>
// arg.dropWhile(_ == '-').split('=') match {
// case Array(opt, v) => (opt -> v)
// case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
// }
// val Xy = DenseMatrix.create(latentK,1,XyArray)
// val w = XtX \ Xy
// w.data
// }
// },
// (me_id, edge) => true,
// numIter,
// gatherEdges = EdgeDirection.Both,
// scatterEdges = EdgeDirection.Both,
// vertex => vertex.id < maxUser).vertices
// }
def main(args: Array[String]) = {
val host = args(0)
val taskType = args(1)
val fname = args(2)
val options = args.drop(3).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
// System.setProperty("spark.serializer", "spark.KryoSerializer")
// //System.setProperty("spark.shuffle.compress", "false")
// System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
System.setProperty("spark.serializer", "spark.KryoSerializer")
//System.setProperty("spark.shuffle.compress", "false")
System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
// taskType match {
// case "pagerank" => {
taskType match {
case "pagerank" => {
var numIter = Int.MaxValue
var isDynamic = false
var tol:Float = 0.001F
var outFname = ""
var numVPart = 4
var numEPart = 4
options.foreach{
case ("numIter", v) => numIter = v.toInt
case ("dynamic", v) => isDynamic = v.toBoolean
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numVPart", v) => numVPart = v.toInt
case ("numEPart", v) => numEPart = v.toInt
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
if(!isDynamic && numIter == Int.MaxValue) {
println("Set number of iterations!")
sys.exit(1)
}
println("======================================")
println("| PageRank |")
println("--------------------------------------")
println(" Using parameters:")
println(" \tDynamic: " + isDynamic)
if(isDynamic) println(" \t |-> Tolerance: " + tol)
println(" \tNumIter: " + numIter)
println("======================================")
val sc = new SparkContext(host, "PageRank(" + fname + ")")
val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
val startTime = System.currentTimeMillis
logInfo("GRAPHX: starting tasks")
logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
logInfo("GRAPHX: Number of edges " + graph.edges.count)
val pr = Analytics.pagerank(graph, numIter)
// val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// else Analytics.pagerank(graph, numIter)
logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
if (!outFname.isEmpty) {
println("Saving pageranks of pages to " + outFname)
pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
}
logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
sc.stop()
}
case "cc" => {
var numIter = Int.MaxValue
var isDynamic = false
options.foreach{
case ("numIter", v) => numIter = v.toInt
case ("dynamic", v) => isDynamic = v.toBoolean
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
if(!isDynamic && numIter == Int.MaxValue) {
println("Set number of iterations!")
sys.exit(1)
}
println("======================================")
println("| Connected Components |")
println("--------------------------------------")
println(" Using parameters:")
println(" \tDynamic: " + isDynamic)
println(" \tNumIter: " + numIter)
println("======================================")
val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
val graph = Graph.textFile(sc, fname, a => 1.0F)
val cc = Analytics.connectedComponents(graph, numIter)
// val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// else Analytics.connectedComponents(graph, numIter)
println("Components: " + cc.vertices.map(_.data).distinct())
sc.stop()
}
case "shortestpath" => {
var numIter = Int.MaxValue
var isDynamic = true
var sources: List[Int] = List.empty
options.foreach{
case ("numIter", v) => numIter = v.toInt
case ("dynamic", v) => isDynamic = v.toBoolean
case ("source", v) => sources ++= List(v.toInt)
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
if(!isDynamic && numIter == Int.MaxValue) {
println("Set number of iterations!")
sys.exit(1)
}
if(sources.isEmpty) {
println("No sources provided!")
sys.exit(1)
}
println("======================================")
println("| Shortest Path |")
println("--------------------------------------")
println(" Using parameters:")
println(" \tDynamic: " + isDynamic)
println(" \tNumIter: " + numIter)
println(" \tSources: [" + sources.mkString(", ") + "]")
println("======================================")
val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
val sp = Analytics.shortestPath(graph, sources, numIter)
// val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// else Analytics.shortestPath(graph, sources, numIter)
println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
sc.stop()
}
// case "als" => {
// var numIter = 5
// var lambda = 0.01
// var latentK = 10
// var usersFname = "usersFactors.tsv"
// var moviesFname = "moviesFname.tsv"
// var numIter = Int.MaxValue
// var isDynamic = false
// var tol:Float = 0.001F
// var outFname = ""
// var numVPart = 4
// var numEPart = 4
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("lambda", v) => lambda = v.toDouble
// case ("latentK", v) => latentK = v.toInt
// case ("usersFname", v) => usersFname = v
// case ("moviesFname", v) => moviesFname = v
// case ("dynamic", v) => isDynamic = v.toBoolean
// case ("tol", v) => tol = v.toFloat
// case ("output", v) => outFname = v
// case ("numVPart", v) => numVPart = v.toInt
// case ("numEPart", v) => numEPart = v.toInt
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// println("======================================")
// println("| Alternating Least Squares |")
// println("| PageRank |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// if(isDynamic) println(" \t |-> Tolerance: " + tol)
// println(" \tNumIter: " + numIter)
// println(" \tLambda: " + lambda)
// println(" \tLatentK: " + latentK)
// println(" \tusersFname: " + usersFname)
// println(" \tmoviesFname: " + moviesFname)
// println("======================================")
// val sc = new SparkContext(host, "ALS(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => a(0).toDouble )
// graph.numVPart = numVPart
// graph.numEPart = numEPart
// val sc = new SparkContext(host, "PageRank(" + fname + ")")
// val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
// assert(maxUser < minMovie)
// val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
// val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
// factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
// .saveAsTextFile(usersFname)
// factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
// .saveAsTextFile(moviesFname)
// val startTime = System.currentTimeMillis
// logInfo("GRAPHX: starting tasks")
// logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
// logInfo("GRAPHX: Number of edges " + graph.edges.count)
// val pr = Analytics.pagerank(graph, numIter)
// // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// // else Analytics.pagerank(graph, numIter)
// logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
// if (!outFname.isEmpty) {
// println("Saving pageranks of pages to " + outFname)
// pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
// }
// logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
// sc.stop()
// }
// case "cc" => {
// var numIter = Int.MaxValue
// var isDynamic = false
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// println("======================================")
// println("| Connected Components |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// println(" \tNumIter: " + numIter)
// println("======================================")
// val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => 1.0F)
// val cc = Analytics.connectedComponents(graph, numIter)
// // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// // else Analytics.connectedComponents(graph, numIter)
// println("Components: " + cc.vertices.map(_.data).distinct())
// sc.stop()
// }
// case "shortestpath" => {
// var numIter = Int.MaxValue
// var isDynamic = true
// var sources: List[Int] = List.empty
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case ("source", v) => sources ++= List(v.toInt)
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// if(sources.isEmpty) {
// println("No sources provided!")
// sys.exit(1)
// }
// println("======================================")
// println("| Shortest Path |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// println(" \tNumIter: " + numIter)
// println(" \tSources: [" + sources.mkString(", ") + "]")
// println("======================================")
// val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
// val sp = Analytics.shortestPath(graph, sources, numIter)
// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// // else Analytics.shortestPath(graph, sources, numIter)
// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
// sc.stop()
// }
case _ => {
println("Invalid task type.")
}
}
}
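For reference, a hedged example of driving this main() from code (host, input path, and option values are hypothetical; options use the -key=value form parsed above):

  Analytics.main(Array(
    "local[4]",              // host
    "pagerank",              // taskType
    "/tmp/edges.tsv",        // fname
    "--numIter=20",
    "--output=/tmp/ranks"))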
// // case "als" => {
// // var numIter = 5
// // var lambda = 0.01
// // var latentK = 10
// // var usersFname = "usersFactors.tsv"
// // var moviesFname = "moviesFname.tsv"
// // var numVPart = 4
// // var numEPart = 4
// // options.foreach{
// // case ("numIter", v) => numIter = v.toInt
// // case ("lambda", v) => lambda = v.toDouble
// // case ("latentK", v) => latentK = v.toInt
// // case ("usersFname", v) => usersFname = v
// // case ("moviesFname", v) => moviesFname = v
// // case ("numVPart", v) => numVPart = v.toInt
// // case ("numEPart", v) => numEPart = v.toInt
// // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// // }
// // println("======================================")
// // println("| Alternating Least Squares |")
// // println("--------------------------------------")
// // println(" Using parameters:")
// // println(" \tNumIter: " + numIter)
// // println(" \tLambda: " + lambda)
// // println(" \tLatentK: " + latentK)
// // println(" \tusersFname: " + usersFname)
// // println(" \tmoviesFname: " + moviesFname)
// // println("======================================")
// // val sc = new SparkContext(host, "ALS(" + fname + ")")
// // val graph = Graph.textFile(sc, fname, a => a(0).toDouble )
// // graph.numVPart = numVPart
// // graph.numEPart = numEPart
// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
// // assert(maxUser < minMovie)
// // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
// // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
// // .saveAsTextFile(usersFname)
// // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
// // .saveAsTextFile(moviesFname)
// // sc.stop()
// // }
// case _ => {
// println("Invalid task type.")
// }
// }
// }
}


@@ -0,0 +1,7 @@
package spark.graph
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
var src: Vid = 0,
var dst: Vid = 0,
var data: ED = nullValue[ED])


@@ -0,0 +1,17 @@
package spark.graph
class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD,
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] {
var src: Vertex[VD] = _
var dst: Vertex[VD] = _
var data: ED = _
def otherVertex(vid: Vid): Vertex[VD] = if (src.id == vid) dst else src
def vertex(vid: Vid): Vertex[VD] = if (src.id == vid) src else dst
def relativeDirection(vid: Vid): EdgeDirection = {
if (vid == src.id) EdgeDirection.Out else EdgeDirection.In
}
}
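A small hedged example of how these helpers behave (ids and data values are illustrative):

  val e = new EdgeWithVertices[String, Float]
  e.src = Vertex(1, "a")
  e.dst = Vertex(2, "b")
  e.data = 0.5F
  e.otherVertex(1).id      // 2: the endpoint opposite vertex 1
  e.vertex(2).data         // "b": look up an endpoint by its id
  e.relativeDirection(1)   // EdgeDirection.Out: the edge leaves its source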


@@ -1,483 +1,57 @@
package spark.graph
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.ints.IntArrayList
import spark.{ClosureCleaner, HashPartitioner, Partitioner, RDD}
import spark.SparkContext
import spark.SparkContext._
import spark.graph.Graph.EdgePartition
import spark.storage.StorageLevel
import spark.RDD
case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] (
var id: Vid = 0,
var data: VD = nullValue[VD]) {
abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def this(tuple: Tuple2[Vid, VD]) = this(tuple._1, tuple._2)
def vertices(): RDD[Vertex[VD]]
def tuple = (id, data)
}
def edges(): RDD[Edge[ED]]
def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]]
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
var src: Vid = 0,
var dst: Vid = 0,
var data: ED = nullValue[ED])
def cache(): Graph[VD, ED]
def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED]
class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD,
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] {
var src: Vertex[VD] = _
var dst: Vertex[VD] = _
var data: ED = _
def otherVertex(vid: Vid): Vertex[VD] = if (src.id == vid) dst else src
def vertex(vid: Vid): Vertex[VD] = if (src.id == vid) src else dst
def relativeDirection(vid: Vid): EdgeDirection = {
if (vid == src.id) EdgeDirection.Out else EdgeDirection.In
}
}
/**
* A Graph RDD that supports computation on graphs.
*/
class Graph[VD: ClassManifest, ED: ClassManifest] protected (
val numVertexPartitions: Int,
val numEdgePartitions: Int,
_rawVertices: RDD[Vertex[VD]],
_rawEdges: RDD[Edge[ED]],
_rawVTable: RDD[(Vid, (VD, Array[Pid]))],
_rawETable: RDD[(Pid, EdgePartition[ED])]) {
def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = {
this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null)
}
def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
if (_cached) {
(new Graph(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable))
.cache()
} else {
new Graph(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
}
}
def withVertexPartitioner(numVertexPartitions: Int) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
def withEdgePartitioner(numEdgePartitions: Int) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
protected var _cached = false
def cache(): Graph[VD, ED] = {
eTable.cache()
vTable.cache()
_cached = true
this
}
/** Return a RDD of vertices. */
def vertices: RDD[Vertex[VD]] = {
if (!_cached && _rawVertices != null) {
_rawVertices
} else {
vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
}
}
/** Return a RDD of edges. */
def edges: RDD[Edge[ED]] = {
if (!_cached && _rawEdges != null) {
_rawEdges
} else {
eTable.mapPartitions { iter => iter.next._2.iterator }
}
}
/** Return a RDD that brings edges with its source and destination vertices together. */
def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
(new EdgeWithVerticesRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 }
}
lazy val numEdges: Long = edges.count()
lazy val numVertices: Long = vertices.count()
lazy val inDegrees = mapReduceNeighborhood[Vid]((vid, edge) => 1, _+_, 0, EdgeDirection.In)
lazy val outDegrees = mapReduceNeighborhood[Vid]((vid,edge) => 1, _+_, 0, EdgeDirection.Out)
lazy val degrees = mapReduceNeighborhood[Vid]((vid,edge) => 1, _+_, 0, EdgeDirection.Both)
def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2]
/** Return a new graph with its edge directions reversed. */
lazy val reverse: Graph[VD,ED] = {
newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) })
}
def reverse: Graph[VD, ED]
def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
mapReduceNeighborhood[Array[Vid]](
(vid, edge) => Array(edge.otherVertex(vid).id),
(a, b) => a ++ b,
Array.empty[Vid],
edgeDirection)
}
def mapVertices[VD2: ClassManifest](f: Vertex[VD] => Vertex[VD2]): Graph[VD2, ED] = {
newGraph(vertices.map(f), edges)
}
def mapEdges[ED2: ClassManifest](f: Edge[ED] => Edge[ED2]): Graph[VD, ED2] = {
newGraph(vertices, edges.map(f))
}
//////////////////////////////////////////////////////////////////////////////////////////////////
// Lower level transformation methods
//////////////////////////////////////////////////////////////////////////////////////////////////
def mapReduceNeighborhood[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => VD2,
reduceFunc: (VD2, VD2) => VD2,
default: VD2,
gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, default)) }
}, preservesPartitioning = true)
(new EdgeWithVerticesRDD[MutableTuple2[VD, VD2], ED](newVTable, eTable))
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, VD2], ED] =>
edgeSansAcc.data = e.data
edgeSansAcc.src.data = e.src.data._1
edgeSansAcc.dst.data = e.dst.data._1
edgeSansAcc.src.id = e.src.id
edgeSansAcc.dst.id = e.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
e.dst.data._2 = reduceFunc(e.dst.data._2, mapFunc(edgeSansAcc.dst.id, edgeSansAcc))
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
e.src.data._2 = reduceFunc(e.src.data._2, mapFunc(edgeSansAcc.src.id, edgeSansAcc))
}
}
vmap.int2ObjectEntrySet().fastIterator().map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}
.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
}
/**
* Same as mapReduceNeighborhood but map function can return none and there is no default value.
* As a consequence, the resulting table may be much smaller than the set of vertices.
*/
def flatMapReduceNeighborhood[VD2: ClassManifest](
def aggregateNeighbors[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2,
gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = {
gatherDirection: EdgeDirection)
: RDD[(Vid, VD2)]
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
}, preservesPartitioning = true)
(new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] =>
edgeSansAcc.data = e.data
edgeSansAcc.src.data = e.src.data._1
edgeSansAcc.dst.data = e.dst.data._1
edgeSansAcc.src.id = e.src.id
edgeSansAcc.dst.id = e.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
e.dst.data._2 =
if (e.dst.data._2.isEmpty) {
mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2
}
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
e.dst.data._2 =
if (e.dst.data._2.isEmpty) {
mapFunc(edgeSansAcc.src.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2
}
}
}
vmap.int2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}
.map{ case (vid, aOpt) => (vid, aOpt.get) }
.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
}
def aggregateNeighbors[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2,
default: VD2, // Should this be a function or a value?
gatherDirection: EdgeDirection)
: RDD[(Vid, VD2)]
def updateVertices[U: ClassManifest, VD2: ClassManifest](
updates: RDD[(Vid, U)],
updateFunc: (Vertex[VD], Option[U]) => VD2)
: Graph[VD2, ED] = {
: Graph[VD2, ED]
ClosureCleaner.clean(updateFunc)
// This one can be used to skip records when we can do in-place update.
// Annoying that we can't rename it ...
def updateVertices2[U: ClassManifest](
updates: RDD[(Vid, U)],
updateFunc: (Vertex[VD], U) => VD)
: Graph[VD, ED]
val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter =>
iter.map { case (vid, ((vdata, pids), update)) =>
val newVdata = updateFunc(Vertex(vid, vdata), update)
(vid, (newVdata, pids))
}
}, preservesPartitioning = true).cache()
new Graph(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable)
}
// def mapPartitions[U: ClassManifest](
// f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
// preservesPartitioning: Boolean = false): RDD[U] = {
// (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
// val (vmap, iter) = part.next()
// f(vmap, iter)
// }, preservesPartitioning)
// }
//////////////////////////////////////////////////////////////////////////////////////////////////
// Internals hidden from callers
//////////////////////////////////////////////////////////////////////////////////////////////////
// TODO: Support non-hash partitioning schemes.
protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
/** Create a new graph but keep the current partitioning scheme. */
protected def newGraph[VD2: ClassManifest, ED2: ClassManifest](
vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = {
(new Graph[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions)
}
protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
if (_rawETable == null) {
Graph.createETable(_rawEdges, numEdgePartitions)
} else {
_rawETable
}
}
protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
if (_rawVTable == null) {
Graph.createVTable(_rawVertices, eTable, numVertexPartitions)
} else {
_rawVTable
}
}
protected lazy val vTableReplicated: RDD[(Vid, VD)] = {
// Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get
// the shuffle id so we can use it on the slave.
vTable
.flatMap { case (vid, (vdata, pids)) => pids.iterator.map { pid => (pid, (vid, vdata)) } }
.partitionBy(edgePartitioner)
.mapPartitions(
{ part => part.map { case(pid, (vid, vdata)) => (vid, vdata) } },
preservesPartitioning = true)
}
// Save a copy of the GraphOps object so there is always one unique GraphOps object
// for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
val ops = new GraphOps(this)
}
object Graph {
/**
* Load an edge list from file initializing the Graph RDD
*/
def textFile[ED: ClassManifest](
sc: SparkContext,
path: String,
edgeParser: Array[String] => ED,
minEdgePartitions: Int = 1,
minVertexPartitions: Int = 1)
: Graph[Int, ED] = {
// Parse the edge data table
val edges = sc.textFile(path).flatMap { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
if(lineArray.length < 2) {
println("Invalid line: " + line)
assert(false)
}
val source = lineArray(0)
val target = lineArray(1)
val tail = lineArray.drop(2)
val edata = edgeParser(tail)
Array(Edge(source.trim.toInt, target.trim.toInt, edata))
} else {
Array.empty[Edge[ED]]
}
}.cache()
val graph = fromEdges(edges)
// println("Loaded graph:" +
// "\n\t#edges: " + graph.numEdges +
// "\n\t#vertices: " + graph.numVertices)
graph
}
def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): Graph[Int, ED] = {
val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) }
.reduceByKey(_ + _)
.map{ case (vid, degree) => Vertex(vid, degree) }
(new Graph[Int, ED](vertices, edges))
}
/**
* Make k-cycles
*/
def kCycles(sc: SparkContext, numCycles: Int = 3, size: Int = 3) = {
// Construct the edges
val edges = sc.parallelize(for (i <- 0 until numCycles; j <- 0 until size) yield {
val offset = i * numCycles
val source = offset + j
val target = offset + ((j + 1) % size)
Edge(source, target, i * numCycles + j)
})
// Change vertex data to be the lowest vertex id of the vertex in that cycle
val graph = fromEdges(edges).mapVertices{
case Vertex(id, degree) => Vertex(id, (id/numCycles) * numCycles)
}
graph
}
/**
* Make a regular grid graph
**/
def grid(sc: SparkContext, numRows: Int = 5, numCols: Int = 5) = {
def coord(vid: Int) = (vid % numRows, vid / numRows)
val vertices = sc.parallelize( 0 until (numRows * numCols) ).map(
vid => Vertex(vid, coord(vid)))
def index(r: Int, c:Int) = (r + c * numRows)
val edges = vertices.flatMap{ case Vertex(vid, (r,c)) =>
(if(r+1 < numRows) List(Edge(vid, index(r+1,c), 1.0F)) else List.empty) ++
(if(c+1 < numCols) List(Edge(vid, index(r,c+1), 1.0F)) else List.empty)
}
new Graph(vertices, edges)
}
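For illustration, a hedged sketch of using the two constructors above (assuming an existing SparkContext sc; sizes are illustrative):

  val grid = Graph.grid(sc, numRows = 3, numCols = 3)      // vertex data is the (row, col) coordinate
  val cycles = Graph.kCycles(sc, numCycles = 2, size = 4)  // vertex data is the lowest id in its cycle
  println(grid.numEdges + " grid edges, " + cycles.numVertices + " cycle vertices")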
/**
* A partition of edges in 3 large columnar arrays.
*/
private[graph]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]
{
val srcIds: IntArrayList = new IntArrayList
val dstIds: IntArrayList = new IntArrayList
// TODO: Specialize data.
val data: ArrayBuffer[ED] = new ArrayBuffer[ED]
/** Add a new edge to the partition. */
def add(src: Vid, dst: Vid, d: ED) {
srcIds.add(src)
dstIds.add(dst)
data += d
}
def trim() {
srcIds.trim()
dstIds.trim()
}
def size: Int = srcIds.size
def iterator = new Iterator[Edge[ED]] {
private var edge = new Edge[ED]
private var pos = 0
override def hasNext: Boolean = pos < EdgePartition.this.size
override def next(): Edge[ED] = {
edge.src = srcIds.get(pos)
edge.dst = dstIds.get(pos)
edge.data = data(pos)
pos += 1
edge
}
}
}
/**
* Create the edge table RDD, which is much more efficient for Java heap storage than the
* normal edges data structure (RDD[(Vid, Vid, ED)]).
*
* The edge table contains multiple partitions, and each partition contains only one RDD
* key-value pair: the key is the partition id, and the value is an EdgePartition object
* containing all the edges in a partition.
*/
protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
: RDD[(Pid, EdgePartition[ED])] = {
edges
.map { e =>
// Random partitioning based on the source vertex id.
(math.abs(e.src) % numPartitions, (e.src, e.dst, e.data))
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex({ (pid, iter) =>
val edgePartition = new Graph.EdgePartition[ED]
iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
Iterator((pid, edgePartition))
}, preservesPartitioning = true)
}
protected def createVTable[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[Vertex[VD]],
eTable: RDD[(Pid, EdgePartition[ED])],
numPartitions: Int)
: RDD[(Vid, (VD, Array[Pid]))] = {
val partitioner = new HashPartitioner(numPartitions)
// A key-value RDD. The key is a vertex id, and the value is a list of
// partitions that contains edges referencing the vertex.
val vid2pid : RDD[(Int, Seq[Pid])] = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val vSet = new it.unimi.dsi.fastutil.ints.IntOpenHashSet
var i = 0
while (i < edgePartition.srcIds.size) {
vSet.add(edgePartition.srcIds.getInt(i))
vSet.add(edgePartition.dstIds.getInt(i))
i += 1
}
vSet.iterator.map { vid => (vid.intValue, pid) }
}.groupByKey(partitioner)
vertices
.map { v => (v.id, v.data) }
.partitionBy(partitioner)
.leftOuterJoin(vid2pid)
.mapValues {
case (vdata, None) => (vdata, Array.empty[Pid])
case (vdata, Some(pids)) => (vdata, pids.toArray)
}
}
implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops
}


@@ -6,116 +6,116 @@ import spark.RDD
object GraphLab {
def iterateGA2[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
gather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A,
default: A,
apply: (Vertex[VD], A) => VD,
numIter: Int,
gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
// def iterateGA2[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
// gather: (Vid, EdgeWithVertices[VD, ED]) => A,
// merge: (A, A) => A,
// default: A,
// apply: (Vertex[VD], A) => VD,
// numIter: Int,
// gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
var g = graph.cache()
// var g = graph.cache()
var i = 0
while (i < numIter) {
// var i = 0
// while (i < numIter) {
val accUpdates: RDD[(Vid, A)] =
g.mapReduceNeighborhood(gather, merge, default, gatherDirection)
// val accUpdates: RDD[(Vid, A)] =
// g.aggregateNeighbors(gather, merge, default, gatherDirection)
def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
g = g.updateVertices(accUpdates, applyFunc).cache()
// def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
// g = g.updateVertices(accUpdates, applyFunc).cache()
i += 1
}
g
}
def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
mergeFunc: (A, A) => A,
applyFunc: (Vertex[VD], Option[A]) => VD,
numIter: Int,
gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
var g = graph.cache()
def someGather(vid: Vid, edge: EdgeWithVertices[VD, ED]) = Some(gatherFunc(vid, edge))
var i = 0
while (i < numIter) {
val accUpdates: RDD[(Vid, A)] =
g.flatMapReduceNeighborhood(someGather, mergeFunc, gatherDirection)
g = g.updateVertices(accUpdates, applyFunc).cache()
i += 1
}
g
}
def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
mergeFunc: (A, A) => A,
applyFunc: (Vertex[VD], Option[A]) => VD,
scatterFunc: (Vid, EdgeWithVertices[VD, ED]) => Boolean,
numIter: Int,
gatherDirection: EdgeDirection = EdgeDirection.In,
scatterDirection: EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = {
var g = graph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache()
def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = {
if(e.vertex(vid).data._1) {
val edge = new EdgeWithVertices[VD,ED]
edge.src = Vertex(e.src.id, e.src.data._2)
edge.dst = Vertex(e.dst.id, e.dst.data._2)
Some(gatherFunc(vid, edge))
} else {
None
}
}
def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = {
if(v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum))
else (false, v.data._2)
}
def scatter(rawVid: Vid, e: EdgeWithVertices[(Boolean, VD),ED]) = {
val vid = e.otherVertex(rawVid).id
if(e.vertex(vid).data._1) {
val edge = new EdgeWithVertices[VD,ED]
edge.src = Vertex(e.src.id, e.src.data._2)
edge.dst = Vertex(e.dst.id, e.dst.data._2)
Some(scatterFunc(vid, edge))
} else {
None
}
}
def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) =
(accum.getOrElse(false), v.data._2)
var i = 0
var numActive = g.numVertices
while (i < numIter && numActive > 0) {
val accUpdates: RDD[(Vid, A)] =
g.flatMapReduceNeighborhood(gather, mergeFunc, gatherDirection)
g = g.updateVertices(accUpdates, apply).cache()
// Scatter is basically a gather in the opposite direction so we reverse the edge direction
val activeVertices: RDD[(Vid, Boolean)] =
g.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection.reverse)
g = g.updateVertices(activeVertices, applyActive).cache()
numActive = g.vertices.map(v => if (v.data._1) 1 else 0).reduce( _ + _ )
println("Number active vertices: " + numActive)
i += 1
}
g.mapVertices(v => Vertex(v.id, v.data._2))
}
// i += 1
// }
// g
// }
// def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
// gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
// mergeFunc: (A, A) => A,
// applyFunc: (Vertex[VD], Option[A]) => VD,
// numIter: Int,
// gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
// var g = graph.cache()
// def someGather(vid: Vid, edge: EdgeWithVertices[VD, ED]) = Some(gatherFunc(vid, edge))
// var i = 0
// while (i < numIter) {
// val accUpdates: RDD[(Vid, A)] =
// g.flatMapReduceNeighborhood(someGather, mergeFunc, gatherDirection)
// g = g.updateVertices(accUpdates, applyFunc).cache()
// i += 1
// }
// g
// }
// def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
// gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
// mergeFunc: (A, A) => A,
// applyFunc: (Vertex[VD], Option[A]) => VD,
// scatterFunc: (Vid, EdgeWithVertices[VD, ED]) => Boolean,
// numIter: Int,
// gatherDirection: EdgeDirection = EdgeDirection.In,
// scatterDirection: EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = {
// var g = graph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache()
// def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = {
// if(e.vertex(vid).data._1) {
// val edge = new EdgeWithVertices[VD,ED]
// edge.src = Vertex(e.src.id, e.src.data._2)
// edge.dst = Vertex(e.dst.id, e.dst.data._2)
// Some(gatherFunc(vid, edge))
// } else {
// None
// }
// }
// def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = {
// if(v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum))
// else (false, v.data._2)
// }
// def scatter(rawVid: Vid, e: EdgeWithVertices[(Boolean, VD),ED]) = {
// val vid = e.otherVertex(rawVid).id
// if(e.vertex(vid).data._1) {
// val edge = new EdgeWithVertices[VD,ED]
// edge.src = Vertex(e.src.id, e.src.data._2)
// edge.dst = Vertex(e.dst.id, e.dst.data._2)
// Some(scatterFunc(vid, edge))
// } else {
// None
// }
// }
// def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) =
// (accum.getOrElse(false), v.data._2)
// var i = 0
// var numActive = g.numVertices
// while (i < numIter && numActive > 0) {
// val accUpdates: RDD[(Vid, A)] =
// g.flatMapReduceNeighborhood(gather, mergeFunc, gatherDirection)
// g = g.updateVertices(accUpdates, apply).cache()
// // Scatter is basically a gather in the opposite direction so we reverse the edge direction
// val activeVertices: RDD[(Vid, Boolean)] =
// g.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection.reverse)
// g = g.updateVertices(activeVertices, applyActive).cache()
// numActive = g.vertices.map(v => if (v.data._1) 1 else 0).reduce( _ + _ )
// println("Number active vertices: " + numActive)
// i += 1
// }
// g.mapVertices(v => Vertex(v.id, v.data._2))
// }
}


@@ -0,0 +1,54 @@
package spark.graph
import spark.RDD
import spark.SparkContext
import spark.SparkContext._
import spark.graph.impl.GraphImpl
object GraphLoader {
/**
* Load an edge list from file initializing the Graph RDD
*/
def textFile[ED: ClassManifest](
sc: SparkContext,
path: String,
edgeParser: Array[String] => ED,
minEdgePartitions: Int = 1,
minVertexPartitions: Int = 1)
: GraphImpl[Int, ED] = {
// Parse the edge data table
val edges = sc.textFile(path).flatMap { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
if(lineArray.length < 2) {
println("Invalid line: " + line)
assert(false)
}
val source = lineArray(0)
val target = lineArray(1)
val tail = lineArray.drop(2)
val edata = edgeParser(tail)
Array(Edge(source.trim.toInt, target.trim.toInt, edata))
} else {
Array.empty[Edge[ED]]
}
}.cache()
val graph = fromEdges(edges)
// println("Loaded graph:" +
// "\n\t#edges: " + graph.numEdges +
// "\n\t#vertices: " + graph.numVertices)
graph
}
def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) }
.reduceByKey(_ + _)
.map{ case (vid, degree) => Vertex(vid, degree) }
new GraphImpl[Int, ED](vertices, edges)
}
}
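A hedged sketch of loading a graph with the new GraphLoader (the path is hypothetical; each input line is expected to be "srcId dstId [data...]" as parsed above):

  val sc = new SparkContext("local[2]", "GraphLoaderExample")
  val graph = GraphLoader.textFile(sc, "/tmp/edges.tsv",
    a => if (a.isEmpty) 1.0F else a(0).toFloat)
  println("vertices: " + graph.vertices.count + ", edges: " + graph.edges.count)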


@@ -0,0 +1,30 @@
package spark.graph
import spark.RDD
class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) {
lazy val numEdges: Long = g.edges.count()
lazy val numVertices: Long = g.vertices.count()
lazy val inDegrees: RDD[(Vid, Int)] = {
g.aggregateNeighbors((vid, edge) => Some(1), _+_, EdgeDirection.In)
}
lazy val outDegrees: RDD[(Vid, Int)] = {
g.aggregateNeighbors((vid,edge) => Some(1), _+_, EdgeDirection.Out)
}
lazy val degrees: RDD[(Vid, Int)] = {
g.aggregateNeighbors((vid,edge) => Some(1), _+_, EdgeDirection.Both)
}
def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
g.aggregateNeighbors(
(vid, edge) => Some(Array(edge.otherVertex(vid).id)),
(a, b) => a ++ b,
edgeDirection)
}
}
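For illustration: since Graph keeps a single GraphOps instance in its ops field and the graphToGraphOps implicit (defined in the Graph companion object) exposes it, these lazy vals can be called as if they were graph methods; a hedged sketch assuming that implicit is in scope:

  println("max in-degree: " + graph.inDegrees.map(_._2).reduce(math.max(_, _)))
  val nbrs = graph.collectNeighborIds(EdgeDirection.Both)   // each vertex paired with its neighbor ids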


@@ -16,41 +16,20 @@ object Pregel {
var g = graph.cache
var i = 0
def reverseGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) =
sendMsg(edge.otherVertex(vid).id, edge)
def mapF(vid: Vid, edge: EdgeWithVertices[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge)
def runProg(v: Vertex[VD], msg: Option[A]): VD = {
if (msg.isEmpty) v.data else vprog(v, msg.get)
}
var msgs: RDD[(Vid, A)] = g.vertices.map{ v => (v.id, initialMsg) }
while (i < numIter) {
def runProg(v: Vertex[VD], msg: Option[A]): VD = if(msg.isEmpty) v.data else vprog(v, msg.get)
g = g.updateVertices(msgs, runProg).cache()
msgs = g.flatMapReduceNeighborhood(reverseGather, mergeMsg, EdgeDirection.In)
msgs = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
i += 1
}
g
}
def iterateOriginal[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
rawGraph: Graph[VD, ED])(
vprog: ( Vertex[VD], A, Seq[Vid]) => Seq[(Vid, A)],
mergeMsg: (A, A) => A,
numIter: Int) : Graph[VD, ED] = {
var graph = rawGraph.cache
var i = 0
val outNbrIds : RDD[(Vid, Array[Vid])] = graph.collectNeighborIds(EdgeDirection.Out)
/// Todo implement
/// vprog takes the vertex, the message (A), and list of out neighbor ids
graph
}
}


@@ -1,14 +0,0 @@
package spark.graph
class Timer {
var lastTime = System.currentTimeMillis
def tic = {
val currentTime = System.currentTimeMillis
val elapsedTime = (currentTime - lastTime)/1000.0
lastTime = currentTime
elapsedTime
}
}


@@ -0,0 +1,11 @@
package spark.graph
case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] (
var id: Vid = 0,
var data: VD = nullValue[VD]) {
def this(tuple: Tuple2[Vid, VD]) = this(tuple._1, tuple._2)
def tuple = (id, data)
}


@@ -0,0 +1,49 @@
package spark.graph.impl
import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.ints.IntArrayList
import spark.graph._
/**
* A partition of edges in 3 large columnar arrays.
*/
private[graph]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]
{
val srcIds: IntArrayList = new IntArrayList
val dstIds: IntArrayList = new IntArrayList
// TODO: Specialize data.
val data: ArrayBuffer[ED] = new ArrayBuffer[ED]
/** Add a new edge to the partition. */
def add(src: Vid, dst: Vid, d: ED) {
srcIds.add(src)
dstIds.add(dst)
data += d
}
def trim() {
srcIds.trim()
dstIds.trim()
}
def size: Int = srcIds.size
def iterator = new Iterator[Edge[ED]] {
private var edge = new Edge[ED]
private var pos = 0
override def hasNext: Boolean = pos < EdgePartition.this.size
override def next(): Edge[ED] = {
edge.src = srcIds.get(pos)
edge.dst = dstIds.get(pos)
edge.data = data(pos)
pos += 1
edge
}
}
}
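A minimal hedged sketch of filling and scanning the columnar partition above (values are illustrative):

  val part = new EdgePartition[Float]
  part.add(1, 2, 0.5F)
  part.add(2, 3, 1.5F)
  part.trim()   // compact the backing IntArrayLists
  // note: the iterator reuses one Edge object, so copy its fields if they must outlive the loop
  part.iterator.foreach(e => println(e.src + " -> " + e.dst + " : " + e.data))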


@@ -1,9 +1,9 @@
package spark.graph
package spark.graph.impl
import spark.{Aggregator, HashPartitioner, Partition, RDD, SparkEnv, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
import spark.SparkContext._
import spark.graph.Graph.EdgePartition
import spark.graph._
private[graph]


@@ -0,0 +1,343 @@
package spark.graph.impl
import scala.collection.JavaConversions._
import spark.ClosureCleaner
import spark.HashPartitioner
import spark.Partitioner
import spark.RDD
import spark.SparkContext
import spark.SparkContext._
import spark.graph._
import spark.graph.impl.GraphImpl._
/**
* A Graph RDD that supports computation on graphs.
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val numVertexPartitions: Int,
val numEdgePartitions: Int,
_rawVertices: RDD[Vertex[VD]],
_rawEdges: RDD[Edge[ED]],
_rawVTable: RDD[(Vid, (VD, Array[Pid]))],
_rawETable: RDD[(Pid, EdgePartition[ED])])
extends Graph[VD, ED] {
def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = {
this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null)
}
def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
if (_cached) {
(new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable))
.cache()
} else {
new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
}
}
def withVertexPartitioner(numVertexPartitions: Int) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
def withEdgePartitioner(numEdgePartitions: Int) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
protected var _cached = false
override def cache(): Graph[VD, ED] = {
eTable.cache()
vTable.cache()
_cached = true
this
}
override def reverse: Graph[VD, ED] = {
newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) })
}
/** Return a RDD of vertices. */
override def vertices: RDD[Vertex[VD]] = {
if (!_cached && _rawVertices != null) {
_rawVertices
} else {
vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
}
}
/** Return a RDD of edges. */
override def edges: RDD[Edge[ED]] = {
if (!_cached && _rawEdges != null) {
_rawEdges
} else {
eTable.mapPartitions { iter => iter.next._2.iterator }
}
}
/** Return an RDD that joins each edge with its source and destination vertices. */
override def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
(new EdgeWithVerticesRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 }
}
override def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] = {
newGraph(vertices.map(v => Vertex(v.id, f(v))), edges)
}
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
newGraph(vertices, edges.map(e => Edge(e.src, e.dst, f(e))))
}
//////////////////////////////////////////////////////////////////////////////////////////////////
// Lower level transformation methods
//////////////////////////////////////////////////////////////////////////////////////////////////
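/**
* For each vertex, apply mapFunc to every adjacent edge in the given direction and combine the
* defined results with reduceFunc, returning an RDD of (vertex id, aggregated value) pairs.
*/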
override def aggregateNeighbors[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2,
default: VD2,
gatherDirection: EdgeDirection)
: RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
}, preservesPartitioning = true)
(new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] =>
edgeSansAcc.data = e.data
edgeSansAcc.src.data = e.src.data._1
edgeSansAcc.dst.data = e.dst.data._1
edgeSansAcc.src.id = e.src.id
edgeSansAcc.dst.id = e.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
e.dst.data._2 =
if (e.dst.data._2.isEmpty) {
mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2
}
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
// When gathering over out-edges, accumulate into the source vertex rather than the destination.
e.src.data._2 =
if (e.src.data._2.isEmpty) {
mapFunc(edgeSansAcc.src.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2
}
}
}
vmap.int2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}
.map{ case (vid, aOpt) => (vid, aOpt.get) }
.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
}
/**
* Same as the aggregateNeighbors above, but there is no default value, so vertices for which the
* map function never returns Some are simply dropped. As a consequence, the resulting table may
* be much smaller than the set of vertices.
*/
override def aggregateNeighbors[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2,
gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
}, preservesPartitioning = true)
(new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] =>
edgeSansAcc.data = e.data
edgeSansAcc.src.data = e.src.data._1
edgeSansAcc.dst.data = e.dst.data._1
edgeSansAcc.src.id = e.src.id
edgeSansAcc.dst.id = e.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
e.dst.data._2 =
if (e.dst.data._2.isEmpty) {
mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2
}
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
// When gathering over out-edges, accumulate into the source vertex rather than the destination.
e.src.data._2 =
if (e.src.data._2.isEmpty) {
mapFunc(edgeSansAcc.src.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2
}
}
}
vmap.int2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}
.map{ case (vid, aOpt) => (vid, aOpt.get) }
.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
}
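/**
* Join the vertices with an update table and apply updateF to produce a new vertex attribute.
* Vertices without a matching update are passed None.
*/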
override def updateVertices[U: ClassManifest, VD2: ClassManifest](
updates: RDD[(Vid, U)],
updateF: (Vertex[VD], Option[U]) => VD2)
: Graph[VD2, ED] = {
ClosureCleaner.clean(updateF)
val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter =>
iter.map { case (vid, ((vdata, pids), update)) =>
val newVdata = updateF(Vertex(vid, vdata), update)
(vid, (newVdata, pids))
}
}, preservesPartitioning = true).cache()
new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable)
}
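/**
* Like updateVertices, but keeps the vertex attribute type: vertices without a matching update
* are left unchanged.
*/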
override def updateVertices2[U: ClassManifest](
updates: RDD[(Vid, U)],
updateF: (Vertex[VD], U) => VD)
: Graph[VD, ED] = {
ClosureCleaner.clean(updateF)
val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter =>
iter.map { case (vid, ((vdata, pids), update)) =>
if (update.isDefined) {
val newVdata = updateF(Vertex(vid, vdata), update.get)
(vid, (newVdata, pids))
} else {
(vid, (vdata, pids))
}
}
}, preservesPartitioning = true).cache()
new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable)
}
//////////////////////////////////////////////////////////////////////////////////////////////////
// Internals hidden from callers
//////////////////////////////////////////////////////////////////////////////////////////////////
// TODO: Support non-hash partitioning schemes.
protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
/** Create a new graph but keep the current partitioning scheme. */
protected def newGraph[VD2: ClassManifest, ED2: ClassManifest](
vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = {
(new GraphImpl[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions)
}
protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
if (_rawETable == null) {
createETable(_rawEdges, numEdgePartitions)
} else {
_rawETable
}
}
protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
if (_rawVTable == null) {
createVTable(_rawVertices, eTable, numVertexPartitions)
} else {
_rawVTable
}
}
protected lazy val vTableReplicated: RDD[(Vid, VD)] = {
// Ship one copy of each vertex's data to every edge partition that references it, then key the
// result by vertex id so each edge partition can build a local map from vertex id to data.
vTable
.flatMap { case (vid, (vdata, pids)) => pids.iterator.map { pid => (pid, (vid, vdata)) } }
.partitionBy(edgePartitioner)
.mapPartitions(
{ part => part.map { case(pid, (vid, vdata)) => (vid, vdata) } },
preservesPartitioning = true)
}
}
object GraphImpl {
/**
* Create the edge table RDD, whose columnar layout is much more efficient for Java heap storage
* than the straightforward edges data structure (an RDD of (Vid, Vid, ED) tuples).
*
* The edge table contains multiple partitions, and each partition holds only one key-value pair:
* the key is the partition id, and the value is an EdgePartition object containing all the edges
* assigned to that partition.
*/
protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
: RDD[(Pid, EdgePartition[ED])] = {
edges
.map { e =>
// Partition edges by the hash of the source vertex id; e.g. with 4 partitions, an edge whose
// src is 7 goes to partition 7 % 4 = 3.
(math.abs(e.src) % numPartitions, (e.src, e.dst, e.data))
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex({ (pid, iter) =>
val edgePartition = new EdgePartition[ED]
iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
Iterator((pid, edgePartition))
}, preservesPartitioning = true)
}
protected def createVTable[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[Vertex[VD]],
eTable: RDD[(Pid, EdgePartition[ED])],
numPartitions: Int)
: RDD[(Vid, (VD, Array[Pid]))] = {
val partitioner = new HashPartitioner(numPartitions)
// A key-value RDD where the key is a vertex id and the value is the list of ids of the edge
// partitions that contain edges referencing that vertex.
val vid2pid : RDD[(Int, Seq[Pid])] = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val vSet = new it.unimi.dsi.fastutil.ints.IntOpenHashSet
var i = 0
while (i < edgePartition.srcIds.size) {
vSet.add(edgePartition.srcIds.getInt(i))
vSet.add(edgePartition.dstIds.getInt(i))
i += 1
}
vSet.iterator.map { vid => (vid.intValue, pid) }
}.groupByKey(partitioner)
vertices
.map { v => (v.id, v.data) }
.partitionBy(partitioner)
.leftOuterJoin(vid2pid)
.mapValues {
case (vdata, None) => (vdata, Array.empty[Pid])
case (vdata, Some(pids)) => (vdata, pids.toArray)
}
}
}
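// Illustrative usage sketch (GraphImplExamples is not part of this commit): count each vertex's
// in-neighbors with the aggregateNeighbors variant that has no default value.
private[graph] object GraphImplExamples {
  /** In-degree of every vertex, e.g. inDegrees(graph).take(5).foreach(println). */
  def inDegrees[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]): RDD[(Vid, Int)] = {
    g.aggregateNeighbors[Int](
      (vid, edge) => Some(1),  // map: each in-edge contributes 1 to its destination vertex
      _ + _,                   // reduce: sum the contributions per vertex
      EdgeDirection.In)
  }
}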

View file

@@ -1,9 +1,10 @@
package spark.graph
package spark.graph.perf
import spark._
import spark.SparkContext._
import spark.bagel.Bagel
import spark.bagel.examples._
import spark.graph._
object BagelTest {
@@ -41,7 +42,7 @@ object BagelTest {
}
val sc = new SparkContext(host, "PageRank(" + fname + ")")
val g = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
val startTime = System.currentTimeMillis
val numVertices = g.vertices.count()

View file

@@ -0,0 +1,72 @@
package spark.graph.perf
import spark._
import spark.SparkContext._
import spark.bagel.Bagel
import spark.bagel.examples._
import spark.graph._
object SparkTest {
def main(args: Array[String]) {
val host = args(0)
val taskType = args(1)
val fname = args(2)
val options = args.drop(3).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
System.setProperty("spark.serializer", "spark.KryoSerializer")
//System.setProperty("spark.shuffle.compress", "false")
System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
var numIter = Int.MaxValue
var isDynamic = false
var tol:Float = 0.001F
var outFname = ""
var numVPart = 4
var numEPart = 4
options.foreach{
case ("numIter", v) => numIter = v.toInt
case ("dynamic", v) => isDynamic = v.toBoolean
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numVPart", v) => numVPart = v.toInt
case ("numEPart", v) => numEPart = v.toInt
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
val sc = new SparkContext(host, "PageRank(" + fname + ")")
val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
val startTime = System.currentTimeMillis
val numVertices = g.vertices.count()
val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
(vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
}
// Do the computation
val epsilon = 0.01 / numVertices
val messages = sc.parallelize(Array[(String, PRMessage)]())
val utils = new PageRankUtils
val result =
Bagel.run(
sc, vertices, messages, combiner = new PRCombiner(),
numPartitions = numVPart)(
utils.computeWithCombiner(numVertices, epsilon, numIter))
println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
if (!outFname.isEmpty) {
println("Saving pageranks of pages to " + outFname)
result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
}
println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
sc.stop()
}
}
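// Hypothetical invocation (host and input path are placeholders, not part of this commit); the
// trailing arguments are the -name=value options parsed above:
//
//   spark.graph.perf.SparkTest local[4] pagerank /data/web-edges.txt -numIter=20 -numVPart=64 -numEPart=64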