Cache msgsByPartition

This commit is contained in:
Ankur Dave 2013-10-16 23:56:15 -07:00
parent bc234bf0e1
commit 2282d27cf1

View file

@ -486,7 +486,8 @@ object GraphImpl {
.flatMap { case (vid, (vdata, pids)) => .flatMap { case (vid, (vdata, pids)) =>
pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) } pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
} }
.partitionBy(eTable.partitioner.get) //@todo assert edge table has partitioner .partitionBy(eTable.partitioner.get).cache()
// @todo assert edge table has partitioner
val vTableReplicationMap: IndexedRDD[Pid, VertexIdToIndexMap] = val vTableReplicationMap: IndexedRDD[Pid, VertexIdToIndexMap] =
msgsByPartition.mapPartitionsWithIndex( (pid, iter) => { msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {