Documentation: Encourage use of reduceByKey instead of groupByKey.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #784 from pwendell/group-by-key and squashes the following commits:

9b4505f [Patrick Wendell] Small fix
6347924 [Patrick Wendell] Documentation: Encourage use of reduceByKey instead of groupByKey.
Patrick Wendell 2014-05-14 22:24:04 -07:00
parent f10de042b8
commit 21570b4633
4 changed files with 32 additions and 0 deletions

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

@@ -263,6 +263,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   /**
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+   *
+   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
+   * each key, using [[JavaPairRDD.reduceByKey]] or [[JavaPairRDD.combineByKey]]
+   * will provide much better performance.
    */
   def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
@@ -270,6 +274,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD into `numPartitions` partitions.
+   *
+   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
+   * each key, using [[JavaPairRDD.reduceByKey]] or [[JavaPairRDD.combineByKey]]
+   * will provide much better performance.
    */
   def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
@@ -380,6 +388,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with the existing partitioner/parallelism level.
+   *
+   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
+   * each key, using [[JavaPairRDD.reduceByKey]] or [[JavaPairRDD.combineByKey]]
+   * will provide much better performance.
    */
   def groupByKey(): JavaPairRDD[K, JIterable[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey()))
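For a concrete sense of what this note buys you, here is a minimal Scala sketch (illustrative only, not part of this commit; it assumes a running SparkContext named `sc`). reduceByKey merges partial results within each partition before the shuffle, so only one combined value per key per partition crosses the network, while groupByKey ships every raw value.

// Illustrative sketch, not from this commit; assumes a SparkContext `sc`.
val sales = sc.parallelize(Seq(("books", 12), ("games", 7), ("books", 3)))

// Shuffles all three values, then sums on the reduce side:
val viaGroup = sales.groupByKey().mapValues(_.sum)

// Merges partial sums map-side, shuffling one value per key per partition:
val viaReduce = sales.reduceByKey(_ + _)

Both yield ("books", 15) and ("games", 7); only the shuffle volume differs.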

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

@@ -264,6 +264,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   /**
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+   *
+   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
+   * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
+   * will provide much better performance.
    */
   def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
     // groupByKey shouldn't use map side combine because map side combine does not
@@ -280,6 +284,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD into `numPartitions` partitions.
+   *
+   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
+   * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
+   * will provide much better performance.
    */
   def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
     groupByKey(new HashPartitioner(numPartitions))
@@ -365,6 +373,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with the existing partitioner/parallelism level.
+   *
+   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
+   * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
+   * will provide much better performance.
    */
   def groupByKey(): RDD[(K, Iterable[V])] = {
     groupByKey(defaultPartitioner(self))
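The combineByKey half of the recommendation fits aggregations whose result type differs from the value type. A sketch of a per-key average (again illustrative, assuming a SparkContext `sc`): only a (sum, count) pair per key crosses the shuffle rather than every raw value.

// Illustrative sketch, assuming a SparkContext `sc`.
val scores = sc.parallelize(Seq(("a", 10.0), ("a", 20.0), ("b", 5.0)))
val averages = scores.combineByKey(
    (v: Double) => (v, 1L),                                              // createCombiner: first value for a key
    (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue: fold in another value
    (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: across partitions
  ).mapValues { case (sum, count) => sum / count }
// averages contains ("a", 15.0) and ("b", 5.0)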

docs/scala-programming-guide.md

@@ -196,6 +196,10 @@ The following tables list the transformations and actions currently supported (s
 <tr>
   <td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
   <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. <br />
+    <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
+    average) over each key, using <code>reduceByKey</code> or <code>combineByKey</code> will yield much better
+    performance.
+    <br />
     <b>Note:</b> By default, if the RDD already has a partitioner, the number of tasks is determined by that partitioner's number of partitions; otherwise it falls back to <code>spark.default.parallelism</code> if that property is set, or else to the number of partitions of the RDD. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
   </td>
 </tr>
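Switching to a combiner-based operation does not give up the tuning knob described above: reduceByKey accepts the same optional task count. A short illustrative sketch (a SparkContext `sc` is assumed, and the literal 4 stands in for numTasks):

// Word count both ways; 4 plays the role of the optional numTasks.
val ones = sc.parallelize(Seq("spark", "rdd", "spark")).map(word => (word, 1))

val grouped = ones.groupByKey(4)          // groups every 1 per key, shuffling them all
val counts  = ones.reduceByKey(_ + _, 4)  // same totals, partial sums combined before the shuffle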

python/pyspark/rdd.py

@@ -1152,6 +1152,10 @@ class RDD(object):
         Group the values for each key in the RDD into a single sequence.
         Hash-partitions the resulting RDD into numPartitions partitions.
 
+        Note: If you are grouping in order to perform an aggregation (such as a
+        sum or average) over each key, using reduceByKey will provide much better
+        performance.
+
         >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
         [('a', [1, 1]), ('b', [1])]
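Mirroring the doctest's data in Scala (kept in Scala for consistency with the sketches above; illustrative, `sc` assumed): grouping must materialize each key's full value list, while the reduceByKey form keeps only a single running total per key.

val x = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

val grouped = x.groupByKey().mapValues(_.toList)  // ("a", List(1, 1)), ("b", List(1))
val counts  = x.reduceByKey(_ + _)                // ("a", 2), ("b", 1)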