added lineage tracking code
This commit is contained in:
parent
a4b8ddf417
commit
d513addb77
|
@ -109,7 +109,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
|
|
||||||
/** Return a RDD of edges. */
|
/** Return a RDD of edges. */
|
||||||
@transient override val edges: RDD[Edge[ED]] = {
|
@transient override val edges: RDD[Edge[ED]] = {
|
||||||
eTable.mapPartitions { iter => iter.next()._2.iterator }
|
eTable.mapPartitions( iter => iter.next()._2.iterator , true )
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -142,6 +142,64 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Display the lineage information for this graph.
|
||||||
|
*/
|
||||||
|
def printLineage() = {
|
||||||
|
|
||||||
|
def traverseLineage(rdd: RDD[_], indent: String = "", visited: Map[Int, String] = Map.empty[Int, String]) {
|
||||||
|
if(visited.contains(rdd.id)) {
|
||||||
|
println(indent + visited(rdd.id))
|
||||||
|
println(indent)
|
||||||
|
} else {
|
||||||
|
val locs = rdd.partitions.map( p => rdd.preferredLocations(p) )
|
||||||
|
val cacheLevel = rdd.getStorageLevel
|
||||||
|
val name = rdd.id
|
||||||
|
val deps = rdd.dependencies
|
||||||
|
val partitioner = rdd.partitioner
|
||||||
|
val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0}
|
||||||
|
println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner + ", " + numparts +")")
|
||||||
|
println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString)
|
||||||
|
println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", "))
|
||||||
|
deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println("eTable ------------------------------------------")
|
||||||
|
traverseLineage(eTable, " ")
|
||||||
|
var visited = Map(eTable.id -> "eTable")
|
||||||
|
|
||||||
|
println("\n\nvTable.index ------------------------------------")
|
||||||
|
traverseLineage(vTable.index.rdd, " ", visited)
|
||||||
|
visited += (vTable.index.rdd.id -> "vTable.index")
|
||||||
|
|
||||||
|
println("\n\nvTable.values ------------------------------------")
|
||||||
|
traverseLineage(vTable.valuesRDD, " ", visited)
|
||||||
|
visited += (vTable.valuesRDD.id -> "vTable.values")
|
||||||
|
|
||||||
|
println("\n\nvTable ------------------------------------------")
|
||||||
|
traverseLineage(vTable, " ", visited)
|
||||||
|
visited += (vTable.id -> "vTable")
|
||||||
|
|
||||||
|
println("\n\nvid2pid -----------------------------------------")
|
||||||
|
traverseLineage(vid2pid, " ", visited)
|
||||||
|
visited += (vid2pid.id -> "vid2pid")
|
||||||
|
visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
|
||||||
|
|
||||||
|
println("\n\nlocalVidMap -------------------------------------")
|
||||||
|
traverseLineage(localVidMap, " ", visited)
|
||||||
|
visited += (localVidMap.id -> "localVidMap")
|
||||||
|
|
||||||
|
println("\n\nvTableReplicatedValues --------------------------")
|
||||||
|
traverseLineage(vTableReplicatedValues, " ", visited)
|
||||||
|
visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
|
||||||
|
|
||||||
|
println("\n\ntriplets ----------------------------------------")
|
||||||
|
traverseLineage(triplets, " ", visited)
|
||||||
|
println(visited)
|
||||||
|
} // end of print lineage
|
||||||
|
|
||||||
|
|
||||||
override def reverse: Graph[VD, ED] = {
|
override def reverse: Graph[VD, ED] = {
|
||||||
val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
|
val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
|
||||||
preservesPartitioning = true)
|
preservesPartitioning = true)
|
||||||
|
@ -342,16 +400,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
object GraphImpl {
|
object GraphImpl {
|
||||||
|
|
||||||
def apply[VD: ClassManifest, ED: ClassManifest](
|
def apply[VD: ClassManifest, ED: ClassManifest](
|
||||||
|
|
Loading…
Reference in a new issue