[SPARK-2514] [mllib] Random RDD generator

Utilities for generating random RDDs.

RandomRDD and RandomVectorRDD are created instead of using `sc.parallelize(range: Range)` because `Range` objects in Scala can only have `size <= Int.MaxValue`.

The object `RandomRDDGenerators` can be transformed into a generator class to reduce the number of auxiliary methods for optional arguments.
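As a quick illustration of the new API, a minimal sketch (hypothetical usage, assuming an active `SparkContext` named `sc`):

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.random.RandomRDDGenerators._

// One million i.i.d. standard normal samples in 10 partitions, with a fixed seed.
val data = normalRDD(sc, 1000000L, 10, 42L)
val stats = data.stats() // mean ~ 0.0, stdev ~ 1.0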

Author: Doris Xin <doris.s.xin@gmail.com>

Closes #1520 from dorx/randomRDD and squashes the following commits:

01121ac [Doris Xin] reviewer comments
6bf27d8 [Doris Xin] Merge branch 'master' into randomRDD
a8ea92d [Doris Xin] Reviewer comments
063ea0b [Doris Xin] Merge branch 'master' into randomRDD
aec68eb [Doris Xin] newline
bc90234 [Doris Xin] units passed.
d56cacb [Doris Xin] impl with RandomRDD
92d6f1c [Doris Xin] solution for Cloneable
df5bcff [Doris Xin] Merge branch 'generator' into randomRDD
f46d928 [Doris Xin] WIP
49ed20d [Doris Xin] alternative poisson distribution generator
7cb0e40 [Doris Xin] fix for data inconsistency
8881444 [Doris Xin] RandomRDDGenerator: initial design
Doris Xin 2014-07-27 16:16:39 -07:00 committed by Xiangrui Meng
parent ecf30ee7e7
commit 81fcdd22c8
5 changed files with 940 additions and 0 deletions


@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.random

import cern.jet.random.Poisson
import cern.jet.random.engine.DRand

import org.apache.spark.annotation.Experimental
import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
/**
* :: Experimental ::
* Trait for random number generators that generate i.i.d. values from a distribution.
*/
@Experimental
trait DistributionGenerator extends Pseudorandom with Serializable {
/**
* Returns an i.i.d. sample as a Double from an underlying distribution.
*/
def nextValue(): Double
/**
* Returns a copy of the DistributionGenerator with a fresh instance of the underlying rng
* object, where applicable, so that copies can be used concurrently without locking.
*/
def copy(): DistributionGenerator
}
/**
* :: Experimental ::
* Generates i.i.d. samples from U[0.0, 1.0].
*/
@Experimental
class UniformGenerator extends DistributionGenerator {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
private val random = new XORShiftRandom()
override def nextValue(): Double = {
random.nextDouble()
}
override def setSeed(seed: Long) = random.setSeed(seed)
override def copy(): UniformGenerator = new UniformGenerator()
}
/**
* :: Experimental ::
* Generates i.i.d. samples from the standard normal distribution.
*/
@Experimental
class StandardNormalGenerator extends DistributionGenerator {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
private val random = new XORShiftRandom()
override def nextValue(): Double = {
random.nextGaussian()
}
override def setSeed(seed: Long) = random.setSeed(seed)
override def copy(): StandardNormalGenerator = new StandardNormalGenerator()
}
/**
* :: Experimental ::
* Generates i.i.d. samples from the Poisson distribution with the given mean.
*
* @param mean mean for the Poisson distribution.
*/
@Experimental
class PoissonGenerator(val mean: Double) extends DistributionGenerator {
private var rng = new Poisson(mean, new DRand)
override def nextValue(): Double = rng.nextDouble()
override def setSeed(seed: Long) {
// Note: DRand only accepts an Int seed, so the Long seed is truncated here.
rng = new Poisson(mean, new DRand(seed.toInt))
}
override def copy(): PoissonGenerator = new PoissonGenerator(mean)
}
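Any univariate distribution can plug into the trait above. As an illustration only (not part of this commit), a hypothetical exponential generator based on inverse-CDF sampling could look like:

class ExponentialGenerator(val mean: Double) extends DistributionGenerator {
  private val random = new XORShiftRandom()
  // Inverse CDF: -mean * ln(1 - u) with u ~ U[0.0, 1.0) yields samples with the given mean.
  override def nextValue(): Double = -mean * math.log1p(-random.nextDouble())
  override def setSeed(seed: Long) = random.setSeed(seed)
  override def copy(): ExponentialGenerator = new ExponentialGenerator(mean)
}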


@@ -0,0 +1,473 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.random

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
/**
* :: Experimental ::
* Generator methods for creating RDDs comprised of i.i.d. samples from some distribution.
*/
@Experimental
object RandomRDDGenerators {
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
* @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
*/
@Experimental
def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
val uniform = new UniformGenerator()
randomRDD(sc, uniform, size, numPartitions, seed)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD.
* @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
*/
@Experimental
def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
* sc.defaultParallelism is used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
* @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
*/
@Experimental
def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
* @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
*/
@Experimental
def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
val normal = new StandardNormalGenerator()
randomRDD(sc, normal, size, numPartitions, seed)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD.
* @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
*/
@Experimental
def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
normalRDD(sc, size, numPartitions, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
* sc.defaultParallelism is used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
* @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
*/
@Experimental
def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
*/
@Experimental
def poissonRDD(sc: SparkContext,
mean: Double,
size: Long,
numPartitions: Int,
seed: Long): RDD[Double] = {
val poisson = new PoissonGenerator(mean)
randomRDD(sc, poisson, size, numPartitions, seed)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD.
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
*/
@Experimental
def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = {
poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
* sc.defaultParallelism is used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
* @param size Size of the RDD.
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
*/
@Experimental
def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = {
poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
*
* @param sc SparkContext used to create the RDD.
* @param generator DistributionGenerator used to populate the RDD.
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
* @return RDD[Double] comprised of i.i.d. samples produced by generator.
*/
@Experimental
def randomRDD(sc: SparkContext,
generator: DistributionGenerator,
size: Long,
numPartitions: Int,
seed: Long): RDD[Double] = {
new RandomRDD(sc, size, numPartitions, generator, seed)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
*
* @param sc SparkContext used to create the RDD.
* @param generator DistributionGenerator used to populate the RDD.
* @param size Size of the RDD.
* @param numPartitions Number of partitions in the RDD.
* @return RDD[Double] comprised of i.i.d. samples produced by generator.
*/
@Experimental
def randomRDD(sc: SparkContext,
generator: DistributionGenerator,
size: Long,
numPartitions: Int): RDD[Double] = {
randomRDD(sc, generator, size, numPartitions, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
* sc.defaultParallelism is used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param generator DistributionGenerator used to populate the RDD.
* @param size Size of the RDD.
* @return RDD[Double] comprised of i.i.d. samples produced by generator.
*/
@Experimental
def randomRDD(sc: SparkContext,
generator: DistributionGenerator,
size: Long): RDD[Double] = {
randomRDD(sc, generator, size, sc.defaultParallelism, Utils.random.nextLong)
}
// TODO Generate RDD[Vector] from multivariate distributions.
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* uniform distribution on [0.0, 1.0].
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
* @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
*/
@Experimental
def uniformVectorRDD(sc: SparkContext,
numRows: Long,
numCols: Int,
numPartitions: Int,
seed: Long): RDD[Vector] = {
val uniform = new UniformGenerator()
randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* uniform distribution on [0.0, 1.0].
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
*/
@Experimental
def uniformVectorRDD(sc: SparkContext,
numRows: Long,
numCols: Int,
numPartitions: Int): RDD[Vector] = {
uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* uniform distribution on [0.0, 1.0].
* sc.defaultParallelism is used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
*/
@Experimental
def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
uniformVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* standard normal distribution.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
* @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
*/
@Experimental
def normalVectorRDD(sc: SparkContext,
numRows: Long,
numCols: Int,
numPartitions: Int,
seed: Long): RDD[Vector] = {
val normal = new StandardNormalGenerator()
randomVectorRDD(sc, normal, numRows, numCols, numPartitions, seed)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* standard normal distribution.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
*/
@Experimental
def normalVectorRDD(sc: SparkContext,
numRows: Long,
numCols: Int,
numPartitions: Int): RDD[Vector] = {
normalVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* standard normal distribution.
* sc.defaultParallelism is used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
*/
@Experimental
def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
* @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
*/
@Experimental
def poissonVectorRDD(sc: SparkContext,
mean: Double,
numRows: Long,
numCols: Int,
numPartitions: Int,
seed: Long): RDD[Vector] = {
val poisson = new PoissonGenerator(mean)
randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
*/
@Experimental
def poissonVectorRDD(sc: SparkContext,
mean: Double,
numRows: Long,
numCols: Int,
numPartitions: Int): RDD[Vector] = {
poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
* sc.defaultParallelism is used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
*/
@Experimental
def poissonVectorRDD(sc: SparkContext,
mean: Double,
numRows: Long,
numCols: Int): RDD[Vector] = {
poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
* input DistributionGenerator.
*
* @param sc SparkContext used to create the RDD.
* @param generator DistributionGenerator used to populate the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
* @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
*/
@Experimental
def randomVectorRDD(sc: SparkContext,
generator: DistributionGenerator,
numRows: Long,
numCols: Int,
numPartitions: Int,
seed: Long): RDD[Vector] = {
new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
* input DistributionGenerator.
*
* @param sc SparkContext used to create the RDD.
* @param generator DistributionGenerator used to populate the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
*/
@Experimental
def randomVectorRDD(sc: SparkContext,
generator: DistributionGenerator,
numRows: Long,
numCols: Int,
numPartitions: Int): RDD[Vector] = {
randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong)
}
/**
* :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
* input DistributionGenerator.
* sc.defaultParallelism is used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param generator DistributionGenerator used to populate the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
*/
@Experimental
def randomVectorRDD(sc: SparkContext,
generator: DistributionGenerator,
numRows: Long,
numCols: Int): RDD[Vector] = {
randomVectorRDD(sc, generator, numRows, numCols,
sc.defaultParallelism, Utils.random.nextLong)
}
}
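The vector variants compose the same way. A minimal sketch (hypothetical usage, assuming an active `SparkContext` named `sc`):

// 1000 rows of 10 i.i.d. Pois(5.0) samples each, across 4 partitions, seeded for reproducibility.
val poisson = new PoissonGenerator(5.0)
val vectors = RandomRDDGenerators.randomVectorRDD(sc, poisson, 1000L, 10, 4, 7L)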


@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.rdd

import scala.util.Random

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.mllib.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.random.DistributionGenerator
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
private[mllib] class RandomRDDPartition(override val index: Int,
val size: Int,
val generator: DistributionGenerator,
val seed: Long) extends Partition {
require(size >= 0, "Non-negative partition size required.")
}
// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
private[mllib] class RandomRDD(@transient sc: SparkContext,
size: Long,
numPartitions: Int,
@transient rng: DistributionGenerator,
@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
require(size > 0, "Positive RDD size required.")
require(numPartitions > 0, "Positive number of partitions required.")
require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue,
"Partition size cannot exceed Int.MaxValue")
override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
val split = splitIn.asInstanceOf[RandomRDDPartition]
RandomRDD.getPointIterator(split)
}
override def getPartitions: Array[Partition] = {
RandomRDD.getPartitions(size, numPartitions, rng, seed)
}
}
private[mllib] class RandomVectorRDD(@transient sc: SparkContext,
size: Long,
vectorSize: Int,
numPartitions: Int,
@transient rng: DistributionGenerator,
@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
require(size > 0, "Positive RDD size required.")
require(numPartitions > 0, "Positive number of partitions required.")
require(vectorSize > 0, "Positive vector size required.")
require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue,
"Partition size cannot exceed Int.MaxValue")
override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
val split = splitIn.asInstanceOf[RandomRDDPartition]
RandomRDD.getVectorIterator(split, vectorSize)
}
override protected def getPartitions: Array[Partition] = {
RandomRDD.getPartitions(size, numPartitions, rng, seed)
}
}
private[mllib] object RandomRDD {
def getPartitions(size: Long,
numPartitions: Int,
rng: DistributionGenerator,
seed: Long): Array[Partition] = {
val partitions = new Array[RandomRDDPartition](numPartitions)
var i = 0
var start: Long = 0
var end: Long = 0
val random = new Random(seed)
while (i < numPartitions) {
end = ((i + 1) * size) / numPartitions
partitions(i) = new RandomRDDPartition(i, (end - start).toInt, rng, random.nextLong())
start = end
i += 1
}
partitions.asInstanceOf[Array[Partition]]
}
// The RNG has to be reset every time the iterator is requested to guarantee same data
// every time the content of the RDD is examined.
def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
val generator = partition.generator.copy()
generator.setSeed(partition.seed)
Array.fill(partition.size)(generator.nextValue()).toIterator
}
// The RNG has to be reset every time the iterator is requested to guarantee same data
// every time the content of the RDD is examined.
def getVectorIterator(partition: RandomRDDPartition, vectorSize: Int): Iterator[Vector] = {
val generator = partition.generator.copy()
generator.setSeed(partition.seed)
Array.fill(partition.size)(new DenseVector(
(0 until vectorSize).map { _ => generator.nextValue() }.toArray)).toIterator
}
}
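The split arithmetic in getPartitions keeps partition sizes within one element of each other. A standalone sketch of the same computation (illustrative only):

// Partition i covers indices [i * size / numPartitions, (i + 1) * size / numPartitions).
def partitionSizes(size: Long, numPartitions: Int): Seq[Long] =
  (0 until numPartitions).map { i =>
    (i + 1) * size / numPartitions - i * size / numPartitions
  }

// partitionSizes(10L, 3) == Seq(3, 3, 4): sizes always differ by at most one.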


@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.random

import org.scalatest.FunSuite

import org.apache.spark.util.StatCounter
// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
class DistributionGeneratorSuite extends FunSuite {
def apiChecks(gen: DistributionGenerator) {
// resetting seed should generate the same sequence of random numbers
gen.setSeed(42L)
val array1 = (0 until 1000).map(_ => gen.nextValue())
gen.setSeed(42L)
val array2 = (0 until 1000).map(_ => gen.nextValue())
assert(array1.equals(array2))
// copy() should return a new instance backed by its own rng,
// i.e. setting different seeds on different instances produces different sequences of
// random numbers.
val gen2 = gen.copy()
gen.setSeed(0L)
val array3 = (0 until 1000).map(_ => gen.nextValue())
gen2.setSeed(1L)
val array4 = (0 until 1000).map(_ => gen2.nextValue())
// Compare arrays instead of elements since individual elements can coincide by chance but the
// sequences should differ given two different seeds.
assert(!array3.equals(array4))
// test that setting the same seed in the copied instance produces the same sequence of numbers
gen.setSeed(0L)
val array5 = (0 until 1000).map(_ => gen.nextValue())
gen2.setSeed(0L)
val array6 = (0 until 1000).map(_ => gen2.nextValue())
assert(array5.equals(array6))
}
def distributionChecks(gen: DistributionGenerator,
mean: Double = 0.0,
stddev: Double = 1.0,
epsilon: Double = 0.01) {
for (seed <- 0 until 5) {
gen.setSeed(seed.toLong)
val sample = (0 until 100000).map { _ => gen.nextValue() }
val stats = new StatCounter(sample)
assert(math.abs(stats.mean - mean) < epsilon)
assert(math.abs(stats.stdev - stddev) < epsilon)
}
}
test("UniformGenerator") {
val uniform = new UniformGenerator()
apiChecks(uniform)
// Stddev of uniform distribution = (ub - lb) / math.sqrt(12)
distributionChecks(uniform, 0.5, 1 / math.sqrt(12))
}
test("StandardNormalGenerator") {
val normal = new StandardNormalGenerator()
apiChecks(normal)
distributionChecks(normal, 0.0, 1.0)
}
test("PoissonGenerator") {
// mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
for (mean <- List(1.0, 5.0, 100.0)) {
val poisson = new PoissonGenerator(mean)
apiChecks(poisson)
distributionChecks(poisson, mean, math.sqrt(mean), 0.1)
}
}
}


@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.random

import scala.collection.mutable.ArrayBuffer

import org.scalatest.FunSuite

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter
/*
* Note: avoid including APIs that do not set the seed for the RNG in unit tests
* in order to guarantee deterministic behavior.
*
* TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
*/
class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
def testGeneratedRDD(rdd: RDD[Double],
expectedSize: Long,
expectedNumPartitions: Int,
expectedMean: Double,
expectedStddev: Double,
epsilon: Double = 0.01) {
val stats = rdd.stats()
assert(expectedSize === stats.count)
assert(expectedNumPartitions === rdd.partitions.size)
assert(math.abs(stats.mean - expectedMean) < epsilon)
assert(math.abs(stats.stdev - expectedStddev) < epsilon)
}
// assume test RDDs are small
def testGeneratedVectorRDD(rdd: RDD[Vector],
expectedRows: Long,
expectedColumns: Int,
expectedNumPartitions: Int,
expectedMean: Double,
expectedStddev: Double,
epsilon: Double = 0.01) {
assert(expectedNumPartitions === rdd.partitions.size)
val values = new ArrayBuffer[Double]()
rdd.collect().foreach { vector =>
assert(vector.size === expectedColumns)
values ++= vector.toArray
}
assert(expectedRows === values.size / expectedColumns)
val stats = new StatCounter(values)
assert(math.abs(stats.mean - expectedMean) < epsilon)
assert(math.abs(stats.stdev - expectedStddev) < epsilon)
}
test("RandomRDD sizes") {
// some cases where size % numParts != 0 to test getPartitions behaves correctly
for ((size, numPartitions) <- List((10000, 6), (12345, 1), (1000, 101))) {
val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
assert(rdd.count() === size)
assert(rdd.partitions.size === numPartitions)
// check that partition sizes are balanced
val partSizes = rdd.partitions.map(p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
val partStats = new StatCounter(partSizes)
assert(partStats.max - partStats.min <= 1)
}
// size > Int.MaxValue
val size = Int.MaxValue.toLong * 100L
val numPartitions = 101
val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
assert(rdd.partitions.size === numPartitions)
val count = rdd.partitions.foldLeft(0L) { (count, part) =>
count + part.asInstanceOf[RandomRDDPartition].size
}
assert(count === size)
// size needs to be positive
intercept[IllegalArgumentException] { new RandomRDD(sc, 0, 10, new UniformGenerator, 0L) }
// numPartitions needs to be positive
intercept[IllegalArgumentException] { new RandomRDD(sc, 100, 0, new UniformGenerator, 0L) }
// partition size needs to be <= Int.MaxValue
intercept[IllegalArgumentException] {
new RandomRDD(sc, Int.MaxValue.toLong * 100L, 99, new UniformGenerator, 0L)
}
}
test("randomRDD for different distributions") {
val size = 100000L
val numPartitions = 10
val poissonMean = 100.0
for (seed <- 0 until 5) {
val uniform = RandomRDDGenerators.uniformRDD(sc, size, numPartitions, seed)
testGeneratedRDD(uniform, size, numPartitions, 0.5, 1 / math.sqrt(12))
val normal = RandomRDDGenerators.normalRDD(sc, size, numPartitions, seed)
testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0)
val poisson = RandomRDDGenerators.poissonRDD(sc, poissonMean, size, numPartitions, seed)
testGeneratedRDD(poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1)
}
// mock distribution to check that partitions have unique seeds
val random = RandomRDDGenerators.randomRDD(sc, new MockDistro(), 1000L, 1000, 0L)
val values = random.collect()
assert(values.size === values.distinct.size)
}
test("randomVectorRDD for different distributions") {
val rows = 1000L
val cols = 100
val parts = 10
val poissonMean = 100.0
for (seed <- 0 until 5) {
val uniform = RandomRDDGenerators.uniformVectorRDD(sc, rows, cols, parts, seed)
testGeneratedVectorRDD(uniform, rows, cols, parts, 0.5, 1 / math.sqrt(12))
val normal = RandomRDDGenerators.normalVectorRDD(sc, rows, cols, parts, seed)
testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0)
val poisson = RandomRDDGenerators.poissonVectorRDD(sc, poissonMean, rows, cols, parts, seed)
testGeneratedVectorRDD(poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1)
}
}
}
private[random] class MockDistro extends DistributionGenerator {
var seed = 0L
// This allows us to check that each partition has a different seed
override def nextValue(): Double = seed.toDouble
override def setSeed(seed: Long) = this.seed = seed
override def copy(): MockDistro = new MockDistro
}