From 8f4b8e9b954be5022e4bcc69ec45180a513b132c Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Fri, 13 Dec 2013 02:03:08 -0800 Subject: [PATCH] Reuse previous localVidMap if available --- .../spark/graph/impl/VTableReplicated.scala | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala index 72032aa5b2..4cccac02cf 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala @@ -23,21 +23,26 @@ class VTableReplicated[VD: ClassManifest]( vertexPlacement: VertexPlacement, prevVTableReplicated: Option[VTableReplicated[VD]] = None) { - // Within each edge partition, create a local map from vid to an index into - // the attribute array. Each map contains a superset of the vertices that it - // will receive, because it stores vids from both the source and destination - // of edges. It must always include both source and destination vids because - // some operations, such as GraphImpl.mapReduceTriplets, rely on this. - val localVidMap: RDD[(Int, VertexIdToIndexMap)] = edges.partitionsRDD.mapPartitions(_.map { - case (pid, epart) => - val vidToIndex = new VertexIdToIndexMap - epart.foreach { e => - vidToIndex.add(e.srcId) - vidToIndex.add(e.dstId) - } - (pid, vidToIndex) - }, preservesPartitioning = true).cache() - + /** + * A local map, per edge partition, from vid to an index into the attribute array. It is reused + * from prevVTableReplicated when one is supplied; otherwise it is built from edges and cached. + * Each map holds both the source and destination vids of its edges, a superset of the vertices + * it will receive, because some operations, such as GraphImpl.mapReduceTriplets, rely on this. 
+ */ + private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevVTableReplicated match { + case Some(prev) => /* share the map the previous instance already holds */ + prev.localVidMap + case None => /* nothing to reuse: index the srcId and dstId of every edge */ + edges.partitionsRDD.mapPartitions(_.map { + case (pid, epart) => + val vidToIndex = new VertexIdToIndexMap + epart.foreach { e => + vidToIndex.add(e.srcId) + vidToIndex.add(e.dstId) + } + (pid, vidToIndex) + }, preservesPartitioning = true).cache() + } val bothAttrs: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(true, true) val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(true, false)