diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0355618e43..09932db5ea 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
 
 import org.apache.spark.Partitioner._
 import org.apache.spark.api.java.JavaRDD
@@ -38,7 +39,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark._
@@ -765,6 +766,29 @@ abstract class RDD[T: ClassManifest](
     sc.runApproximateJob(this, countPartition, evaluator, timeout)
   }
 
+  /**
+   * Return the approximate number of distinct elements in the RDD.
+   *
+   * The accuracy of the approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint, and vice versa. The default value of
+   * relativeSD is 0.05.
+   */
+  def countDistinct(relativeSD: Double = 0.05): Long = {
+
+    def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = {
+      val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+      while (iter.hasNext) {
+        val v = iter.next()
+        hllCounter.value.offer(v)
+      }
+      Iterator(hllCounter)
+    }
+    def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2)
+
+    mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
+  }
+
   /**
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6d1bc5e296..6baf9c7ece 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("Approximate distinct count") {
+
+    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+
+    val size = 100
+    val uniformDistro = for (i <- 1 to 100000) yield i % size
+    val simpleRdd = sc.makeRDD(uniformDistro)
+    assert(error(simpleRdd.countDistinct(0.2), size) < 0.2)
+    assert(error(simpleRdd.countDistinct(0.05), size) < 0.05)
+    assert(error(simpleRdd.countDistinct(0.01), size) < 0.01)
+    assert(error(simpleRdd.countDistinct(0.001), size) < 0.001)
+  }
+
   test("SparkContext.union") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
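
For reviewers, a minimal sketch of how the new `countDistinct` API could be exercised from a driver program built against this patch. The `local[2]` master, the synthetic data, and the `relativeSD = 0.01` setting are illustrative assumptions, not part of the patch; only `RDD.countDistinct` itself comes from the diff above.

```scala
import org.apache.spark.SparkContext

object CountDistinctExample {
  def main(args: Array[String]) {
    // Illustrative local context; any working SparkContext would do.
    val sc = new SparkContext("local[2]", "countDistinct example")

    // One million records, but only 10,000 distinct values.
    val data = sc.parallelize(1 to 1000000, 4).map(_ % 10000)

    // Exact answer for comparison: distinct() shuffles every distinct value.
    val exact = data.distinct().count()

    // Approximate answer from the new HyperLogLog-based method. A smaller
    // relativeSD uses more memory per partition counter but gives a tighter estimate.
    val approx = data.countDistinct(relativeSD = 0.01)

    println("exact = " + exact + ", approx = " + approx)
    sc.stop()
  }
}
```

Unlike `distinct().count()`, the patched path sends only one small HyperLogLog counter per partition back through `reduce`, so the cost is a single pass over the data plus a constant-size merge on the driver.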