Merge branch 'master' of github.com:amplab/graphx
This commit is contained in:
commit
b3bcfc09c7
|
@ -273,9 +273,9 @@ object Analytics extends Logging {
|
||||||
logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
|
logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
|
||||||
logInfo("GRAPHX: Number of edges " + graph.edges.count)
|
logInfo("GRAPHX: Number of edges " + graph.edges.count)
|
||||||
|
|
||||||
val pr = Analytics.pagerank(graph, numIter)
|
//val pr = Analytics.pagerank(graph, numIter)
|
||||||
// val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
|
val pr = if(isDynamic) Analytics.deltaPagerank(graph, tol, numIter)
|
||||||
// else Analytics.pagerank(graph, numIter)
|
else Analytics.pagerank(graph, numIter)
|
||||||
logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
|
logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
|
||||||
if (!outFname.isEmpty) {
|
if (!outFname.isEmpty) {
|
||||||
println("Saving pageranks of pages to " + outFname)
|
println("Saving pageranks of pages to " + outFname)
|
||||||
|
|
|
@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.Kryo
|
||||||
import org.apache.spark.graph.impl.MessageToPartition
|
import org.apache.spark.graph.impl.MessageToPartition
|
||||||
import org.apache.spark.serializer.KryoRegistrator
|
import org.apache.spark.serializer.KryoRegistrator
|
||||||
import org.apache.spark.graph.impl._
|
import org.apache.spark.graph.impl._
|
||||||
|
import scala.collection.mutable.BitSet
|
||||||
|
|
||||||
class GraphKryoRegistrator extends KryoRegistrator {
|
class GraphKryoRegistrator extends KryoRegistrator {
|
||||||
|
|
||||||
|
@ -14,6 +15,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
|
||||||
kryo.register(classOf[MessageToPartition[Object]])
|
kryo.register(classOf[MessageToPartition[Object]])
|
||||||
kryo.register(classOf[(Vid, Object)])
|
kryo.register(classOf[(Vid, Object)])
|
||||||
kryo.register(classOf[EdgePartition[Object]])
|
kryo.register(classOf[EdgePartition[Object]])
|
||||||
|
kryo.register(classOf[BitSet])
|
||||||
|
|
||||||
// This avoids a large number of hash table lookups.
|
// This avoids a large number of hash table lookups.
|
||||||
kryo.setReferences(false)
|
kryo.setReferences(false)
|
||||||
|
|
|
@ -55,7 +55,6 @@ object GraphLoader {
|
||||||
private def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
|
private def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
|
||||||
val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
|
val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
|
||||||
.reduceByKey(_ + _)
|
.reduceByKey(_ + _)
|
||||||
.map{ case (vid, degree) => (vid, degree) }
|
|
||||||
GraphImpl(vertices, edges, 0)
|
GraphImpl(vertices, edges, 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,7 +190,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
||||||
override def filter(pred: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
|
override def filter(pred: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
|
||||||
val cleanF = index.rdd.context.clean(pred)
|
val cleanF = index.rdd.context.clean(pred)
|
||||||
val newValues = index.rdd.zipPartitions(valuesRDD){
|
val newValues = index.rdd.zipPartitions(valuesRDD){
|
||||||
(keysIter, valuesIter) =>
|
(keysIter: Iterator[VertexIdToIndexMap],
|
||||||
|
valuesIter: Iterator[(IndexedSeq[V], BitSet)]) =>
|
||||||
val index = keysIter.next()
|
val index = keysIter.next()
|
||||||
assert(keysIter.hasNext() == false)
|
assert(keysIter.hasNext() == false)
|
||||||
val (oldValues, bs) = valuesIter.next()
|
val (oldValues, bs) = valuesIter.next()
|
||||||
|
@ -222,7 +223,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
||||||
val cleanF = index.rdd.context.clean(f)
|
val cleanF = index.rdd.context.clean(f)
|
||||||
val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
|
val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
|
||||||
valuesRDD.mapPartitions(iter => iter.map{
|
valuesRDD.mapPartitions(iter => iter.map{
|
||||||
case (values, bs) =>
|
case (values, bs: BitSet) =>
|
||||||
/**
|
/**
|
||||||
* @todo Consider using a view rather than creating a new
|
* @todo Consider using a view rather than creating a new
|
||||||
* array. This is already being done for join operations.
|
* array. This is already being done for join operations.
|
||||||
|
@ -255,10 +256,11 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
||||||
val cleanF = index.rdd.context.clean(f)
|
val cleanF = index.rdd.context.clean(f)
|
||||||
val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
|
val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
|
||||||
index.rdd.zipPartitions(valuesRDD){
|
index.rdd.zipPartitions(valuesRDD){
|
||||||
(keysIter, valuesIter) =>
|
(keysIter: Iterator[VertexIdToIndexMap],
|
||||||
|
valuesIter: Iterator[(IndexedSeq[V], BitSet)]) =>
|
||||||
val index = keysIter.next()
|
val index = keysIter.next()
|
||||||
assert(keysIter.hasNext() == false)
|
assert(keysIter.hasNext() == false)
|
||||||
val (oldValues, bs) = valuesIter.next()
|
val (oldValues, bs: BitSet) = valuesIter.next()
|
||||||
assert(valuesIter.hasNext() == false)
|
assert(valuesIter.hasNext() == false)
|
||||||
/**
|
/**
|
||||||
* @todo Consider using a view rather than creating a new array.
|
* @todo Consider using a view rather than creating a new array.
|
||||||
|
@ -296,10 +298,11 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
||||||
}
|
}
|
||||||
val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] =
|
val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] =
|
||||||
valuesRDD.zipPartitions(other.valuesRDD){
|
valuesRDD.zipPartitions(other.valuesRDD){
|
||||||
(thisIter, otherIter) =>
|
(thisIter: Iterator[(IndexedSeq[V], BitSet)],
|
||||||
val (thisValues, thisBS) = thisIter.next()
|
otherIter: Iterator[(IndexedSeq[W], BitSet)]) =>
|
||||||
|
val (thisValues, thisBS: BitSet) = thisIter.next()
|
||||||
assert(!thisIter.hasNext)
|
assert(!thisIter.hasNext)
|
||||||
val (otherValues, otherBS) = otherIter.next()
|
val (otherValues, otherBS: BitSet) = otherIter.next()
|
||||||
assert(!otherIter.hasNext)
|
assert(!otherIter.hasNext)
|
||||||
val newBS = thisBS & otherBS
|
val newBS = thisBS & otherBS
|
||||||
val newValues = thisValues.view.zip(otherValues)
|
val newValues = thisValues.view.zip(otherValues)
|
||||||
|
@ -328,11 +331,13 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
||||||
if(index != other.index) {
|
if(index != other.index) {
|
||||||
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
|
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
|
||||||
}
|
}
|
||||||
val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
|
val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
|
||||||
(thisIter, otherIter) =>
|
valuesRDD.zipPartitions(other.valuesRDD){
|
||||||
val (thisValues, thisBS) = thisIter.next()
|
(thisIter: Iterator[(IndexedSeq[V], BitSet)],
|
||||||
|
otherIter: Iterator[(IndexedSeq[W], BitSet)]) =>
|
||||||
|
val (thisValues, thisBS: BitSet) = thisIter.next()
|
||||||
assert(!thisIter.hasNext)
|
assert(!thisIter.hasNext)
|
||||||
val (otherValues, otherBS) = otherIter.next()
|
val (otherValues, otherBS: BitSet) = otherIter.next()
|
||||||
assert(!otherIter.hasNext)
|
assert(!otherIter.hasNext)
|
||||||
val otherOption = otherValues.view.zipWithIndex
|
val otherOption = otherValues.view.zipWithIndex
|
||||||
.map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None }
|
.map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None }
|
||||||
|
@ -384,7 +389,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
||||||
// Compute the new values RDD
|
// Compute the new values RDD
|
||||||
val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
|
val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
|
||||||
index.rdd.zipPartitions(valuesRDD, otherShuffled) {
|
index.rdd.zipPartitions(valuesRDD, otherShuffled) {
|
||||||
(thisIndexIter, thisIter, tuplesIter) =>
|
(thisIndexIter: Iterator[VertexIdToIndexMap],
|
||||||
|
thisIter: Iterator[(IndexedSeq[V], BitSet)],
|
||||||
|
tuplesIter: Iterator[(Vid,W)]) =>
|
||||||
// Get the Index and values for this RDD
|
// Get the Index and values for this RDD
|
||||||
val index = thisIndexIter.next()
|
val index = thisIndexIter.next()
|
||||||
assert(!thisIndexIter.hasNext)
|
assert(!thisIndexIter.hasNext)
|
||||||
|
@ -618,10 +625,14 @@ object VertexSetRDD {
|
||||||
def apply[V: ClassManifest](
|
def apply[V: ClassManifest](
|
||||||
rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
|
rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
|
||||||
// Preaggregate and shuffle if necessary
|
// Preaggregate and shuffle if necessary
|
||||||
// Preaggregation.
|
val preAgg = rdd.partitioner match {
|
||||||
val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc)
|
case Some(p) => rdd
|
||||||
val partitioner = new HashPartitioner(rdd.partitions.size)
|
case None =>
|
||||||
val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
|
val partitioner = new HashPartitioner(rdd.partitions.size)
|
||||||
|
// Preaggregation.
|
||||||
|
val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc)
|
||||||
|
rdd.mapPartitions(aggregator.combineValuesByKey, true).partitionBy(partitioner)
|
||||||
|
}
|
||||||
|
|
||||||
val groups = preAgg.mapPartitions( iter => {
|
val groups = preAgg.mapPartitions( iter => {
|
||||||
val indexMap = new VertexIdToIndexMap()
|
val indexMap = new VertexIdToIndexMap()
|
||||||
|
|
|
@ -80,21 +80,6 @@ object EdgeTripletBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// {
|
|
||||||
// val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => {
|
|
||||||
// val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next()
|
|
||||||
// assert(iter.hasNext == false)
|
|
||||||
// // Return an iterator that looks up the hash map to find matching
|
|
||||||
// // vertices for each edge.
|
|
||||||
// new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
|
|
||||||
// }
|
|
||||||
// ClosureCleaner.clean(iterFun)
|
|
||||||
// localVidMap.zipJoin(vTableReplicatedValues).zipJoinRDD(eTable)
|
|
||||||
// .mapPartitions( iterFun ) // end of map partition
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Graph RDD that supports computation on graphs.
|
* A Graph RDD that supports computation on graphs.
|
||||||
*/
|
*/
|
||||||
|
@ -105,9 +90,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
@transient val eTable: RDD[(Pid, EdgePartition[ED])] )
|
@transient val eTable: RDD[(Pid, EdgePartition[ED])] )
|
||||||
extends Graph[VD, ED] {
|
extends Graph[VD, ED] {
|
||||||
|
|
||||||
// def this() = this(null,null,null)
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
|
* (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
|
||||||
* vertex data after it is replicated. Within each partition, it holds a map
|
* vertex data after it is replicated. Within each partition, it holds a map
|
||||||
|
@ -127,7 +109,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
|
|
||||||
/** Return a RDD of edges. */
|
/** Return a RDD of edges. */
|
||||||
@transient override val edges: RDD[Edge[ED]] = {
|
@transient override val edges: RDD[Edge[ED]] = {
|
||||||
eTable.mapPartitions { iter => iter.next()._2.iterator }
|
eTable.mapPartitions( iter => iter.next()._2.iterator , true )
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -136,21 +118,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
EdgeTripletBuilder.makeTriplets(localVidMap, vTableReplicatedValues, eTable)
|
EdgeTripletBuilder.makeTriplets(localVidMap, vTableReplicatedValues, eTable)
|
||||||
|
|
||||||
|
|
||||||
// {
|
|
||||||
// val iterFun = (iter: Iterator[(Pid, (VertexHashMap[VD], EdgePartition[ED]))]) => {
|
|
||||||
// val (pid, (vmap, edgePartition)) = iter.next()
|
|
||||||
// //assert(iter.hasNext == false)
|
|
||||||
// // Return an iterator that looks up the hash map to find matching
|
|
||||||
// // vertices for each edge.
|
|
||||||
// new EdgeTripletIterator(vmap, edgePartition)
|
|
||||||
// }
|
|
||||||
// ClosureCleaner.clean(iterFun)
|
|
||||||
// vTableReplicated.join(eTable).mapPartitions( iterFun ) // end of map partition
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
override def cache(): Graph[VD, ED] = {
|
override def cache(): Graph[VD, ED] = {
|
||||||
eTable.cache()
|
eTable.cache()
|
||||||
vid2pid.cache()
|
vid2pid.cache()
|
||||||
|
@ -175,6 +142,64 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Display the lineage information for this graph.
|
||||||
|
*/
|
||||||
|
def printLineage() = {
|
||||||
|
|
||||||
|
def traverseLineage(rdd: RDD[_], indent: String = "", visited: Map[Int, String] = Map.empty[Int, String]) {
|
||||||
|
if(visited.contains(rdd.id)) {
|
||||||
|
println(indent + visited(rdd.id))
|
||||||
|
println(indent)
|
||||||
|
} else {
|
||||||
|
val locs = rdd.partitions.map( p => rdd.preferredLocations(p) )
|
||||||
|
val cacheLevel = rdd.getStorageLevel
|
||||||
|
val name = rdd.id
|
||||||
|
val deps = rdd.dependencies
|
||||||
|
val partitioner = rdd.partitioner
|
||||||
|
val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0}
|
||||||
|
println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner + ", " + numparts +")")
|
||||||
|
println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString)
|
||||||
|
println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", "))
|
||||||
|
deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println("eTable ------------------------------------------")
|
||||||
|
traverseLineage(eTable, " ")
|
||||||
|
var visited = Map(eTable.id -> "eTable")
|
||||||
|
|
||||||
|
println("\n\nvTable.index ------------------------------------")
|
||||||
|
traverseLineage(vTable.index.rdd, " ", visited)
|
||||||
|
visited += (vTable.index.rdd.id -> "vTable.index")
|
||||||
|
|
||||||
|
println("\n\nvTable.values ------------------------------------")
|
||||||
|
traverseLineage(vTable.valuesRDD, " ", visited)
|
||||||
|
visited += (vTable.valuesRDD.id -> "vTable.values")
|
||||||
|
|
||||||
|
println("\n\nvTable ------------------------------------------")
|
||||||
|
traverseLineage(vTable, " ", visited)
|
||||||
|
visited += (vTable.id -> "vTable")
|
||||||
|
|
||||||
|
println("\n\nvid2pid -----------------------------------------")
|
||||||
|
traverseLineage(vid2pid, " ", visited)
|
||||||
|
visited += (vid2pid.id -> "vid2pid")
|
||||||
|
visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
|
||||||
|
|
||||||
|
println("\n\nlocalVidMap -------------------------------------")
|
||||||
|
traverseLineage(localVidMap, " ", visited)
|
||||||
|
visited += (localVidMap.id -> "localVidMap")
|
||||||
|
|
||||||
|
println("\n\nvTableReplicatedValues --------------------------")
|
||||||
|
traverseLineage(vTableReplicatedValues, " ", visited)
|
||||||
|
visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
|
||||||
|
|
||||||
|
println("\n\ntriplets ----------------------------------------")
|
||||||
|
traverseLineage(triplets, " ", visited)
|
||||||
|
println(visited)
|
||||||
|
} // end of print lineage
|
||||||
|
|
||||||
|
|
||||||
override def reverse: Graph[VD, ED] = {
|
override def reverse: Graph[VD, ED] = {
|
||||||
val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
|
val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
|
||||||
preservesPartitioning = true)
|
preservesPartitioning = true)
|
||||||
|
@ -213,13 +238,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
new GraphImpl(vTable, vid2pid, localVidMap, newETable)
|
new GraphImpl(vTable, vid2pid, localVidMap, newETable)
|
||||||
}
|
}
|
||||||
|
|
||||||
// override def correctEdges(): Graph[VD, ED] = {
|
|
||||||
// val sc = vertices.context
|
|
||||||
// val vset = sc.broadcast(vertices.map(_.id).collect().toSet)
|
|
||||||
// val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst))
|
|
||||||
// Graph(vertices, newEdges)
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
|
override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
|
||||||
vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = {
|
vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = {
|
||||||
|
@ -382,16 +400,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
object GraphImpl {
|
object GraphImpl {
|
||||||
|
|
||||||
def apply[VD: ClassManifest, ED: ClassManifest](
|
def apply[VD: ClassManifest, ED: ClassManifest](
|
||||||
|
@ -528,28 +536,6 @@ object GraphImpl {
|
||||||
}.cache()
|
}.cache()
|
||||||
|
|
||||||
// @todo assert edge table has partitioner
|
// @todo assert edge table has partitioner
|
||||||
|
|
||||||
// val localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap] =
|
|
||||||
// msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
|
|
||||||
// val vidToIndex = new VertexIdToIndexMap
|
|
||||||
// var i = 0
|
|
||||||
// for (msg <- iter) {
|
|
||||||
// vidToIndex.put(msg.data._1, i)
|
|
||||||
// i += 1
|
|
||||||
// }
|
|
||||||
// Array((pid, vidToIndex)).iterator
|
|
||||||
// }, preservesPartitioning = true).indexed(eTable.index)
|
|
||||||
|
|
||||||
// val vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]] =
|
|
||||||
// msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
|
|
||||||
// val vertexArray = ArrayBuilder.make[VD]
|
|
||||||
// for (msg <- iter) {
|
|
||||||
// vertexArray += msg.data._2
|
|
||||||
// }
|
|
||||||
// Array((pid, vertexArray.result)).iterator
|
|
||||||
// }, preservesPartitioning = true).indexed(eTable.index)
|
|
||||||
|
|
||||||
// (localVidMap, vTableReplicatedValues)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue