Add helper methods to Aggregator.

This commit is contained in:
Josh Rosen 2012-10-08 17:41:02 -07:00
parent 84979499db
commit 110832e88f

View file

@ -1,5 +1,9 @@
package spark
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions._
/** A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
@ -13,5 +17,32 @@ case class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
val mergeCombiners: (C, C) => C,
val mapSideCombine: Boolean = true)
val mapSideCombine: Boolean = true) {
def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
for ((k, v) <- iter) {
val oldC = combiners.get(k)
if (oldC == null) {
combiners.put(k, createCombiner(v))
} else {
combiners.put(k, mergeValue(oldC, v))
}
}
combiners.iterator
}
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
for ((k, c) <- iter) {
val oldC = combiners.get(k)
if (oldC == null) {
combiners.put(k, c)
} else {
combiners.put(k, mergeCombiners(oldC, c))
}
}
combiners.iterator
}
}