Merge remote-tracking branch 'apache/master' into conf2

Conflicts:
	project/SparkBuild.scala
Author: Matei Zaharia
Date: 2014-01-01 13:28:54 -05:00
Commit: 0e5b2adb5c
12 changed files with 457 additions and 211 deletions


@@ -17,215 +17,219 @@
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent</artifactId>
    <version>0.9.0-incubating-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project Core</name>
  <url>http://spark.incubator.apache.org/</url>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </dependency>
    <dependency>
      <groupId>net.java.dev.jets3t</groupId>
      <artifactId>jets3t</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-ipc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.code.findbugs</groupId>
      <artifactId>jsr305</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
      <groupId>com.ning</groupId>
      <artifactId>compress-lzf</artifactId>
    </dependency>
    <dependency>
      <groupId>org.xerial.snappy</groupId>
      <artifactId>snappy-java</artifactId>
    </dependency>
    <dependency>
      <groupId>org.ow2.asm</groupId>
      <artifactId>asm</artifactId>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>chill_${scala.binary.version}</artifactId>
      <version>0.3.1</version>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>chill-java</artifactId>
      <version>0.3.1</version>
    </dependency>
    <dependency>
      <groupId>${akka.group}</groupId>
      <artifactId>akka-remote_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>${akka.group}</groupId>
      <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
      <groupId>net.liftweb</groupId>
      <artifactId>lift-json_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>fastutil</artifactId>
    </dependency>
    <dependency>
      <groupId>colt</groupId>
      <artifactId>colt</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.mesos</groupId>
      <artifactId>mesos</artifactId>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </dependency>
    <dependency>
      <groupId>com.clearspring.analytics</groupId>
      <artifactId>stream</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-core</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-jvm</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-json</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-ganglia</artifactId>
    </dependency>
    <dependency>
      <groupId>com.codahale.metrics</groupId>
      <artifactId>metrics-graphite</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.derby</groupId>
      <artifactId>derby</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.easymock</groupId>
      <artifactId>easymock</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.novocode</groupId>
      <artifactId>junit-interface</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-antrun-plugin</artifactId>
        <executions>
          <execution>
            <phase>test</phase>
            <goals>
              <goal>run</goal>
            </goals>
            <configuration>
              <exportAntProperties>true</exportAntProperties>
              <tasks>
                <property name="spark.classpath" refid="maven.test.classpath" />
                <property environment="env" />
                <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
                  <condition>
                    <not>
                      <or>
                        <isset property="env.SCALA_HOME" />
                        <isset property="env.SCALA_LIBRARY_PATH" />
                      </or>
                    </not>
                  </condition>
                </fail>
              </tasks>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <configuration>
          <environmentVariables>
            <SPARK_HOME>${basedir}/..</SPARK_HOME>
            <SPARK_TESTING>1</SPARK_TESTING>
            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
          </environmentVariables>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>


@@ -611,6 +611,42 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
   * Return an RDD with the values of each tuple.
   */
  def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))

  /**
   * Return approximate number of distinct values for each key in this RDD.
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
   * Partitioner to partition the output RDD.
   */
  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
    rdd.countApproxDistinctByKey(relativeSD, partitioner)
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. The default value of
   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
   * level.
   */
  def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
    rdd.countApproxDistinctByKey(relativeSD)
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
   * output RDD into numPartitions partitions.
   */
  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
    rdd.countApproxDistinctByKey(relativeSD, numPartitions)
  }
}

object JavaPairRDD {


@@ -444,4 +444,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
    takeOrdered(num, comp)
  }

  /**
   * Return approximate number of distinct elements in the RDD.
   *
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. The default value of
   * relativeSD is 0.05.
   */
  def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
}
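For context, a minimal driver-side sketch of the new single-value API as exposed on the Scala RDD (which the Java wrapper above delegates to). The SparkContext setup, dataset, and numbers are illustrative assumptions, not part of this change:

import org.apache.spark.SparkContext

val sc = new SparkContext("local[4]", "countApproxDistinct-demo")  // assumed local context
val ids = sc.parallelize(1 to 100000).map(_ % 1000)                // exactly 1000 distinct values

val exact  = ids.distinct().count()                                // exact, but requires a full shuffle
val approx = ids.countApproxDistinct(relativeSD = 0.05)            // one pass over a HyperLogLog sketch

println(s"exact=$exact approx=$approx")                            // approx is typically within ~5% of 1000
sc.stop()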


@@ -40,12 +40,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}

import com.clearspring.analytics.stream.cardinality.HyperLogLog

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.Aggregator
import org.apache.spark.Partitioner
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.util.SerializableHyperLogLog

/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -207,6 +210,45 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    self.map(_._1).countByValueApprox(timeout, confidence)
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
   * Partitioner to partition the output RDD.
   */
  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)

    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
   * output RDD into numPartitions partitions.
   */
  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
    countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. The default value of
   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
   * level.
   */
  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
    countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
  }

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
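A hedged usage sketch of the three overloads added above. It assumes an existing SparkContext named sc and the usual SparkContext._ implicits; the page-view data is made up for illustration:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

// Approximate number of distinct users per page.
val hits = sc.parallelize(Seq(("home", "u1"), ("home", "u2"), ("home", "u1"), ("docs", "u3")))

val byDefault     = hits.countApproxDistinctByKey(0.05)                        // default partitioner/parallelism
val byNumParts    = hits.countApproxDistinctByKey(0.01, 8)                     // tighter bound, 8 hash partitions
val byPartitioner = hits.countApproxDistinctByKey(0.01, new HashPartitioner(8))

byDefault.collect().foreach { case (page, uniques) => println(s"$page -> ~$uniques") }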


@@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat

import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
import com.clearspring.analytics.stream.cardinality.HyperLogLog

import org.apache.spark.Partitioner._
import org.apache.spark.api.java.JavaRDD
@@ -41,7 +42,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}

import org.apache.spark.SparkContext._
import org.apache.spark._
@@ -789,6 +790,19 @@ abstract class RDD[T: ClassTag](
    sc.runApproximateJob(this, countPartition, evaluator, timeout)
  }

  /**
   * Return approximate number of distinct elements in the RDD.
   *
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. The default value of
   * relativeSD is 0.05.
   */
  def countApproxDistinct(relativeSD: Double = 0.05): Long = {
    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
    aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
  }

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
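The relativeSD parameter trades accuracy for sketch size. As a rough illustration using the textbook HyperLogLog bound (standard error of about 1.04/sqrt(m) for m registers; the exact sizing constant and register width used by the stream-lib HyperLogLog above may differ), halving the error roughly quadruples the sketch:

// Back-of-the-envelope register count for a target relative standard deviation.
// Illustrative only; not the exact sizing formula of com.clearspring.analytics stream-lib.
def approxRegisters(relativeSD: Double): Long =
  math.ceil(math.pow(1.04 / relativeSD, 2)).toLong

Seq(0.05, 0.01, 0.001).foreach { rsd =>
  println(s"relativeSD=$rsd -> ~${approxRegisters(rsd)} registers")
}
// relativeSD=0.05  -> ~433 registers
// relativeSD=0.01  -> ~10816 registers
// relativeSD=0.001 -> ~1081600 registers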


@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util
import java.io.{Externalizable, ObjectOutput, ObjectInput}
import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
/**
* A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
*/
private[spark]
class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {

  def this() = this(null)  // For deserialization

  def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))

  def add[T](elem: T) = {
    this.value.offer(elem)
    this
  }

  def readExternal(in: ObjectInput) {
    val byteLength = in.readInt()
    val bytes = new Array[Byte](byteLength)
    in.readFully(bytes)
    value = HyperLogLog.Builder.build(bytes)
  }

  def writeExternal(out: ObjectOutput) {
    val bytes = value.getBytes()
    out.writeInt(bytes.length)
    out.write(bytes)
  }
}
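A self-contained, hedged round-trip sketch for the wrapper above. Because the class is private[spark], the snippet is assumed to live in the org.apache.spark.util package; it exercises only plain Java serialization, which is the path Externalizable hooks into:

package org.apache.spark.util

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import com.clearspring.analytics.stream.cardinality.HyperLogLog

// Illustrative only: serialize a sketch and check the estimate survives the round trip.
object HllRoundTrip {
  def main(args: Array[String]) {
    val hll = new SerializableHyperLogLog(new HyperLogLog(0.01))
    (1 to 1000).foreach(i => hll.add(i % 100))          // roughly 100 distinct values offered

    val buf = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(hll)                                // invokes writeExternal
    oos.close()

    val ois = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val restored = ois.readObject().asInstanceOf[SerializableHyperLogLog]  // invokes readExternal

    println("before=" + hll.value.cardinality() + " after=" + restored.value.cardinality())
  }
}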


@@ -930,4 +930,36 @@ public class JavaAPISuite implements Serializable {
      parts[1]);
  }

  @Test
  public void countApproxDistinct() {
    List<Integer> arrayData = new ArrayList<Integer>();
    int size = 100;
    for (int i = 0; i < 100000; i++) {
      arrayData.add(i % size);
    }
    JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
  }

  @Test
  public void countApproxDistinctByKey() {
    double relativeSD = 0.001;

    List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
    for (int i = 10; i < 100; i++)
      for (int j = 0; j < i; j++)
        arrayData.add(new Tuple2<Integer, Integer>(i, j));

    JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
    List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
    for (Tuple2<Integer, Object> resItem : res) {
      double count = (double) resItem._1();
      Long resCount = (Long) resItem._2();
      Double error = Math.abs((resCount - count) / count);
      Assert.assertTrue(error < relativeSD);
    }
  }
}


@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import scala.util.Random

import org.scalatest.FunSuite
@@ -109,6 +110,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
    assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
  }

  test("countApproxDistinctByKey") {
    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble

    /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
     * only a statistical bound, the tests can fail for large values of relativeSD. We will be using
     * relatively tight error bounds to check correctness of functionality rather than checking
     * whether the approximation conforms with the requested bound.
     */
    val relativeSD = 0.001

    // For each value i, there are i tuples with first element equal to i.
    // Therefore, the expected count for key i would be i.
    val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
    val rdd1 = sc.parallelize(stacked)
    val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
    counted1.foreach { case (k, count) =>
      assert(error(count, k) < relativeSD)
    }

    val rnd = new Random()

    // The expected count for key num would be num
    val randStacked = (1 to 100).flatMap { i =>
      val num = rnd.nextInt % 500
      (1 to num).map(j => (num, j))
    }
    val rdd2 = sc.parallelize(randStacked)
    val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
    counted2.foreach { case (k, count) =>
      assert(error(count, k) < relativeSD)
    }
  }

  test("join") {
    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
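One way to read the relativeSD = 0.001 bound used in the test above: for key sizes up to a few hundred, the assertion effectively demands an exact answer, since the estimate is an integer. A tiny illustration (assumed, not part of the suite):

def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble

// For a key whose true distinct count is 100, only the exact estimate passes the bound:
assert(error(100, 100) < 0.001)   // 0.0  < 0.001
assert(error(101, 100) >= 0.001)  // 0.01 >= 0.001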


@@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
    }
  }

  test("countApproxDistinct") {
    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble

    val size = 100
    val uniformDistro = for (i <- 1 to 100000) yield i % size
    val simpleRdd = sc.makeRDD(uniformDistro)
    assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
    assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
    assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
    assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
  }

  test("SparkContext.union") {
    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))


@@ -173,6 +173,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
    assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11))
  }

  test("kryo with SerializableHyperLogLog") {
    assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
  }

  test("kryo with reduce") {
    val control = 1 :: 2 :: Nil
    val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))


@@ -200,6 +200,11 @@
        <artifactId>asm</artifactId>
        <version>4.0</version>
      </dependency>
      <dependency>
        <groupId>com.clearspring.analytics</groupId>
        <artifactId>stream</artifactId>
        <version>2.4.0</version>
      </dependency>
      <!-- In theory we need not directly depend on protobuf since Spark does not directly
           use it. However, when building with Hadoop/YARN 2.2 Maven doesn't correctly bump
           the protobuf version up from the one Mesos gives. For now we include this variable


@@ -248,7 +248,8 @@ object SparkBuild extends Build {
        "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
        "com.twitter" %% "chill" % "0.3.1",
        "com.twitter" % "chill-java" % "0.3.1",
        "com.typesafe" % "config" % "1.0.2",
        "com.clearspring.analytics" % "stream" % "2.5.1"
      )
    )