From 730a3156d394b6ba7b7caa0087bd8c09d62dcfad Mon Sep 17 00:00:00 2001
From: Dan Crankshaw
Date: Sat, 5 Oct 2013 19:44:28 -0700
Subject: [PATCH] Added initial groupEdges code. Still a prototype; I haven't
 figured out quite how it should all work yet.

---
 .../scala/org/apache/spark/graph/Graph.scala    |  2 +
 .../apache/spark/graph/impl/GraphImpl.scala     | 53 +++++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 40673bbc90..6fe71b4756 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -188,6 +188,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
   //  */
   // def combineEdges(reduce: (ED, ED) => ED): Graph[VD, ED]
 
+  def groupEdges[ED2: ClassManifest](f: Iterator[EdgeTriplet[ED, VD]] => ED2): Graph[VD, ED2]
+
 
 
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 0ba39d8d80..d0e03e0ce2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -118,6 +118,59 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     new GraphImpl(newVertices, newEdges)
   }
 
+
+  // Because of the EdgePartitioner, we know that all edges with the same src and dst
+  // will be in the same partition.
+
+  // We will want to keep the same partitioning scheme. Use newGraph() rather than
+  // new GraphImpl().
+  override def groupEdges[ED2: ClassManifest](f: Iterator[EdgeTriplet[ED, VD]] => ED2):
+    Graph[VD, ED2] = {
+
+      // I think that
+      //   myRDD.mapPartitions { part =>
+      //     val (vmap, edges) = part.next()
+      // gives me access to the vertex map and the set of
+      // edges within that partition.
+
+      // This is what happens during mapPartitions:
+      // f is invoked once per partition, with an iterator over that partition's elements.
+      // val result: RDD[U] = new RDD[T]().mapPartitions(f: Iterator[T] => Iterator[U])
+
+      // TODO(crankshaw) figure out how to actually get the new Edge RDD and what
+      // type that should have.
+      val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIterator =>
+        // toList lets us operate on all EdgeTriplets in a single partition at once.
+        partIterator.toList
+        // Group all triplets in this partition that have the same src and dst.
+        // Because all triplets with the same src and dst live on the same
+        // partition due to the EdgePartitioner, this guarantees that these
+        // groups are complete.
+        .groupBy { t => (t.src.id, t.dst.id) }
+        // Apply the user-supplied edge-group function to
+        // each group of edges.
+        // The result of this line is a Map[(Long, Long), ED2].
+        .mapValues { ts => f(ts.toIterator) }
+        // Convert the resulting map back to a list of tuples.
+        .toList
+        // Map over those tuples (src, dst, and the new edge data) to build the
+        // new edges, then return an iterator as mapPartitions requires.
+        .map { case ((src, dst), data) => Edge(src, dst, data) }.toIterator
+
+        // How do I convert from a Scala Map to a List?
+        // I want to be able to apply a function like:
+        //   f: (key, value): (K, V) => result: R
+        // so that I can transform a Map[K, V] into a List[R].
+
+        // Maybe look at collections.breakOut
+        // see http://stackoverflow.com/questions/1715681/scala-2-8-breakout
+        // and http://stackoverflow.com/questions/6998676/converting-a-scala-map-to-a-list
+
+      }
+      newGraph(vertices, newEdges)
+
+  }
+
 //////////////////////////////////////////////////////////////////////////////////////////////////
 // Lower level transformation methods
 //////////////////////////////////////////////////////////////////////////////////////////////////
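
The grouping step in the patch can be prototyped outside Spark. Below is a minimal standalone sketch (not part of the patch) of the same groupBy/mapValues pipeline on plain Scala collections. The GroupEdgesSketch object, its simplified Edge case class, and the example edge data are assumptions introduced here for illustration; they stand in for the real org.apache.spark.graph types. The sketch also addresses the Map-to-List question raised in the comments: .toList followed by .map over the (key, value) pairs performs the conversion without collections.breakOut.

// Standalone sketch; the Edge case class here is a simplified stand-in,
// not the Spark class.
object GroupEdgesSketch {
  case class Edge[ED](src: Long, dst: Long, data: ED)

  // Collapse parallel edges: every group of edges sharing (src, dst) is reduced
  // to a single edge whose data is f applied to the group, mirroring the
  // mapPartitions body in the patch.
  def groupEdges[ED, ED2](edges: List[Edge[ED]])(f: Iterator[Edge[ED]] => ED2): List[Edge[ED2]] =
    edges
      .groupBy(e => (e.src, e.dst))    // Map[(Long, Long), List[Edge[ED]]]
      .toList                          // List[((Long, Long), List[Edge[ED]])]
      .map { case ((src, dst), group) => Edge(src, dst, f(group.iterator)) }

  def main(args: Array[String]): Unit = {
    val edges = List(Edge(1L, 2L, 1), Edge(1L, 2L, 3), Edge(2L, 3L, 5))
    // Sum the data of parallel edges; prints Edge(1,2,4) and Edge(2,3,5) in some order.
    groupEdges(edges)(_.map(_.data).sum).foreach(println)
  }
}

In the patch itself, the same .toList-then-.map shape is what turns the Map[(Long, Long), ED2] back into edges, so collections.breakOut is not strictly needed for this step.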