Reuse previous localVidMap if available

This commit is contained in:
Ankur Dave 2013-12-13 02:03:08 -08:00
parent 5e20cbaf66
commit 8f4b8e9b95

View file

@ -23,21 +23,26 @@ class VTableReplicated[VD: ClassManifest](
vertexPlacement: VertexPlacement,
prevVTableReplicated: Option[VTableReplicated[VD]] = None) {
// Within each edge partition, create a local map from vid to an index into
// the attribute array. Each map contains a superset of the vertices that it
// will receive, because it stores vids from both the source and destination
// of edges. It must always include both source and destination vids because
// some operations, such as GraphImpl.mapReduceTriplets, rely on this.
val localVidMap: RDD[(Int, VertexIdToIndexMap)] = edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
val vidToIndex = new VertexIdToIndexMap
epart.foreach { e =>
vidToIndex.add(e.srcId)
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
}, preservesPartitioning = true).cache()
/**
* Within each edge partition, create a local map from vid to an index into the attribute
* array. Each map contains a superset of the vertices that it will receive, because it stores
* vids from both the source and destination of edges. It must always include both source and
* destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
*/
private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevVTableReplicated match {
case Some(prev) =>
prev.localVidMap
case None =>
edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
val vidToIndex = new VertexIdToIndexMap
epart.foreach { e =>
vidToIndex.add(e.srcId)
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
}, preservesPartitioning = true).cache()
}
val bothAttrs: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(true, true)
val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(true, false)