diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index e45d7748d9..a67cc44f6e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -273,9 +273,9 @@ object Analytics extends Logging {
         logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
         logInfo("GRAPHX: Number of edges " + graph.edges.count)
 
-        val pr = Analytics.pagerank(graph, numIter)
-        // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
-        //          else Analytics.pagerank(graph, numIter)
+        //val pr = Analytics.pagerank(graph, numIter)
+        val pr = if(isDynamic) Analytics.deltaPagerank(graph, tol, numIter)
+                 else Analytics.pagerank(graph, numIter)
         logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
         if (!outFname.isEmpty) {
           println("Saving pageranks of pages to " + outFname)
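The hunk above flips the PageRank driver from the hard-coded fixed-iteration path to dispatching on `isDynamic`, with the dynamic path now implemented as `Analytics.deltaPagerank(graph, tol, numIter)`. For intuition, here is a minimal single-machine sketch of the two termination modes; the adjacency-`Map` graph type and every name in it are illustrative only, not the GraphX API:

```scala
// A minimal sketch of the two termination modes selected above. Only the
// numIter-vs-tolerance contrast mirrors Analytics.pagerank / deltaPagerank;
// the rest is hypothetical.
object PageRankSketch {
  type AdjGraph = Map[Long, Seq[Long]] // vertex id -> out-neighbors

  // Fixed number of iterations, like Analytics.pagerank(graph, numIter).
  def pagerank(g: AdjGraph, numIter: Int, resetProb: Double = 0.15): Map[Long, Double] = {
    var ranks = g.keys.map(v => v -> 1.0).toMap
    for (_ <- 1 to numIter) ranks = step(g, ranks, resetProb)
    ranks
  }

  // "Dynamic" variant: stop once the largest per-vertex change drops below
  // tol, keeping numIter as a hard cap, like deltaPagerank(graph, tol, numIter).
  def deltaPagerank(g: AdjGraph, tol: Double, numIter: Int,
      resetProb: Double = 0.15): Map[Long, Double] = {
    var ranks = g.keys.map(v => v -> 1.0).toMap
    var delta = Double.MaxValue
    var i = 0
    while (delta > tol && i < numIter) {
      val next = step(g, ranks, resetProb)
      delta = next.map { case (v, r) => math.abs(r - ranks(v)) }.max
      ranks = next
      i += 1
    }
    ranks
  }

  // One synchronous PageRank update over the whole graph.
  private def step(g: AdjGraph, ranks: Map[Long, Double],
      resetProb: Double): Map[Long, Double] = {
    val contribs = g.toSeq
      .flatMap { case (v, nbrs) => nbrs.map(n => n -> ranks(v) / nbrs.size) }
      .groupBy(_._1)
      .map { case (v, cs) => v -> cs.map(_._2).sum }
    g.keys.map(v => v -> (resetProb + (1.0 - resetProb) * contribs.getOrElse(v, 0.0))).toMap
  }

  def main(args: Array[String]): Unit = {
    val g: AdjGraph = Map(1L -> Seq(2L), 2L -> Seq(1L, 3L), 3L -> Seq(1L))
    println(pagerank(g, numIter = 20))
    println(deltaPagerank(g, tol = 1e-4, numIter = 100))
  }
}
```

The point of the delta variant is early exit: `tol` bounds the largest per-vertex change, while `numIter` remains a hard cap.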
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index 29ea38ec67..13edd8fd1a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.Kryo
 import org.apache.spark.graph.impl.MessageToPartition
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.graph.impl._
+import scala.collection.mutable.BitSet
 
 class GraphKryoRegistrator extends KryoRegistrator {
 
@@ -14,6 +15,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[MessageToPartition[Object]])
     kryo.register(classOf[(Vid, Object)])
     kryo.register(classOf[EdgePartition[Object]])
+    kryo.register(classOf[BitSet])
 
     // This avoids a large number of hash table lookups.
     kryo.setReferences(false)
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 2295084024..313737fdbe 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -55,7 +55,6 @@ object GraphLoader {
   private def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
     val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
       .reduceByKey(_ + _)
-      .map{ case (vid, degree) => (vid, degree) }
     GraphImpl(vertices, edges, 0)
   }
 }
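`VertexSetRDD` partitions now carry `mutable.BitSet` masks (see the hunks below), so the registrator registers `BitSet` too; an unregistered class forces Kryo to write the fully-qualified class name with every object it serializes. A standalone sketch of what registration does, driving Kryo directly rather than through Spark's `spark.kryo.registrator` wiring; that the default `FieldSerializer` round-trips `BitSet`'s internal word array correctly is an assumption worth verifying:

```scala
// Standalone Kryo usage mirroring the two registrator calls above.
import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import scala.collection.mutable.BitSet

object KryoBitSetSketch extends App {
  val kryo = new Kryo()
  kryo.register(classOf[BitSet]) // same call the registrator now makes:
                                 // registered classes serialize as a small
                                 // id instead of a full class-name string
  kryo.setReferences(false)      // as in the registrator: skip reference
                                 // tracking and its hash-table lookups

  val bs = BitSet(1, 5, 42)
  val baos = new ByteArrayOutputStream()
  val out = new Output(baos)
  kryo.writeObject(out, bs)
  out.close()

  val back = kryo.readObject(new Input(baos.toByteArray), classOf[BitSet])
  println(back == bs) // expected: true
}
```

The `GraphLoader` hunk is unrelated cleanup: the trailing `map` was an identity transformation over `(vid, degree)` pairs and is simply dropped.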
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index a64eb6a99c..9b7e043272 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -190,7 +190,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
   override def filter(pred: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
     val cleanF = index.rdd.context.clean(pred)
     val newValues = index.rdd.zipPartitions(valuesRDD){
-      (keysIter, valuesIter) =>
+      (keysIter: Iterator[VertexIdToIndexMap],
+       valuesIter: Iterator[(IndexedSeq[V], BitSet)]) =>
       val index = keysIter.next()
       assert(keysIter.hasNext() == false)
       val (oldValues, bs) = valuesIter.next()
@@ -222,7 +223,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
     val cleanF = index.rdd.context.clean(f)
     val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
       valuesRDD.mapPartitions(iter => iter.map{
-        case (values, bs) =>
+        case (values, bs: BitSet) =>
           /**
            * @todo Consider using a view rather than creating a new
            * array. This is already being done for join operations.
@@ -255,10 +256,11 @@ class VertexSetRDD[@specialized V: ClassManifest](
     val cleanF = index.rdd.context.clean(f)
     val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
       index.rdd.zipPartitions(valuesRDD){
-      (keysIter, valuesIter) =>
+      (keysIter: Iterator[VertexIdToIndexMap],
+       valuesIter: Iterator[(IndexedSeq[V], BitSet)]) =>
       val index = keysIter.next()
       assert(keysIter.hasNext() == false)
-      val (oldValues, bs) = valuesIter.next()
+      val (oldValues, bs: BitSet) = valuesIter.next()
       assert(valuesIter.hasNext() == false)
       /**
        * @todo Consider using a view rather than creating a new array.
@@ -296,10 +298,11 @@ class VertexSetRDD[@specialized V: ClassManifest](
     }
     val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] =
       valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter, otherIter) =>
-        val (thisValues, thisBS) = thisIter.next()
+      (thisIter: Iterator[(IndexedSeq[V], BitSet)],
+       otherIter: Iterator[(IndexedSeq[W], BitSet)]) =>
+        val (thisValues, thisBS: BitSet) = thisIter.next()
         assert(!thisIter.hasNext)
-        val (otherValues, otherBS) = otherIter.next()
+        val (otherValues, otherBS: BitSet) = otherIter.next()
         assert(!otherIter.hasNext)
         val newBS = thisBS & otherBS
         val newValues = thisValues.view.zip(otherValues)
@@ -328,11 +331,13 @@ class VertexSetRDD[@specialized V: ClassManifest](
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter, otherIter) =>
-        val (thisValues, thisBS) = thisIter.next()
+    val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
+      valuesRDD.zipPartitions(other.valuesRDD){
+      (thisIter: Iterator[(IndexedSeq[V], BitSet)],
+       otherIter: Iterator[(IndexedSeq[W], BitSet)]) =>
+        val (thisValues, thisBS: BitSet) = thisIter.next()
         assert(!thisIter.hasNext)
-        val (otherValues, otherBS) = otherIter.next()
+        val (otherValues, otherBS: BitSet) = otherIter.next()
         assert(!otherIter.hasNext)
         val otherOption = otherValues.view.zipWithIndex
           .map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None }
@@ -384,7 +389,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
     // Compute the new values RDD
     val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
       index.rdd.zipPartitions(valuesRDD, otherShuffled) {
-      (thisIndexIter, thisIter, tuplesIter) =>
+      (thisIndexIter: Iterator[VertexIdToIndexMap],
+       thisIter: Iterator[(IndexedSeq[V], BitSet)],
+       tuplesIter: Iterator[(Vid,W)]) =>
       // Get the Index and values for this RDD
       val index = thisIndexIter.next()
       assert(!thisIndexIter.hasNext)
@@ -618,10 +625,14 @@ object VertexSetRDD {
   def apply[V: ClassManifest](
       rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
     // Preaggregate and shuffle if necessary
-    // Preaggregation.
-    val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc)
-    val partitioner = new HashPartitioner(rdd.partitions.size)
-    val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
+    val preAgg = rdd.partitioner match {
+      case Some(p) => rdd
+      case None =>
+        val partitioner = new HashPartitioner(rdd.partitions.size)
+        // Preaggregation.
+        val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc)
+        rdd.mapPartitions(aggregator.combineValuesByKey, true).partitionBy(partitioner)
+    }
 
     val groups = preAgg.mapPartitions( iter => {
       val indexMap = new VertexIdToIndexMap()
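A pattern worth calling out in the `zipPartitions` hunks above: each value partition is a dense `IndexedSeq[V]` paired with a `BitSet` marking which slots are live, so joining two `VertexSetRDD`s that share an index reduces to a mask intersection (`thisBS & otherBS`), with no hash lookups. A self-contained sketch of that layout, with all names hypothetical:

```scala
// Per-partition layout used by VertexSetRDD: dense values plus a liveness
// mask. Join of two co-indexed sets = intersect the masks.
import scala.collection.mutable.BitSet

object MaskedJoinSketch extends App {
  def zipJoin[V, W](thisValues: IndexedSeq[V], thisBS: BitSet,
      otherValues: IndexedSeq[W], otherBS: BitSet): (IndexedSeq[(V, W)], BitSet) = {
    val newBS = thisBS & otherBS // live in both sets
    // The real code keeps a lazy view here; materialized for simplicity.
    // Slots outside newBS are never read, so their contents do not matter.
    (thisValues.view.zip(otherValues).toIndexedSeq, newBS)
  }

  val (a, aBS) = (IndexedSeq(10, 20, 30), BitSet(0, 2))
  val (b, bBS) = (IndexedSeq("x", "y", "z"), BitSet(1, 2))
  val (joined, mask) = zipJoin(a, aBS, b, bBS)
  for (i <- mask) println(i + " -> " + joined(i)) // prints: 2 -> (30,z)
}
```

The explicit `Iterator[...]` parameter types added to the closures do not change behavior; they spell out what `zipPartitions` was previously left to infer, presumably to pin down the `BitSet`-bearing tuple types.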
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index d7cabce34f..d545ba5d1b 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -80,21 +80,6 @@ object EdgeTripletBuilder {
 }
 
-// {
-//   val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => {
-//     val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next()
-//     assert(iter.hasNext == false)
-//     // Return an iterator that looks up the hash map to find matching
-//     // vertices for each edge.
-//     new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
-//   }
-//   ClosureCleaner.clean(iterFun)
-//   localVidMap.zipJoin(vTableReplicatedValues).zipJoinRDD(eTable)
-//     .mapPartitions( iterFun ) // end of map partition
-// }
-// }
-
 /**
  * A Graph RDD that supports computation on graphs.
  */
@@ -105,9 +90,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     @transient val eTable: RDD[(Pid, EdgePartition[ED])] )
   extends Graph[VD, ED] {
 
-//  def this() = this(null,null,null)
-
   /**
    * (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
    * vertex data after it is replicated. Within each partition, it holds a map
@@ -127,7 +109,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
   /** Return a RDD of edges. */
   @transient override val edges: RDD[Edge[ED]] = {
-    eTable.mapPartitions { iter => iter.next()._2.iterator }
+    eTable.mapPartitions( iter => iter.next()._2.iterator , true )
   }
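The `edges` hunk switches to the two-argument `mapPartitions` overload, passing `true` for `preservesPartitioning`, the same flag the `VertexSetRDD.apply` hunk above passes before `partitionBy`. By default `mapPartitions` drops the partitioner, since Spark cannot know whether the closure kept keys intact, and a later co-partitioned operation would then re-shuffle. A minimal local-mode illustration using only standard Spark API:

```scala
// Why the second argument matters: mapPartitions is opaque to Spark, so the
// result forgets its partitioner unless told otherwise.
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on 0.8-era Spark

object PreservesPartitioningSketch extends App {
  val sc = new SparkContext("local", "demo")
  val pairs = sc.parallelize(1 to 100)
    .map(i => (i, i * i))
    .partitionBy(new HashPartitioner(4))

  val forgot = pairs.mapPartitions(it => it.map(identity))
  val kept   = pairs.mapPartitions(it => it.map(identity), preservesPartitioning = true)

  println(forgot.partitioner) // None: joining this re-shuffles
  println(kept.partitioner)   // Some(HashPartitioner): co-partitioned ops skip the shuffle
  sc.stop()
}
```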
@@ -136,21 +118,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     EdgeTripletBuilder.makeTriplets(localVidMap, vTableReplicatedValues, eTable)
 
-  // {
-  //   val iterFun = (iter: Iterator[(Pid, (VertexHashMap[VD], EdgePartition[ED]))]) => {
-  //     val (pid, (vmap, edgePartition)) = iter.next()
-  //     //assert(iter.hasNext == false)
-  //     // Return an iterator that looks up the hash map to find matching
-  //     // vertices for each edge.
-  //     new EdgeTripletIterator(vmap, edgePartition)
-  //   }
-  //   ClosureCleaner.clean(iterFun)
-  //   vTableReplicated.join(eTable).mapPartitions( iterFun ) // end of map partition
-  // }
-
   override def cache(): Graph[VD, ED] = {
     eTable.cache()
     vid2pid.cache()
@@ -175,6 +142,64 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
   }
 
+  /**
+   * Display the lineage information for this graph.
+   */
+  def printLineage() = {
+
+    def traverseLineage(rdd: RDD[_], indent: String = "", visited: Map[Int, String] = Map.empty[Int, String]) {
+      if(visited.contains(rdd.id)) {
+        println(indent + visited(rdd.id))
+        println(indent)
+      } else {
+        val locs = rdd.partitions.map( p => rdd.preferredLocations(p) )
+        val cacheLevel = rdd.getStorageLevel
+        val name = rdd.id
+        val deps = rdd.dependencies
+        val partitioner = rdd.partitioner
+        val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0}
+        println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner + ", " + numparts + ")")
+        println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString)
+        println(indent + " |---> PrefLoc: " + locs.map(x => x.toString).mkString(", "))
+        deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
+      }
+    }
+
+    println("eTable ------------------------------------------")
+    traverseLineage(eTable, "  ")
+    var visited = Map(eTable.id -> "eTable")
+
+    println("\n\nvTable.index ------------------------------------")
+    traverseLineage(vTable.index.rdd, "  ", visited)
+    visited += (vTable.index.rdd.id -> "vTable.index")
+
+    println("\n\nvTable.values ------------------------------------")
+    traverseLineage(vTable.valuesRDD, "  ", visited)
+    visited += (vTable.valuesRDD.id -> "vTable.values")
+
+    println("\n\nvTable ------------------------------------------")
+    traverseLineage(vTable, "  ", visited)
+    visited += (vTable.id -> "vTable")
+
+    println("\n\nvid2pid -----------------------------------------")
+    traverseLineage(vid2pid, "  ", visited)
+    visited += (vid2pid.id -> "vid2pid")
+    visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
+
+    println("\n\nlocalVidMap -------------------------------------")
+    traverseLineage(localVidMap, "  ", visited)
+    visited += (localVidMap.id -> "localVidMap")
+
+    println("\n\nvTableReplicatedValues --------------------------")
+    traverseLineage(vTableReplicatedValues, "  ", visited)
+    visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
+
+    println("\n\ntriplets ----------------------------------------")
+    traverseLineage(triplets, "  ", visited)
+    println(visited)
+  } // end of print lineage
+
   override def reverse: Graph[VD, ED] = {
     val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
       preservesPartitioning = true)
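A usage sketch for the new `printLineage` debugging helper. The path is a placeholder, and the loader call assumes this tree's `GraphLoader.textFile(sc, path, edgeParser)` signature; the dump is most informative after the pipeline has actually materialized:

```scala
// Hypothetical debugging session against the code in this patch.
import org.apache.spark.SparkContext
import org.apache.spark.graph.GraphLoader

object LineageSketch extends App {
  val sc = new SparkContext("local", "lineage-demo")
  val graph = GraphLoader.textFile(sc, "/tmp/edges.tsv", a => 1.0F)
  graph.cache()
  graph.triplets.count() // materialize the full pipeline first
  graph.printLineage()   // walks eTable, vTable, vid2pid, localVidMap,
                         // vTableReplicatedValues, and triplets, printing
                         // storage level, partitioner, and deps per RDD
  sc.stop()
}
```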
@@ -213,13 +238,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     new GraphImpl(vTable, vid2pid, localVidMap, newETable)
   }
 
-  // override def correctEdges(): Graph[VD, ED] = {
-  //   val sc = vertices.context
-  //   val vset = sc.broadcast(vertices.map(_.id).collect().toSet)
-  //   val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst))
-  //   Graph(vertices, newEdges)
-  // }
-
   override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
     vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = {
 
@@ -382,16 +400,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
-
-
-
-
-
-
-
-
-
-
 
 object GraphImpl {
 
   def apply[VD: ClassManifest, ED: ClassManifest](
@@ -528,28 +536,6 @@ object GraphImpl {
     }.cache()
 
     // @todo assert edge table has partitioner
-
-    // val localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap] =
-    //   msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
-    //     val vidToIndex = new VertexIdToIndexMap
-    //     var i = 0
-    //     for (msg <- iter) {
-    //       vidToIndex.put(msg.data._1, i)
-    //       i += 1
-    //     }
-    //     Array((pid, vidToIndex)).iterator
-    //   }, preservesPartitioning = true).indexed(eTable.index)
-
-    // val vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]] =
-    //   msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
-    //     val vertexArray = ArrayBuilder.make[VD]
-    //     for (msg <- iter) {
-    //       vertexArray += msg.data._2
-    //     }
-    //     Array((pid, vertexArray.result)).iterator
-    //   }, preservesPartitioning = true).indexed(eTable.index)
-
   }
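The trailing `// @todo assert edge table has partitioner` ties back to the `VertexSetRDD.apply` hunk earlier in the patch: the recurring theme is reusing an existing partitioner instead of unconditionally shuffling. The shape of that control flow, sketched without Spark's internal `Aggregator` (a per-partition hash map stands in for it; names hypothetical, modern-Spark-flavored signatures):

```scala
// Mirrors the new VertexSetRDD.apply control flow: reuse an existing
// partitioner when there is one; otherwise combine per partition before
// partitionBy, so the shuffle moves at most one record per key per
// input partition.
import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._ // pair-RDD implicits on 0.8-era Spark
import org.apache.spark.rdd.RDD

object PreAggSketch {
  def preAggregate[V: ClassTag](rdd: RDD[(Long, V)],
      reduceFunc: (V, V) => V): RDD[(Long, V)] =
    rdd.partitioner match {
      // Already partitioned: reuse the existing layout, no shuffle at all.
      case Some(_) => rdd
      case None =>
        val reducedLocally = rdd.mapPartitions(iter => {
          val acc = mutable.HashMap.empty[Long, V]
          for ((k, v) <- iter)
            acc(k) = if (acc.contains(k)) reduceFunc(acc(k), v) else v
          acc.iterator
        }, preservesPartitioning = true)
        reducedLocally.partitionBy(new HashPartitioner(rdd.partitions.size))
    }
}
```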