[SPARK-4728][MLLib] Add exponential, gamma, and log normal sampling to MLlib da...

...ta generators

This patch adds:

* Exponential, gamma, and log normal generators that wrap Apache Commons math3 to the private API
* Functions for generating exponential, gamma, and log normal RDDs and vector RDDs
* Tests for the above

Author: RJ Nowling <rnowling@gmail.com>

Closes #3680 from rnowling/spark4728 and squashes the following commits:

455f50a [RJ Nowling] Add tests for exponential, gamma, and log normal samplers to JavaRandomRDDsSuite
3e1134a [RJ Nowling] Fix val/var, unncessary creation of Distribution objects when setting seeds, and import line longer than line wrap limits
58f5b97 [RJ Nowling] Fix bounds in tests so they scale with variance, not stdev
84fd98d [RJ Nowling] Add more values for testing distributions.
9f96232 [RJ Nowling] [SPARK-4728] Add exponential, gamma, and log normal sampling to MLlib data generators
This commit is contained in:
RJ Nowling 2014-12-18 21:00:49 -08:00 committed by Xiangrui Meng
parent c3d91da5ea
commit ee1fb97a97
5 changed files with 622 additions and 4 deletions

View file

@ -17,7 +17,8 @@
package org.apache.spark.mllib.random
import org.apache.commons.math3.distribution.PoissonDistribution
import org.apache.commons.math3.distribution.{ExponentialDistribution,
GammaDistribution, LogNormalDistribution, PoissonDistribution}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
@ -88,14 +89,76 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] {
@DeveloperApi
class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] {
private var rng = new PoissonDistribution(mean)
private val rng = new PoissonDistribution(mean)
override def nextValue(): Double = rng.sample()
override def setSeed(seed: Long) {
rng = new PoissonDistribution(mean)
rng.reseedRandomGenerator(seed)
}
override def copy(): PoissonGenerator = new PoissonGenerator(mean)
}
/**
* :: DeveloperApi ::
* Generates i.i.d. samples from the exponential distribution with the given mean.
*
* @param mean mean for the exponential distribution.
*/
@DeveloperApi
class ExponentialGenerator(val mean: Double) extends RandomDataGenerator[Double] {
private val rng = new ExponentialDistribution(mean)
override def nextValue(): Double = rng.sample()
override def setSeed(seed: Long) {
rng.reseedRandomGenerator(seed)
}
override def copy(): ExponentialGenerator = new ExponentialGenerator(mean)
}
/**
* :: DeveloperApi ::
* Generates i.i.d. samples from the gamma distribution with the given shape and scale.
*
* @param shape shape for the gamma distribution.
* @param scale scale for the gamma distribution
*/
@DeveloperApi
class GammaGenerator(val shape: Double, val scale: Double) extends RandomDataGenerator[Double] {
private val rng = new GammaDistribution(shape, scale)
override def nextValue(): Double = rng.sample()
override def setSeed(seed: Long) {
rng.reseedRandomGenerator(seed)
}
override def copy(): GammaGenerator = new GammaGenerator(shape, scale)
}
/**
* :: DeveloperApi ::
* Generates i.i.d. samples from the log normal distribution with the
* given mean and standard deviation.
*
* @param mean mean for the log normal distribution.
* @param std standard deviation for the log normal distribution
*/
@DeveloperApi
class LogNormalGenerator(val mean: Double, val std: Double) extends RandomDataGenerator[Double] {
private val rng = new LogNormalDistribution(mean, std)
override def nextValue(): Double = rng.sample()
override def setSeed(seed: Long) {
rng.reseedRandomGenerator(seed)
}
override def copy(): LogNormalGenerator = new LogNormalGenerator(mean, std)
}

View file

