switched to a more efficient implementation of reduceByKey

Joseph E. Gonzalez 2013-10-16 00:18:37 -07:00
parent 9058f261fe
commit 59700c0c2a
4 changed files with 55 additions and 11 deletions


@@ -216,6 +216,50 @@ object IndexedRDD {
     }
   }
+
+  def reduceByKey[K: ClassManifest, V: ClassManifest](
+      rdd: RDD[(K, V)], reduceFun: (V, V) => V, index: RDDIndex[K]): IndexedRDD[K, V] = {
+    // Get the index partitioner
+    val partitioner = index.rdd.partitioner match {
+      case Some(p) => p
+      case None => throw new SparkException("An index must have a partitioner.")
+    }
+    // Preaggregate and shuffle if necessary
+    val partitioned =
+      if (rdd.partitioner != Some(partitioner)) {
+        // Preaggregation.
+        val aggregator = new Aggregator[K, V, V](v => v, reduceFun, reduceFun)
+        val combined = rdd.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+        combined.partitionBy(partitioner) //new ShuffledRDD[K, V, (K, V)](combined, partitioner)
+      } else {
+        rdd
+      }
+    // Use the index to build the new values table
+    val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+      // There is only one map
+      val index = indexIter.next()
+      assert(!indexIter.hasNext)
+      val values = new Array[Array[V]](index.size)
+      for ((k, v) <- tblIter) {
+        if (!index.contains(k)) {
+          throw new SparkException("Error: Trying to bind an external index " +
+            "to an RDD which contains keys that are not in the index.")
+        }
+        val ind = index(k)
+        if (values(ind) == null) {
+          values(ind) = Array(v)
+        } else {
+          values(ind)(0) = reduceFun(values(ind).head, v)
+        }
+      }
+      List(values.view.map(x => if (x != null) x.toSeq else null).toSeq).iterator
+    })
+    new IndexedRDD[K, V](index, values)
+  }
+
   /**
    * Construct an index of the unique values in a given RDD.
    */
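
Note: the zipPartitions closure above reduces into a slot array addressed by the index's key positions instead of building a fresh hash map per partition. A minimal standalone sketch of that pattern, with a plain Map[K, Int] standing in for the hash-map index partition (ReduceByIndexSketch and reducePartition are illustrative names, not part of this commit):

// Standalone sketch of the per-partition reduce used by reduceByKey.
object ReduceByIndexSketch {
  def reducePartition[K, V](
      index: Map[K, Int],
      pairs: Iterator[(K, V)],
      reduceFun: (V, V) => V): Array[Option[V]] = {
    // One slot per key in the index; slots start empty.
    val values = Array.fill[Option[V]](index.size)(None)
    for ((k, v) <- pairs) {
      val ind = index.getOrElse(k,
        sys.error("Trying to bind an external index to a key not in the index: " + k))
      // First value lands in the slot; later values fold in with reduceFun.
      values(ind) = values(ind).map(u => reduceFun(u, v)).orElse(Some(v))
    }
    values
  }

  def main(args: Array[String]) {
    val index = Map("a" -> 0, "b" -> 1)
    val pairs = Iterator("a" -> 1, "b" -> 2, "a" -> 3)
    // Prints Vector(Some(4), Some(2)): "a" reduces to 1 + 3, "b" stays 2.
    println(reducePartition(index, pairs, (x: Int, y: Int) => x + y).toVector)
  }
}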


@@ -245,7 +245,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
     if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
-    new ShuffledRDD[K, V, (K, V)](self, partitioner)
+    new ShuffledRDD[K, V, (K, V)](self, partitioner)
   }

   /**
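
Note: the change to partitionBy above appears to be whitespace-only. This method is what IndexedRDD.reduceByKey now calls, and the commented-out ShuffledRDD construction there is what partitionBy does internally. A usage sketch, assuming a local SparkContext and the 0.8-era implicit import:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // pulls in the PairRDDFunctions implicits
import org.apache.spark.HashPartitioner

object PartitionBySketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "partitionBy-sketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // After partitionBy, pairs with equal keys live in the same partition,
    // so a later reduce against a matching index needs no extra shuffle.
    val partitioned = pairs.partitionBy(new HashPartitioner(2))
    assert(partitioned.partitioner == Some(new HashPartitioner(2)))
    sc.stop()
  }
}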


@@ -18,12 +18,12 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
   /**
    * The source vertex attribute
    */
-  var srcAttr: VD = nullValue[VD]
+  var srcAttr: VD = _ //nullValue[VD]

   /**
    * The destination vertex attribute
    */
-  var dstAttr: VD = nullValue[VD]
+  var dstAttr: VD = _ //nullValue[VD]

   /**
    * Set the edge properties of this triplet.
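
Note: `var srcAttr: VD = _` uses Scala's default-value initializer in place of the nullValue[VD] helper. A small sketch of its semantics (Cell and CellCheck are illustrative names):

// `var x: T = _` initializes a field to the type's default value:
// null for reference types, 0 / 0.0 / false for primitives.
class Cell[T] {
  var value: T = _   // behaves like null.asInstanceOf[T] for an unbound T
}

object CellCheck {
  def main(args: Array[String]) {
    // With an erased type parameter, the field starts out null.
    assert(new Cell[String].value == null)
  }
}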


@@ -341,9 +341,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     ClosureCleaner.clean(mapFunc)
     ClosureCleaner.clean(reduceFunc)
-    val newVTable: RDD[(Vid, A)] =
-      vTableReplicated.join(eTable).flatMap{
-        case (pid, (vmap, edgePartition)) =>
+    // Map and preaggregate
+    val preAgg = vTableReplicated.join(eTable).flatMap{
+      case (pid, (vmap, edgePartition)) =>
         val aggMap = new VertexHashMap[A]
         val et = new EdgeTriplet[VD, ED]
         edgePartition.foreach{e =>
@@ -353,17 +353,17 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
           mapFunc(et).foreach{case (vid, a) =>
             if(aggMap.containsKey(vid)) {
               aggMap.put(vid, reduceFunc(aggMap.get(vid), a))
-            } else { aggMap.put(vid, a) }
+            } else { aggMap.put(vid, a) }
           }
         }
       }
       // Return the aggregate map
       aggMap.long2ObjectEntrySet().fastIterator().map{
         entry => (entry.getLongKey(), entry.getValue())
       }
-    }
-    .indexed(vTable.index).reduceByKey(reduceFunc)
-    newVTable
+    }.partitionBy(vTable.index.rdd.partitioner.get)
+    // do the final reduction reusing the index map
+    IndexedRDD.reduceByKey(preAgg, reduceFunc, vTable.index)
   }
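
Note: the restructured aggregation now combines messages per partition in a hash map, shuffles once with the index's partitioner, and reuses the vertex index for the final combine via IndexedRDD.reduceByKey. A standalone sketch of the preaggregation step, with scala's mutable.HashMap standing in for the primitive-keyed VertexHashMap (PreAggregateSketch is an illustrative name, not part of this commit):

import scala.collection.mutable

// Sketch: combine messages to the same vertex locally before they are
// shuffled, mirroring aggMap in the code above.
object PreAggregateSketch {
  def preAggregate[A](msgs: Iterator[(Long, A)],
                      reduceFunc: (A, A) => A): Iterator[(Long, A)] = {
    val aggMap = new mutable.HashMap[Long, A]
    for ((vid, a) <- msgs) {
      aggMap(vid) = aggMap.get(vid) match {
        case Some(b) => reduceFunc(b, a)   // already saw this vertex: reduce
        case None => a                     // first message for this vertex
      }
    }
    aggMap.iterator
  }

  def main(args: Array[String]) {
    val msgs = Iterator((1L, 1.0), (2L, 2.0), (1L, 0.5))
    // Vertex 1 combines to 1.5 before any shuffle would happen.
    println(preAggregate(msgs, (x: Double, y: Double) => x + y).toMap)
  }
}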