refactor foldByKey to use combineByKey

This commit is contained in:
Mark Hamstra 2013-03-16 13:31:01 -07:00
parent 1fb192ef40
commit ca9f81e8fc
2 changed files with 19 additions and 7 deletions

View file

@ -89,22 +89,28 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
} }
/** /**
* Merge the values for each key using an associative function and a neutral "zero value". * Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication).
*/ */
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = { def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
groupByKey(partitioner).mapValues(seq => seq.fold[V](zeroValue)(func)) combineByKey[V]({v: V => func(zeroValue, v)}, func, func, partitioner)
} }
/** /**
* Merge the values for each key using an associative function and a neutral "zero value". * Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication).
*/ */
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = { def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
foldByKey(zeroValue, new HashPartitioner(numPartitions))(func) foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
} }
/** /**
* Merge the values for each key using an associative function and a neutral "zero value". * Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication).
*/ */
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = { def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
foldByKey(zeroValue, defaultPartitioner(self))(func) foldByKey(zeroValue, defaultPartitioner(self))(func)

View file

@ -161,19 +161,25 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap) rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
/** /**
* Merge the values for each key using an associative function and a neutral "zero value". * Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication).
*/ */
def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] = def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue, partitioner)(func)) fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))
/** /**
* Merge the values for each key using an associative function and a neutral "zero value". * Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication).
*/ */
def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] = def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func)) fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func))
/** /**
* Merge the values for each key using an associative function and a neutral "zero value". * Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication).
*/ */
def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] = def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue)(func)) fromRDD(rdd.foldByKey(zeroValue)(func))