From d513addb77916fba78ce321936fe5b54385991d4 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 30 Oct 2013 20:05:29 -0700 Subject: [PATCH] added lineage tracking code --- .../apache/spark/graph/impl/GraphImpl.scala | 70 ++++++++++++++++--- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 13f9c53189..016811db36 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -109,7 +109,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( /** Return a RDD of edges. */ @transient override val edges: RDD[Edge[ED]] = { - eTable.mapPartitions { iter => iter.next()._2.iterator } + eTable.mapPartitions( iter => iter.next()._2.iterator , true ) } @@ -142,6 +142,64 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } + /** + * Display the lineage information for this graph. Prints to stdout, one section each for eTable, vTable.index, vTable.values, vTable, vid2pid, localVidMap, vTableReplicatedValues and triplets. Each section recursively walks the RDD dependencies and prints, per RDD, its id, storage level description, partitioner with its partition count (0 when there is no partitioner), its dependencies, and the preferred locations of every partition. An RDD whose id was recorded under a name by an earlier section is printed as that name and is not expanded again. The final line printed is the accumulated id-to-name map. 
+ */ + def printLineage() = { + + def traverseLineage(rdd: RDD[_], indent: String = "", visited: Map[Int, String] = Map.empty[Int, String]) { + if(visited.contains(rdd.id)) { + println(indent + visited(rdd.id)) + println(indent) + } else { + val locs = rdd.partitions.map( p => rdd.preferredLocations(p) ) + val cacheLevel = rdd.getStorageLevel + val name = rdd.id + val deps = rdd.dependencies + val partitioner = rdd.partitioner + val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0} + println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner + ", " + numparts +")") + println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString) + println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", ")) + deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited)) + } + } + + println("eTable ------------------------------------------") + traverseLineage(eTable, " ") + var visited = Map(eTable.id -> "eTable") + + println("\n\nvTable.index ------------------------------------") + traverseLineage(vTable.index.rdd, " ", visited) + visited += (vTable.index.rdd.id -> "vTable.index") + + println("\n\nvTable.values ------------------------------------") + traverseLineage(vTable.valuesRDD, " ", visited) + visited += (vTable.valuesRDD.id -> "vTable.values") + + println("\n\nvTable ------------------------------------------") + traverseLineage(vTable, " ", visited) + visited += (vTable.id -> "vTable") + + println("\n\nvid2pid -----------------------------------------") + traverseLineage(vid2pid, " ", visited) + visited += (vid2pid.id -> "vid2pid") + visited += (vid2pid.valuesRDD.id -> "vid2pid.values") + + println("\n\nlocalVidMap -------------------------------------") + traverseLineage(localVidMap, " ", visited) + visited += (localVidMap.id -> "localVidMap") + + println("\n\nvTableReplicatedValues --------------------------") + traverseLineage(vTableReplicatedValues, " ", visited) + visited 
+= (vTableReplicatedValues.id -> "vTableReplicatedValues") + + println("\n\ntriplets ----------------------------------------") + traverseLineage(triplets, " ", visited) + println(visited) + } // end of print lineage + + override def reverse: Graph[VD, ED] = { val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) }, preservesPartitioning = true) @@ -342,16 +400,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( - - - - - - - - - - object GraphImpl { def apply[VD: ClassManifest, ED: ClassManifest](