Added initial groupEdges code. Still a prototype, I haven't figured out quite how it should all work yet.
This commit is contained in:
parent
bfedbee13a
commit
730a3156d3
|
@ -188,6 +188,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
||||||
// */
|
// */
|
||||||
// def combineEdges(reduce: (ED, ED) => ED): Graph[VD, ED]
|
// def combineEdges(reduce: (ED, ED) => ED): Graph[VD, ED]
|
||||||
|
|
||||||
|
def groupEdges[ED2: ClassManifest](f: Iter[EdgeTriplet[ED,VD]] => ED2 ): Graph[VD,ED2]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -118,6 +118,59 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
new GraphImpl(newVertices, newEdges)
|
new GraphImpl(newVertices, newEdges)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Because of the edgepartitioner, we know that all edges with the same src and dst
|
||||||
|
// will be in the same partition
|
||||||
|
|
||||||
|
// We will want to keep the same partitioning scheme. Use newGraph() rather than
|
||||||
|
// new GraphImpl()
|
||||||
|
override def groupEdges[ED2: ClassManifest](f: Iterator[EdgeTriplet[ED,VD]] => ED2 ):
|
||||||
|
Graph[VD,ED2] = {
|
||||||
|
|
||||||
|
// I think that
|
||||||
|
// myRDD.mapPartitions { part =>
|
||||||
|
// val (vmap, edges) = part.next()
|
||||||
|
// gives me access to the vertex map and the set of
|
||||||
|
// edges within that partition
|
||||||
|
|
||||||
|
// This is what happens during mapPartitions
|
||||||
|
// The iterator iterates over all partitions
|
||||||
|
// val result: RDD[U] = new RDD[T]().mapPartitions(f: Iterator[T] => Iterator[U])
|
||||||
|
|
||||||
|
// TODO(crankshaw) figure out how to actually get the new Edge RDD and what
|
||||||
|
// type that should have
|
||||||
|
val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIterator =>
|
||||||
|
// toList lets us operate on all EdgeTriplets in a single partition at once
|
||||||
|
partIterator.toList
|
||||||
|
// groups all ETs in this partition that have the same src and dst
|
||||||
|
// Because all ETs with the same src and dst will live on the same
|
||||||
|
// partition due to the EdgePartitioner, this guarantees that these
|
||||||
|
// ET groups will be complete.
|
||||||
|
.groupBy { t => (t.src.id, t.dst.id) }
|
||||||
|
// Apply the user supplied supplied edge group function to
|
||||||
|
// each group of edges
|
||||||
|
// The result of this line is Map[(Long, Long, ED2]
|
||||||
|
.mapValues { ts => f(ts.toIterator) }
|
||||||
|
// convert the resulting map back to a list of tuples
|
||||||
|
.toList
|
||||||
|
// map over those tuples that contain src and dst info plus the
|
||||||
|
// new edge data to make my new edges
|
||||||
|
.map { case ((src, dst), data) => Edge(src, dst, data) }
|
||||||
|
|
||||||
|
// How do I convert from a scala map to a list?
|
||||||
|
// I want to be able to apply a function like:
|
||||||
|
// f: (key, value): (K, V) => result: [R]
|
||||||
|
// so that I can transfrom a Map[K, V] to List[R]
|
||||||
|
|
||||||
|
// Maybe look at collections.breakOut
|
||||||
|
// see http://stackoverflow.com/questions/1715681/scala-2-8-breakout
|
||||||
|
// and http://stackoverflow.com/questions/6998676/converting-a-scala-map-to-a-list
|
||||||
|
|
||||||
|
}
|
||||||
|
newGraph(vertices, newEdges)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// Lower level transformation methods
|
// Lower level transformation methods
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
Loading…
Reference in a new issue