Added a countDistinct method to RDD that takes takes an accuracy parameter and returns the (approximate) number of distinct elements in the RDD.
This commit is contained in:
parent
843727af99
commit
1a701358c0
|
@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
|
||||||
import org.apache.hadoop.mapred.TextOutputFormat
|
import org.apache.hadoop.mapred.TextOutputFormat
|
||||||
|
|
||||||
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
|
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
|
||||||
|
import com.clearspring.analytics.stream.cardinality.HyperLogLog
|
||||||
|
|
||||||
import org.apache.spark.Partitioner._
|
import org.apache.spark.Partitioner._
|
||||||
import org.apache.spark.api.java.JavaRDD
|
import org.apache.spark.api.java.JavaRDD
|
||||||
|
@ -38,7 +39,7 @@ import org.apache.spark.partial.CountEvaluator
|
||||||
import org.apache.spark.partial.GroupedCountEvaluator
|
import org.apache.spark.partial.GroupedCountEvaluator
|
||||||
import org.apache.spark.partial.PartialResult
|
import org.apache.spark.partial.PartialResult
|
||||||
import org.apache.spark.storage.StorageLevel
|
import org.apache.spark.storage.StorageLevel
|
||||||
import org.apache.spark.util.{Utils, BoundedPriorityQueue}
|
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
|
||||||
|
|
||||||
import org.apache.spark.SparkContext._
|
import org.apache.spark.SparkContext._
|
||||||
import org.apache.spark._
|
import org.apache.spark._
|
||||||
|
@ -765,6 +766,29 @@ abstract class RDD[T: ClassManifest](
|
||||||
sc.runApproximateJob(this, countPartition, evaluator, timeout)
|
sc.runApproximateJob(this, countPartition, evaluator, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return approximate number of distinct elements in the RDD.
|
||||||
|
*
|
||||||
|
* The accuracy of approximation can be controlled through the relative standard diviation
|
||||||
|
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
|
||||||
|
* more accurate counts but increase the memory footprint and vise versa. The default value of
|
||||||
|
* relativeSD is 0.05.
|
||||||
|
*/
|
||||||
|
def countDistinct(relativeSD: Double = 0.05): Long = {
|
||||||
|
|
||||||
|
def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = {
|
||||||
|
val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
|
||||||
|
while (iter.hasNext) {
|
||||||
|
val v = iter.next()
|
||||||
|
hllCounter.value.offer(v)
|
||||||
|
}
|
||||||
|
Iterator(hllCounter)
|
||||||
|
}
|
||||||
|
def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog): SerializableHyperLogLog = c1.merge(c2)
|
||||||
|
|
||||||
|
mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
|
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
|
||||||
* results from that partition to estimate the number of additional partitions needed to satisfy
|
* results from that partition to estimate the number of additional partitions needed to satisfy
|
||||||
|
|
|
@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Approximate distinct count") {
|
||||||
|
|
||||||
|
def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble
|
||||||
|
|
||||||
|
val size = 100
|
||||||
|
val uniformDistro = for (i <- 1 to 100000) yield i % size
|
||||||
|
val simpleRdd = sc.makeRDD(uniformDistro)
|
||||||
|
assert( error(simpleRdd.countDistinct(0.2), size) < 0.2)
|
||||||
|
assert( error(simpleRdd.countDistinct(0.05), size) < 0.05)
|
||||||
|
assert( error(simpleRdd.countDistinct(0.01), size) < 0.01)
|
||||||
|
assert( error(simpleRdd.countDistinct(0.001), size) < 0.001)
|
||||||
|
}
|
||||||
|
|
||||||
test("SparkContext.union") {
|
test("SparkContext.union") {
|
||||||
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
|
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
|
||||||
assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
|
assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
|
||||||
|
|
Loading…
Reference in a new issue