During graph creation, create eTable earlier
This commit is contained in:
parent
5a9b07ead2
commit
3558e8bda1
|
@ -322,15 +322,20 @@ object GraphImpl {
|
||||||
mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
|
mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
|
||||||
|
|
||||||
vertices.cache
|
vertices.cache
|
||||||
edges.cache
|
val etable = createETable(edges).cache
|
||||||
// Get the set of all vids
|
// Get the set of all vids, preserving partitions
|
||||||
val allVids = vertices.map(_._1).union(edges.flatMap(e => Seq(e.srcId, e.dstId)))
|
val partitioner = Partitioner.defaultPartitioner(vertices)
|
||||||
|
val implicitVids = etable.flatMap {
|
||||||
|
case (pid, partition) => Array.concat(partition.srcIds, partition.dstIds)
|
||||||
|
}.map(vid => (vid, ())).partitionBy(partitioner)
|
||||||
|
val allVids = vertices.zipPartitions(implicitVids) {
|
||||||
|
(a, b) => a.map(_._1) ++ b.map(_._1)
|
||||||
|
}
|
||||||
// Index the set of all vids
|
// Index the set of all vids
|
||||||
val index = VertexSetRDD.makeIndex(allVids, Some(Partitioner.defaultPartitioner(vertices)))
|
val index = VertexSetRDD.makeIndex(allVids, Some(partitioner))
|
||||||
// Index the vertices and fill in missing attributes with the default
|
// Index the vertices and fill in missing attributes with the default
|
||||||
val vtable = VertexSetRDD(vertices, index, mergeFunc).fillMissing(defaultVertexAttr)
|
val vtable = VertexSetRDD(vertices, index, mergeFunc).fillMissing(defaultVertexAttr)
|
||||||
|
|
||||||
val etable = createETable(edges)
|
|
||||||
val vid2pid = new Vid2Pid(etable, vtable.index)
|
val vid2pid = new Vid2Pid(etable, vtable.index)
|
||||||
val localVidMap = createLocalVidMap(etable)
|
val localVidMap = createLocalVidMap(etable)
|
||||||
new GraphImpl(vtable, vid2pid, localVidMap, etable)
|
new GraphImpl(vtable, vid2pid, localVidMap, etable)
|
||||||
|
|
Loading…
Reference in a new issue