@ -176,6 +176,176 @@ object RandomRDDs {
JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size))
}
/**
* Generates an RDD comprised of i.i.d. samples from the exponential distribution with
* the input mean.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or 1 / lambda, for the exponential distribution.
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
* @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
*/
def exponentialRDD(
sc: SparkContext,
mean: Double,
size: Long,
numPartitions: Int = 0,
seed: Long = Utils.random.nextLong()): RDD[Double] = {
val exponential = new ExponentialGenerator(mean)
randomRDD(sc, exponential, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
* Java-friendly version of [[RandomRDDs#exponentialRDD]].
*/
def exponentialJavaRDD(
jsc: JavaSparkContext,
mean: Double,
size: Long,
numPartitions: Int,
seed: Long): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size, numPartitions, seed))
}
/**
* [[RandomRDDs#exponentialJavaRDD]] with the default seed.
*/
def exponentialJavaRDD(
jsc: JavaSparkContext,
mean: Double,
size: Long,
numPartitions: Int): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size, numPartitions))
}
/**
* [[RandomRDDs#exponentialJavaRDD]] with the default number of partitions and the default seed.
*/
def exponentialJavaRDD(jsc: JavaSparkContext, mean: Double, size: Long): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size))
}
/**
* Generates an RDD comprised of i.i.d. samples from the gamma distribution with the input
* shape and scale.
*
* @param sc SparkContext used to create the RDD.
* @param shape shape parameter (> 0) for the gamma distribution
* @param scale scale parameter (> 0) for the gamma distribution
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
* @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
*/
def gammaRDD(
sc: SparkContext,
shape: Double,
scale: Double,
size: Long,
numPartitions: Int = 0,
seed: Long = Utils.random.nextLong()): RDD[Double] = {
val gamma = new GammaGenerator(shape, scale)
randomRDD(sc, gamma, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
* Java-friendly version of [[RandomRDDs#gammaRDD]].
*/
def gammaJavaRDD(
jsc: JavaSparkContext,
shape: Double,
scale: Double,
size: Long,
numPartitions: Int,
seed: Long): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size, numPartitions, seed))
}
/**
* [[RandomRDDs#gammaJavaRDD]] with the default seed.
*/
def gammaJavaRDD(
jsc: JavaSparkContext,
shape: Double,
scale: Double,
size: Long,
numPartitions: Int): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size, numPartitions))
}
/**
* [[RandomRDDs#gammaJavaRDD]] with the default number of partitions and the default seed.
*/
def gammaJavaRDD(
jsc: JavaSparkContext,
shape: Double,
scale: Double,
size: Long): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size))
}
/**
* Generates an RDD comprised of i.i.d. samples from the log normal distribution with the input
* mean and standard deviation
*
* @param sc SparkContext used to create the RDD.
* @param mean mean for the log normal distribution
* @param std standard deviation for the log normal distribution
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
* @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
*/
def logNormalRDD(
sc: SparkContext,
mean: Double,
std: Double,
size: Long,
numPartitions: Int = 0,
seed: Long = Utils.random.nextLong()): RDD[Double] = {
val logNormal = new LogNormalGenerator(mean, std)
randomRDD(sc, logNormal, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
* Java-friendly version of [[RandomRDDs#logNormalRDD]].
*/
def logNormalJavaRDD(
jsc: JavaSparkContext,
mean: Double,
std: Double,
size: Long,
numPartitions: Int,
seed: Long): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size, numPartitions, seed))
}
/**
* [[RandomRDDs#logNormalJavaRDD]] with the default seed.
*/
def logNormalJavaRDD(
jsc: JavaSparkContext,
mean: Double,
std: Double,
size: Long,
numPartitions: Int): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size, numPartitions))
}
/**
* [[RandomRDDs#logNormalJavaRDD]] with the default number of partitions and the default seed.
*/
def logNormalJavaRDD(
jsc: JavaSparkContext,
mean: Double,
std: Double,
size: Long): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size))
}
/**
* :: DeveloperApi ::
* Generates an RDD comprised of i.i.d. samples produced by the input RandomDataGenerator.
@ -307,6 +477,72 @@ object RandomRDDs {
normalVectorRDD(jsc.sc, numRows, numCols).toJavaRDD()
}
/**
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from a
* log normal distribution.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean of the log normal distribution.
* @param std Standard deviation of the log normal distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
* @param seed Random seed (default: a random long integer).
* @return RDD[Vector] with vectors containing i.i.d. samples.
*/
def logNormalVectorRDD(
sc: SparkContext,
mean: Double,
std: Double,
numRows: Long,
numCols: Int,
numPartitions: Int = 0,
seed: Long = Utils.random.nextLong()): RDD[Vector] = {
val logNormal = new LogNormalGenerator(mean, std)
randomVectorRDD(sc, logNormal, numRows, numCols,
numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
* Java-friendly version of [[RandomRDDs#logNormalVectorRDD]].
*/
def logNormalJavaVectorRDD(
jsc: JavaSparkContext,
mean: Double,
std: Double,
numRows: Long,
numCols: Int,
numPartitions: Int,
seed: Long): JavaRDD[Vector] = {
logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
* [[RandomRDDs#logNormalJavaVectorRDD]] with the default seed.
*/
def logNormalJavaVectorRDD(
jsc: JavaSparkContext,
mean: Double,
std: Double,
numRows: Long,
numCols: Int,
numPartitions: Int): JavaRDD[Vector] = {
logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, numPartitions).toJavaRDD()
}
/**
* [[RandomRDDs#logNormalJavaVectorRDD]] with the default number of partitions and
* the default seed.
*/
def logNormalJavaVectorRDD(
jsc: JavaSparkContext,
mean: Double,
std: Double,
numRows: Long,
numCols: Int): JavaRDD[Vector] = {
logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols).toJavaRDD()
}
/**
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
@ -366,6 +602,133 @@ object RandomRDDs {
poissonVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD()
}
/**
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* exponential distribution with the input mean.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or 1 / lambda, for the Exponential distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`)
* @param seed Random seed (default: a random long integer).
* @return RDD[Vector] with vectors containing i.i.d. samples ~ Exp(mean).
*/
def exponentialVectorRDD(
sc: SparkContext,
mean: Double,
numRows: Long,
numCols: Int,
numPartitions: Int = 0,
seed: Long = Utils.random.nextLong()): RDD[Vector] = {
val exponential = new ExponentialGenerator(mean)
randomVectorRDD(sc, exponential, numRows, numCols,
numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
* Java-friendly version of [[RandomRDDs#exponentialVectorRDD]].
*/
def exponentialJavaVectorRDD(
jsc: JavaSparkContext,
mean: Double,
numRows: Long,
numCols: Int,
numPartitions: Int,
seed: Long): JavaRDD[Vector] = {
exponentialVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
* [[RandomRDDs#exponentialJavaVectorRDD]] with the default seed.
*/
def exponentialJavaVectorRDD(
jsc: JavaSparkContext,
mean: Double,
numRows: Long,
numCols: Int,
numPartitions: Int): JavaRDD[Vector] = {
exponentialVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions).toJavaRDD()
}
/**
* [[RandomRDDs#exponentialJavaVectorRDD]] with the default number of partitions
* and the default seed.
*/
def exponentialJavaVectorRDD(
jsc: JavaSparkContext,
mean: Double,
numRows: Long,
numCols: Int): JavaRDD[Vector] = {
exponentialVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD()
}
/**
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* gamma distribution with the input shape and scale.
*
* @param sc SparkContext used to create the RDD.
* @param shape shape parameter (> 0) for the gamma distribution.
* @param scale scale parameter (> 0) for the gamma distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`)
* @param seed Random seed (default: a random long integer).
* @return RDD[Vector] with vectors containing i.i.d. samples ~ Exp(mean).
*/
def gammaVectorRDD(
sc: SparkContext,
shape: Double,
scale: Double,
numRows: Long,
numCols: Int,
numPartitions: Int = 0,
seed: Long = Utils.random.nextLong()): RDD[Vector] = {
val gamma = new GammaGenerator(shape, scale)
randomVectorRDD(sc, gamma, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
* Java-friendly version of [[RandomRDDs#gammaVectorRDD]].
*/
def gammaJavaVectorRDD(
jsc: JavaSparkContext,
shape: Double,
scale: Double,
numRows: Long,
numCols: Int,
numPartitions: Int,
seed: Long): JavaRDD[Vector] = {
gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
* [[RandomRDDs#gammaJavaVectorRDD]] with the default seed.
*/
def gammaJavaVectorRDD(
jsc: JavaSparkContext,
shape: Double,
scale: Double,
numRows: Long,
numCols: Int,
numPartitions: Int): JavaRDD[Vector] = {
gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, numPartitions).toJavaRDD()
}
/**
* [[RandomRDDs#gammaJavaVectorRDD]] with the default number of partitions and the default seed.
*/
def gammaJavaVectorRDD(
jsc: JavaSparkContext,
shape: Double,
scale: Double,
numRows: Long,
numCols: Int): JavaRDD[Vector] = {
gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols).toJavaRDD()
}
/**
* :: DeveloperApi ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the

View file

@ -69,6 +69,21 @@ public class JavaRandomRDDsSuite {
}
}
@Test
public void testLNormalRDD() {
double mean = 4.0;
double std = 2.0;
long m = 1000L;
int p = 2;
long seed = 1L;
JavaDoubleRDD rdd1 = logNormalJavaRDD(sc, mean, std, m);
JavaDoubleRDD rdd2 = logNormalJavaRDD(sc, mean, std, m, p);
JavaDoubleRDD rdd3 = logNormalJavaRDD(sc, mean, std, m, p, seed);
for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
Assert.assertEquals(m, rdd.count());
}
}
@Test
public void testPoissonRDD() {
double mean = 2.0;
@ -83,6 +98,36 @@ public class JavaRandomRDDsSuite {
}
}
@Test
public void testExponentialRDD() {
double mean = 2.0;
long m = 1000L;
int p = 2;
long seed = 1L;
JavaDoubleRDD rdd1 = exponentialJavaRDD(sc, mean, m);
JavaDoubleRDD rdd2 = exponentialJavaRDD(sc, mean, m, p);
JavaDoubleRDD rdd3 = exponentialJavaRDD(sc, mean, m, p, seed);
for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
Assert.assertEquals(m, rdd.count());
}
}
@Test
public void testGammaRDD() {
double shape = 1.0;
double scale = 2.0;
long m = 1000L;
int p = 2;
long seed = 1L;
JavaDoubleRDD rdd1 = gammaJavaRDD(sc, shape, scale, m);
JavaDoubleRDD rdd2 = gammaJavaRDD(sc, shape, scale, m, p);
JavaDoubleRDD rdd3 = gammaJavaRDD(sc, shape, scale, m, p, seed);
for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
Assert.assertEquals(m, rdd.count());
}
}
@Test
@SuppressWarnings("unchecked")
public void testUniformVectorRDD() {
@ -115,6 +160,24 @@ public class JavaRandomRDDsSuite {
}
}
@Test
@SuppressWarnings("unchecked")
public void testLogNormalVectorRDD() {
double mean = 4.0;
double std = 2.0;
long m = 100L;
int n = 10;
int p = 2;
long seed = 1L;
JavaRDD<Vector> rdd1 = logNormalJavaVectorRDD(sc, mean, std, m, n);
JavaRDD<Vector> rdd2 = logNormalJavaVectorRDD(sc, mean, std, m, n, p);
JavaRDD<Vector> rdd3 = logNormalJavaVectorRDD(sc, mean, std, m, n, p, seed);
for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
Assert.assertEquals(m, rdd.count());
Assert.assertEquals(n, rdd.first().size());
}
}
@Test
@SuppressWarnings("unchecked")
public void testPoissonVectorRDD() {
@ -131,4 +194,40 @@ public class JavaRandomRDDsSuite {
Assert.assertEquals(n, rdd.first().size());
}
}
@Test
@SuppressWarnings("unchecked")
public void testExponentialVectorRDD() {
double mean = 2.0;
long m = 100L;
int n = 10;
int p = 2;
long seed = 1L;
JavaRDD<Vector> rdd1 = exponentialJavaVectorRDD(sc, mean, m, n);
JavaRDD<Vector> rdd2 = exponentialJavaVectorRDD(sc, mean, m, n, p);
JavaRDD<Vector> rdd3 = exponentialJavaVectorRDD(sc, mean, m, n, p, seed);
for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
Assert.assertEquals(m, rdd.count());
Assert.assertEquals(n, rdd.first().size());
}
}
@Test
@SuppressWarnings("unchecked")
public void testGammaVectorRDD() {
double shape = 1.0;
double scale = 2.0;
long m = 100L;
int n = 10;
int p = 2;
long seed = 1L;
JavaRDD<Vector> rdd1 = gammaJavaVectorRDD(sc, shape, scale, m, n);
JavaRDD<Vector> rdd2 = gammaJavaVectorRDD(sc, shape, scale, m, n, p);
JavaRDD<Vector> rdd3 = gammaJavaVectorRDD(sc, shape, scale, m, n, p, seed);
for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
Assert.assertEquals(m, rdd.count());
Assert.assertEquals(n, rdd.first().size());
}
}
}

View file

@ -17,6 +17,8 @@
package org.apache.spark.mllib.random
import scala.math
import org.scalatest.FunSuite
import org.apache.spark.util.StatCounter
@ -25,7 +27,6 @@ import org.apache.spark.util.StatCounter
class RandomDataGeneratorSuite extends FunSuite {
def apiChecks(gen: RandomDataGenerator[Double]) {
// resetting seed should generate the same sequence of random numbers
gen.setSeed(42L)
val array1 = (0 until 1000).map(_ => gen.nextValue())
@ -79,6 +80,26 @@ class RandomDataGeneratorSuite extends FunSuite {
distributionChecks(normal, 0.0, 1.0)
}
test("LogNormalGenerator") {
List((0.0, 1.0), (0.0, 2.0), (2.0, 1.0), (2.0, 2.0)).map {
case (mean: Double, vari: Double) =>
val normal = new LogNormalGenerator(mean, math.sqrt(vari))
apiChecks(normal)
// mean of log normal = e^(mean + var / 2)
val expectedMean = math.exp(mean + 0.5 * vari)
// variance of log normal = (e^var - 1) * e^(2 * mean + var)
val expectedStd = math.sqrt((math.exp(vari) - 1.0) * math.exp(2.0 * mean + vari))
// since sampling error increases with variance, let's set
// the absolute tolerance as a percentage
val epsilon = 0.05 * expectedStd * expectedStd
distributionChecks(normal, expectedMean, expectedStd, epsilon)
}
}
test("PoissonGenerator") {
// mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
for (mean <- List(1.0, 5.0, 100.0)) {
@ -87,4 +108,33 @@ class RandomDataGeneratorSuite extends FunSuite {
distributionChecks(poisson, mean, math.sqrt(mean), 0.1)
}
}
test("ExponentialGenerator") {
// mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
for (mean <- List(2.0, 5.0, 10.0, 50.0, 100.0)) {
val exponential = new ExponentialGenerator(mean)
apiChecks(exponential)
// var of exp = lambda^-2 = (1.0 / mean)^-2 = mean^2
// since sampling error increases with variance, let's set
// the absolute tolerance as a percentage
val epsilon = 0.05 * mean * mean
distributionChecks(exponential, mean, mean, epsilon)
}
}
test("GammaGenerator") {
// mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
List((1.0, 2.0), (2.0, 2.0), (3.0, 2.0), (5.0, 1.0), (9.0, 0.5)).map {
case (shape: Double, scale: Double) =>
val gamma = new GammaGenerator(shape, scale)
apiChecks(gamma)
// mean of gamma = shape * scale
val expectedMean = shape * scale
// var of gamma = shape * scale^2
val expectedStd = math.sqrt(shape * scale * scale)
distributionChecks(gamma, expectedMean, expectedStd, 0.1)
}
}
}

View file

@ -110,7 +110,19 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
test("randomRDD for different distributions") {
val size = 100000L
val numPartitions = 10
// mean of log normal = e^(mean + var / 2)
val logNormalMean = math.exp(0.5)
// variance of log normal = (e^var - 1) * e^(2 * mean + var)
val logNormalStd = math.sqrt((math.E - 1.0) * math.E)
val gammaScale = 1.0
val gammaShape = 2.0
// mean of gamma = shape * scale
val gammaMean = gammaShape * gammaScale
// var of gamma = shape * scale^2
val gammaStd = math.sqrt(gammaShape * gammaScale * gammaScale)
val poissonMean = 100.0
val exponentialMean = 1.0
for (seed <- 0 until 5) {
val uniform = RandomRDDs.uniformRDD(sc, size, numPartitions, seed)
@ -119,8 +131,18 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
val normal = RandomRDDs.normalRDD(sc, size, numPartitions, seed)
testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0)
val logNormal = RandomRDDs.logNormalRDD(sc, 0.0, 1.0, size, numPartitions, seed)
testGeneratedRDD(logNormal, size, numPartitions, logNormalMean, logNormalStd, 0.1)
val poisson = RandomRDDs.poissonRDD(sc, poissonMean, size, numPartitions, seed)
testGeneratedRDD(poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1)
val exponential = RandomRDDs.exponentialRDD(sc, exponentialMean, size, numPartitions, seed)
testGeneratedRDD(exponential, size, numPartitions, exponentialMean, exponentialMean, 0.1)
val gamma = RandomRDDs.gammaRDD(sc, gammaShape, gammaScale, size, numPartitions, seed)
testGeneratedRDD(gamma, size, numPartitions, gammaMean, gammaStd, 0.1)
}
// mock distribution to check that partitions have unique seeds
@ -132,7 +154,19 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
val rows = 1000L
val cols = 100
val parts = 10
// mean of log normal = e^(mean + var / 2)
val logNormalMean = math.exp(0.5)
// variance of log normal = (e^var - 1) * e^(2 * mean + var)
val logNormalStd = math.sqrt((math.E - 1.0) * math.E)
val gammaScale = 1.0
val gammaShape = 2.0
// mean of gamma = shape * scale
val gammaMean = gammaShape * gammaScale
// var of gamma = shape * scale^2
val gammaStd = math.sqrt(gammaShape * gammaScale * gammaScale)
val poissonMean = 100.0
val exponentialMean = 1.0
for (seed <- 0 until 5) {
val uniform = RandomRDDs.uniformVectorRDD(sc, rows, cols, parts, seed)
@ -141,8 +175,17 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
val normal = RandomRDDs.normalVectorRDD(sc, rows, cols, parts, seed)
testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0)
val logNormal = RandomRDDs.logNormalVectorRDD(sc, 0.0, 1.0, rows, cols, parts, seed)
testGeneratedVectorRDD(logNormal, rows, cols, parts, logNormalMean, logNormalStd, 0.1)
val poisson = RandomRDDs.poissonVectorRDD(sc, poissonMean, rows, cols, parts, seed)
testGeneratedVectorRDD(poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1)
val exponential = RandomRDDs.exponentialVectorRDD(sc, exponentialMean, rows, cols, parts, seed)
testGeneratedVectorRDD(exponential, rows, cols, parts, exponentialMean, exponentialMean, 0.1)
val gamma = RandomRDDs.gammaVectorRDD(sc, gammaShape, gammaScale, rows, cols, parts, seed)
testGeneratedVectorRDD(gamma, rows, cols, parts, gammaMean, gammaStd, 0.1)
}
}
}