[SPARK-3994] Use standard Aggregator code path for countByKey and countByValue
See [JIRA](https://issues.apache.org/jira/browse/SPARK-3994) for more information. Also adds a note which warns against using these methods. Author: Aaron Davidson <aaron@databricks.com> Closes #2839 from aarondav/countByKey and squashes the following commits: d6fdb2a [Aaron Davidson] Respond to comments e1f06d3 [Aaron Davidson] [SPARK-3994] Use standard Aggregator code path for countByKey and countByValue
This commit is contained in:
parent
1a623b2e16
commit
5fdaf52a9d
|
@@ -315,8 +315,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
|
|||
@deprecated("Use reduceByKeyLocally", "1.0.0")
|
||||
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)
|
||||
|
||||
/** Count the number of elements for each key, and return the result to the master as a Map. */
|
||||
def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
|
||||
/**
|
||||
* Count the number of elements for each key, collecting the results to a local Map.
|
||||
*
|
||||
* Note that this method should only be used if the resulting map is expected to be small, as
|
||||
* the whole thing is loaded into the driver's memory.
|
||||
* To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
|
||||
* returns an RDD[(K, Long)] instead of a map.
|
||||
*/
|
||||
def countByKey(): Map[K, Long] = self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
|
||||
|
||||
/**
|
||||
* :: Experimental ::
|
||||
|
|
|
@@ -927,32 +927,15 @@ abstract class RDD[T: ClassTag](
|
|||
}
|
||||
|
||||
/**
|
||||
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
|
||||
* combine step happens locally on the master, equivalent to running a single reduce task.
|
||||
* Return the count of each unique value in this RDD as a local map of (value, count) pairs.
|
||||
*
|
||||
* Note that this method should only be used if the resulting map is expected to be small, as
|
||||
* the whole thing is loaded into the driver's memory.
|
||||
* To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which
|
||||
* returns an RDD[(T, Long)] instead of a map.
|
||||
*/
|
||||
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = {
|
||||
if (elementClassTag.runtimeClass.isArray) {
|
||||
throw new SparkException("countByValue() does not support arrays")
|
||||
}
|
||||
// TODO: This should perhaps be distributed by default.
|
||||
val countPartition = (iter: Iterator[T]) => {
|
||||
val map = new OpenHashMap[T,Long]
|
||||
iter.foreach {
|
||||
t => map.changeValue(t, 1L, _ + 1L)
|
||||
}
|
||||
Iterator(map)
|
||||
}: Iterator[OpenHashMap[T,Long]]
|
||||
val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
|
||||
m2.foreach { case (key, value) =>
|
||||
m1.changeValue(key, value, _ + value)
|
||||
}
|
||||
m1
|
||||
}: OpenHashMap[T,Long]
|
||||
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
|
||||
// Convert to a Scala mutable map
|
||||
val mutableResult = scala.collection.mutable.Map[T,Long]()
|
||||
myResult.foreach { case (k, v) => mutableResult.put(k, v) }
|
||||
mutableResult
|
||||
map(value => (value, null)).countByKey()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in a new issue