Merge pull request #1 from ankurdave/MissingVertices

During graph creation, create eTable earlier
This commit is contained in:
Joey 2013-11-13 17:55:58 -08:00
commit 33b2deafe6

View file

@@ -322,15 +322,20 @@ object GraphImpl {
mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = { mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
vertices.cache vertices.cache
edges.cache val etable = createETable(edges).cache
// Get the set of all vids // Get the set of all vids, preserving partitions
val allVids = vertices.map(_._1).union(edges.flatMap(e => Seq(e.srcId, e.dstId))) val partitioner = Partitioner.defaultPartitioner(vertices)
val implicitVids = etable.flatMap {
case (pid, partition) => Array.concat(partition.srcIds, partition.dstIds)
}.map(vid => (vid, ())).partitionBy(partitioner)
val allVids = vertices.zipPartitions(implicitVids) {
(a, b) => a.map(_._1) ++ b.map(_._1)
}
// Index the set of all vids // Index the set of all vids
val index = VertexSetRDD.makeIndex(allVids, Some(Partitioner.defaultPartitioner(vertices))) val index = VertexSetRDD.makeIndex(allVids, Some(partitioner))
// Index the vertices and fill in missing attributes with the default // Index the vertices and fill in missing attributes with the default
val vtable = VertexSetRDD(vertices, index, mergeFunc).fillMissing(defaultVertexAttr) val vtable = VertexSetRDD(vertices, index, mergeFunc).fillMissing(defaultVertexAttr)
val etable = createETable(edges)
val vid2pid = new Vid2Pid(etable, vtable.index) val vid2pid = new Vid2Pid(etable, vtable.index)
val localVidMap = createLocalVidMap(etable) val localVidMap = createLocalVidMap(etable)
new GraphImpl(vtable, vid2pid, localVidMap, etable) new GraphImpl(vtable, vid2pid, localVidMap, etable)