Fixing build after merging upstream changes.

Joseph E. Gonzalez 2013-11-19 22:03:49 -08:00
parent 12cb19b1c1
commit de3d6ee5a7
5 changed files with 96 additions and 78 deletions


@@ -351,7 +351,7 @@ object Analytics extends Logging {
       val sc = new SparkContext(host, "PageRank(" + fname + ")")
 
       val graph = GraphLoader.edgeListFile(sc, fname,
-        minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+        minEdgePartitions = numEPart).cache()
 
       val startTime = System.currentTimeMillis
       logInfo("GRAPHX: starting tasks")
@@ -401,14 +401,10 @@ object Analytics extends Logging {
       println("======================================")
 
       val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
-      //val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
       val graph = GraphLoader.edgeListFile(sc, fname,
-        minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+        minEdgePartitions = numEPart).cache()
       val cc = Analytics.connectedComponents(graph)
-      //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
-      //  else Analytics.connectedComponents(graph, numIter)
       println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
 
       sc.stop()
     }
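
Note that the println above concatenates the distinct-ids RDD itself, so it prints the RDD's toString rather than a number. A minimal sketch that materializes an actual component count (assuming `graph` is loaded as in the hunk):

// distinct() dedupes the component ids; count() forces evaluation and returns the total.
val cc = Analytics.connectedComponents(graph)
val numComponents = cc.vertices.map { case (vid, data) => data }.distinct().count()
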
@@ -425,14 +421,12 @@ object Analytics extends Logging {
       println("| Triangle Count |")
       println("--------------------------------------")
 
       val sc = new SparkContext(host, "TriangleCount(" + fname + ")")
-      //val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
-      val graph = GraphLoader.edgeListFileUndirected(sc, fname,
-        minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+      val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+        minEdgePartitions = numEPart).cache()
       val triangles = Analytics.triangleCount(graph)
-      //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
-      //  else Analytics.connectedComponents(graph, numIter)
-      println("Triangles: " + triangles.vertices.map{ case (vid,data) => data.toLong }.reduce(_+_) /3)
+      println("Triangles: " + triangles.vertices.map {
+        case (vid,data) => data.toLong
+      }.reduce(_+_) / 3)
       sc.stop()
     }
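
A minimal sketch of the new triangle-count path (the file path is hypothetical; canonicalOrientation = true has the loader keep each edge in one canonical direction, which the count relies on):

// Each triangle is counted once at each of its three corners, hence the division by 3.
val graph = GraphLoader.edgeListFile(sc, "/tmp/followers.tsv",
  canonicalOrientation = true, minEdgePartitions = 8).cache()
val triangles = Analytics.triangleCount(graph)
val total = triangles.vertices.map { case (vid, data) => data.toLong }.reduce(_ + _) / 3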


@@ -295,20 +295,6 @@ object Graph {
   import org.apache.spark.graph.impl._
   import org.apache.spark.SparkContext._
 
-  /**
-   * Construct a graph from a collection of edges.
-   *
-   * @param edges the RDD containing the set of edges in the graph
-   * @param defaultValue the default vertex attribute to use for each vertex
-   *
-   * @return a graph with edge attributes described by `edges` and vertices
-   * given by all vertices in `edges` with value `defaultValue`
-   */
-  def apply[VD: ClassManifest, ED: ClassManifest](
-      edges: RDD[Edge[ED]], defaultValue: VD): Graph[VD, ED] = {
-    GraphImpl(edges, defaultValue)
-  }
-
   /**
    * Construct a graph from a collection of edges encoded as vertex id pairs.
    *
@@ -317,7 +303,7 @@ object Graph {
    * @return a graph with edge attributes containing the count of duplicate edges.
    */
   def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD): Graph[VD, Int] = {
-    Graph(rawEdges, defaultValue, false)
+    Graph(rawEdges, defaultValue, false, RandomVertexCut())
   }
 
   /**
@@ -334,9 +320,14 @@ object Graph {
    * attributes containing the total degree of each vertex.
    *
    */
-  def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD, uniqueEdges: Boolean):
+  def apply[VD: ClassManifest](
+      rawEdges: RDD[(Vid, Vid)],
+      defaultValue: VD,
+      uniqueEdges: Boolean,
+      partitionStrategy: PartitionStrategy):
     Graph[VD, Int] = {
-    val graph = GraphImpl(rawEdges.map(p => Edge(p._1, p._2, 1)), defaultValue)
+    val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
+    val graph = GraphImpl(edges, defaultValue, partitionStrategy)
     if (uniqueEdges) {
       graph.groupEdges((a,b) => a+b)
     } else {
@@ -344,6 +335,37 @@ object Graph {
     }
   }
 
+  /**
+   * Construct a graph from a collection of edges.
+   *
+   * @param edges the RDD containing the set of edges in the graph
+   * @param defaultValue the default vertex attribute to use for each vertex
+   *
+   * @return a graph with edge attributes described by `edges` and vertices
+   * given by all vertices in `edges` with value `defaultValue`
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest](
+      edges: RDD[Edge[ED]],
+      defaultValue: VD): Graph[VD, ED] = {
+    Graph(edges, defaultValue, RandomVertexCut())
+  }
+
+  /**
+   * Construct a graph from a collection of edges.
+   *
+   * @param edges the RDD containing the set of edges in the graph
+   * @param defaultValue the default vertex attribute to use for each vertex
+   *
+   * @return a graph with edge attributes described by `edges` and vertices
+   * given by all vertices in `edges` with value `defaultValue`
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest](
+      edges: RDD[Edge[ED]],
+      defaultValue: VD,
+      partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
+    GraphImpl(edges, defaultValue, partitionStrategy)
+  }
+
   /**
    * Construct a graph from a collection attributed vertices and
    * edges.
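
A minimal sketch of the two construction paths these overloads add (the edge RDD is hypothetical; the two-argument form simply delegates with RandomVertexCut()):

// Same graph either way; the second call makes the partition strategy explicit.
val edges: RDD[Edge[Double]] = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0)))
val g1 = Graph(edges, 0)
val g2 = Graph(edges, 0, RandomVertexCut())
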
@@ -362,32 +384,30 @@ object Graph {
       vertices: RDD[(Vid,VD)],
       edges: RDD[Edge[ED]]): Graph[VD, ED] = {
     val defaultAttr: VD = null.asInstanceOf[VD]
-    Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a)
+    Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a, RandomVertexCut())
   }
 
   /**
    * Construct a graph from a collection attributed vertices and
    * edges. Duplicate vertices are combined using the `mergeFunc` and
    * vertices found in the edge collection but not in the input
    * vertices are the default attribute `defautVertexAttr`.
    *
    * @note Duplicate vertices are removed arbitrarily .
    *
    * @tparam VD the vertex attribute type
    * @tparam ED the edge attribute type
    * @param vertices the "set" of vertices and their attributes
    * @param edges the collection of edges in the graph
    * @param defaultVertexAttr the default vertex attribute to use for
-   * vertices that are mentioned in `edges` but not in `vertices
-   * @param mergeFunc the function used to merge duplicate vertices
-   * in the `vertices` collection.
+   * vertices that are mentioned in `edges` but not in `vertices`
    *
    */
   def apply[VD: ClassManifest, ED: ClassManifest](
       vertices: RDD[(Vid,VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD): Graph[VD, ED] = {
-    GraphImpl(vertices, edges, defaultVertexAttr, (a,b) => a)
+    Graph(vertices, edges, defaultVertexAttr, (a,b) => a, RandomVertexCut())
   }
 
   /**
@@ -404,14 +424,17 @@ object Graph {
    * vertices that are mentioned in `edges` but not in `vertices
    * @param mergeFunc the function used to merge duplicate vertices
    * in the `vertices` collection.
+   * @param partitionStrategy the partition strategy to use when
+   * partitioning the edges.
    *
    */
   def apply[VD: ClassManifest, ED: ClassManifest](
       vertices: RDD[(Vid,VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD,
-      mergeFunc: (VD, VD) => VD): Graph[VD, ED] = {
-    GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc)
+      mergeFunc: (VD, VD) => VD,
+      partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
+    GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc, partitionStrategy)
   }
 
   /**


@@ -26,7 +26,8 @@ object GraphLoader {
       path: String,
       edgeParser: Array[String] => ED,
       minEdgePartitions: Int = 1,
-      partitionStrategy: PartitionStrategy = RandomVertexCut()): GraphImpl[Int, ED] = {
+      partitionStrategy: PartitionStrategy = RandomVertexCut()):
+    Graph[Int, ED] = {
     // Parse the edge data table
     val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
       iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
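
The return-type change above (GraphImpl[Int, ED] to Graph[Int, ED]) keeps the concrete implementation out of the loader's public signature. A minimal sketch of a caller that now binds only to the interface (the path is hypothetical):

// The static type is the Graph interface; GraphImpl stays an implementation detail.
val g = GraphLoader.edgeListFile(sc, "/tmp/edges.txt")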


@@ -255,7 +255,7 @@ object GraphImpl {
       defaultValue: VD,
       partitionStrategy: PartitionStrategy):
     GraphImpl[VD, ED] = {
-    val etable = createETable(edges).cache
+    val etable = createETable(edges, partitionStrategy).cache
     // Get the set of all vids
     val vids = etable.mapPartitions( iter => {
       val (pid, epart) = iter.next()
@@ -271,28 +271,28 @@ object GraphImpl {
     new GraphImpl(vtable, vid2pid, localVidMap, etable, partitionStrategy)
   }
 
-  def apply[VD: ClassManifest, ED: ClassManifest](
-      vertices: RDD[(Vid, VD)],
-      edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD,ED] = {
-    apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut())
-  }
+  // def apply[VD: ClassManifest, ED: ClassManifest](
+  //     vertices: RDD[(Vid, VD)],
+  //     edges: RDD[Edge[ED]],
+  //     defaultVertexAttr: VD): GraphImpl[VD,ED] = {
+  //   apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut())
+  // }
 
-  def apply[VD: ClassManifest, ED: ClassManifest](
-      vertices: RDD[(Vid, VD)],
-      edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD,
-      partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
-    apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy)
-  }
+  // def apply[VD: ClassManifest, ED: ClassManifest](
+  //     vertices: RDD[(Vid, VD)],
+  //     edges: RDD[Edge[ED]],
+  //     defaultVertexAttr: VD,
+  //     partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
+  //   apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy)
+  // }
 
-  def apply[VD: ClassManifest, ED: ClassManifest](
-      vertices: RDD[(Vid, VD)],
-      edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD,
-      mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
-    apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut())
-  }
+  // def apply[VD: ClassManifest, ED: ClassManifest](
+  //     vertices: RDD[(Vid, VD)],
+  //     edges: RDD[Edge[ED]],
+  //     defaultVertexAttr: VD,
+  //     mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
+  //   apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut())
+  // }
 
   def apply[VD: ClassManifest, ED: ClassManifest](
       vertices: RDD[(Vid, VD)],


@@ -65,7 +65,7 @@ object GraphGenerators {
   // Right now it just generates a bunch of edges where
   // the edge data is the weight (default 1)
-  def logNormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = {
+  def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
     // based on Pregel settings
     val mu = 4
     val sigma = 1.3
@@ -75,11 +75,11 @@ object GraphGenerators {
       src => (src, sampleLogNormal(mu, sigma, numVertices))
     }
-    val edges = vertices.flatMap{ 
-      v => generateRandomEdges(v._1.toInt, v._2, numVertices) 
+    val edges = vertices.flatMap{
+      v => generateRandomEdges(v._1.toInt, v._2, numVertices)
     }
-    GraphImpl(vertices, edges, 0)
+    Graph(vertices, edges, 0)
     //println("Vertices:")
     //for (v <- vertices) {
     //  println(v.id)
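
A minimal sketch of invoking the generator after the return-type change (assuming an existing SparkContext `sc`):

// The result is now typed as the Graph interface rather than GraphImpl.
val g: Graph[Int, Int] = GraphGenerators.logNormalGraph(sc, numVertices = 100000)
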
@@ -137,7 +137,7 @@ object GraphGenerators {
-  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): GraphImpl[Int, Int] = {
+  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): Graph[Int, Int] = {
     // let N = requestedNumVertices
     // the number of vertices is 2^n where n=ceil(log2[N])
     // This ensures that the 4 quadrants are the same size at all recursion levels
@@ -155,12 +155,12 @@ object GraphGenerators {
   }
 
-  def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
+  def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): Graph[Int, ED] = {
     val vertices = edges.flatMap { edge => List((edge.srcId, 1)) }
       .reduceByKey(_ + _)
       .map{ case (vid, degree) => (vid, degree) }
-    GraphImpl(vertices, edges, 0)
+    Graph(vertices, edges, 0)
   }
 
   /**
@@ -192,7 +192,7 @@ object GraphGenerators {
    * | c | d | |
    * | | | |
    * *************** -
-   * 
+   *
    * where this represents the subquadrant of the adj matrix currently being
    * subdivided. (x,y) represent the upper left hand corner of the subquadrant,
    * and T represents the side length (guaranteed to be a power of 2).
@@ -204,7 +204,7 @@ object GraphGenerators {
    * quad = c, x'=x, y'=y+T/2, T'=T/2
    * quad = d, x'=x+T/2, y'=y+T/2, T'=T/2
    *
-   * @param src is the 
+   * @param src is the
    */
   @tailrec
   def chooseCell(x: Int, y: Int, t: Int): (Int, Int) = {
@@ -242,7 +242,7 @@ object GraphGenerators {
    * Create `rows` by `cols` grid graph with each vertex connected to its
    * row+1 and col+1 neighbors. Vertex ids are assigned in row major
    * order.
-   * 
+   *
    * @param sc the spark context in which to construct the graph
    * @param rows the number of rows
    * @param cols the number of columns
@@ -252,12 +252,12 @@ object GraphGenerators {
    */
   def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
     // Convert row column address into vertex ids (row major order)
-    def sub2ind(r: Int, c: Int): Vid = r * cols + c 
+    def sub2ind(r: Int, c: Int): Vid = r * cols + c
 
-    val vertices: RDD[(Vid, (Int,Int))] = 
+    val vertices: RDD[(Vid, (Int,Int))] =
       sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
-    val edges: RDD[Edge[Double]] = 
-      vertices.flatMap{ case (vid, (r,c)) => 
+    val edges: RDD[Edge[Double]] =
+      vertices.flatMap{ case (vid, (r,c)) =>
         (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++
         (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty })
       }.map{ case (src, dst) => Edge(src, dst, 1.0) }
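
A minimal sketch of gridGraph usage (assuming an existing SparkContext `sc`); each vertex carries its (row, col) address and edges link each vertex to its row+1 and col+1 neighbors:

// A 100 x 100 grid; vertex ids follow sub2ind's row-major numbering.
val grid: Graph[(Int, Int), Double] = GraphGenerators.gridGraph(sc, 100, 100)
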
@@ -266,7 +266,7 @@ object GraphGenerators {
   /**
    * Create a star graph with vertex 0 being the center.
-   * 
+   *
    * @param sc the spark context in which to construct the graph
    * @param the number of vertices in the star
    *