[SPARK-8598] [MLLIB] Implementation of 1-sample, two-sided, Kolmogorov Smirnov Test for RDDs

This contribution is my original work and I license it to the project under it's open source license.

Author: jose.cambronero <jose.cambronero@cloudera.com>

Closes #6994 from josepablocam/master and squashes the following commits:

bbb30b1 [jose.cambronero] renamed KSTestResult to KolmogorovSmirnovTestResult, to stay consistent with method name
0d0c201 [jose.cambronero] kstTest -> kolmogorovSmirnovTest in statistics.md
1f56371 [jose.cambronero] changed ksTest in public API to kolmogorovSmirnovTest for clarity
a48ae7b [jose.cambronero] refactor code to account for serializable RealDistribution. Reuse testOneSample( _, cdf)
1bb44bd [jose.cambronero]  style and doc changes. Factored out ks test into 2 separate tests
2ec2aa6 [jose.cambronero] initialize to stdnormal when no params passed (and log). Change unit tests to approximate equivalence rather than strict
a4bc0c7 [jose.cambronero] changed ksTest(data, distName) to ksTest(data, distName, params*) after api discussions. Changed tests and docs accordingly
7e66f57 [jose.cambronero] copied implementation note to public api docs, and added @see for links to wiki info
e760ebd [jose.cambronero] line length changes to fit style check
3288e42 [jose.cambronero] addressed style changes, correctness change to simpler approach, and fixed edge case for foldLeft in searchOneSampleCandidates when a partition is empty
9026895 [jose.cambronero] addressed style changes, correctness change to simpler approach, and fixed edge case for foldLeft in searchOneSampleCandidates when a partition is empty
1226b30 [jose.cambronero] reindent multi-line lambdas, prior intepretation of style guide was wrong on my part
9c0f1af [jose.cambronero] additional style changes incorporated and added documentation to mllib statistics docs
3f81ad2 [jose.cambronero] renamed ks1 sample test for clarity
992293b [jose.cambronero] Style changes as per comments and added implementation note explaining the distributed approach.
6a4784f [jose.cambronero] specified what distributions are available for the convenience method ksTest(data, name) (solely standard normal)
4b8ba61 [jose.cambronero] fixed off by 1/N in cases when post-constant adjustment ecdf is above cdf, but prior to adj it was below
0b5e8ec [jose.cambronero] changed KS one sample test to perform just 1 distributed pass (in addition to the sorting pass), operates on each partition separately. Implementation of Sandy Ryza's algorithm
16b5c4c [jose.cambronero] renamed dat to data and eliminated recalc of RDD size by sharing as argument between empirical and evalOneSampleP
c18dc66 [jose.cambronero] removed ksTestOpt from API and changed comments in HypothesisTestSuite accordingly
f6951b6 [jose.cambronero] changed style and some comments based on feedback from pull request
b9cff3a [jose.cambronero] made small changes to pass style check
ce8e9a1 [jose.cambronero] added kstest testing in HypothesisTestSuite
4da189b [jose.cambronero] added user facing ks test functions
c659ea1 [jose.cambronero] created KS test class
13dfe4d [jose.cambronero] created test result class for ks test
This commit is contained in:
jose.cambronero 2015-07-10 20:55:45 -07:00 committed by Xiangrui Meng
parent 6e1c7e2798
commit 9c5075775d
5 changed files with 387 additions and 2 deletions

View file

@ -283,7 +283,7 @@ approxSample = data.sampleByKey(False, fractions);
Hypothesis testing is a powerful tool in statistics to determine whether a result is statistically Hypothesis testing is a powerful tool in statistics to determine whether a result is statistically
significant, whether this result occurred by chance or not. MLlib currently supports Pearson's significant, whether this result occurred by chance or not. MLlib currently supports Pearson's
chi-squared ( $\chi^2$) tests for goodness of fit and independence. The input data types determine chi-squared ( $\chi^2$) tests for goodness of fit and independence. The input data types determine
whether the goodness of fit or the independence test is conducted. The goodness of fit test requires whether the goodness of fit or the independence test is conducted. The goodness of fit test requires
an input type of `Vector`, whereas the independence test requires a `Matrix` as input. an input type of `Vector`, whereas the independence test requires a `Matrix` as input.
@ -422,6 +422,41 @@ for i, result in enumerate(featureTestResults):
</div> </div>
Additionally, MLlib provides a 1-sample, 2-sided implementation of the Kolmogorov-Smirnov (KS) test
for equality of probability distributions. By providing the name of a theoretical distribution
(currently solely supported for the normal distribution) and its parameters, or a function to
calculate the cumulative distribution according to a given theoretical distribution, the user can
test the null hypothesis that their sample is drawn from that distribution. In the case that the
user tests against the normal distribution (`distName="norm"`), but does not provide distribution
parameters, the test initializes to the standard normal distribution and logs an appropriate
message.
<div class="codetabs">
<div data-lang="scala" markdown="1">
[`Statistics`](api/scala/index.html#org.apache.spark.mllib.stat.Statistics$) provides methods to
run a 1-sample, 2-sided Kolmogorov-Smirnov test. The following example demonstrates how to run
and interpret the hypothesis tests.
{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.stat.Statistics._
val data: RDD[Double] = ... // an RDD of sample data
// run a KS test for the sample versus a standard normal distribution
val testResult = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1)
println(testResult) // summary of the test including the p-value, test statistic,
// and null hypothesis
// if our p-value indicates significance, we can reject the null hypothesis
// perform a KS test using a cumulative distribution function of our making
val myCDF: Double => Double = ...
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
{% endhighlight %}
</div>
</div>
## Random data generation ## Random data generation
Random data generation is useful for randomized algorithms, prototyping, and performance testing. Random data generation is useful for randomized algorithms, prototyping, and performance testing.

View file

@ -17,13 +17,16 @@
package org.apache.spark.mllib.stat package org.apache.spark.mllib.stat
import scala.annotation.varargs
import org.apache.spark.annotation.Experimental import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.correlation.Correlations import org.apache.spark.mllib.stat.correlation.Correlations
import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSqTestResult} import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSqTestResult, KolmogorovSmirnovTest,
KolmogorovSmirnovTestResult}
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
/** /**
@ -158,4 +161,39 @@ object Statistics {
def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = { def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = {
ChiSqTest.chiSquaredFeatures(data) ChiSqTest.chiSquaredFeatures(data)
} }
/**
* Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
* continuous distribution. By comparing the largest difference between the empirical cumulative
* distribution of the sample data and the theoretical distribution we can provide a test for the
* the null hypothesis that the sample data comes from that theoretical distribution.
* For more information on KS Test:
* @see [[https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test]]
*
* @param data an `RDD[Double]` containing the sample of data to test
* @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
* @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test
* statistic, p-value, and null hypothesis.
*/
def kolmogorovSmirnovTest(data: RDD[Double], cdf: Double => Double)
: KolmogorovSmirnovTestResult = {
KolmogorovSmirnovTest.testOneSample(data, cdf)
}
/**
* Convenience function to conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability
* distribution equality. Currently supports the normal distribution, taking as parameters
* the mean and standard deviation.
* (distName = "norm")
* @param data an `RDD[Double]` containing the sample of data to test
* @param distName a `String` name for a theoretical distribution
* @param params `Double*` specifying the parameters to be used for the theoretical distribution
* @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test
* statistic, p-value, and null hypothesis.
*/
@varargs
def kolmogorovSmirnovTest(data: RDD[Double], distName: String, params: Double*)
: KolmogorovSmirnovTestResult = {
KolmogorovSmirnovTest.testOneSample(data, distName, params: _*)
}
} }

View file

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.stat.test
import scala.annotation.varargs
import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution}
import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
/**
* Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a
* continuous distribution. By comparing the largest difference between the empirical cumulative
* distribution of the sample data and the theoretical distribution we can provide a test for the
* the null hypothesis that the sample data comes from that theoretical distribution.
* For more information on KS Test:
* @see [[https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test]]
*
* Implementation note: We seek to implement the KS test with a minimal number of distributed
* passes. We sort the RDD, and then perform the following operations on a per-partition basis:
* calculate an empirical cumulative distribution value for each observation, and a theoretical
* cumulative distribution value. We know the latter to be correct, while the former will be off by
* a constant (how large the constant is depends on how many values precede it in other partitions).
* However, given that this constant simply shifts the empirical CDF upwards, but doesn't
* change its shape, and furthermore, that constant is the same within a given partition, we can
* pick 2 values in each partition that can potentially resolve to the largest global distance.
* Namely, we pick the minimum distance and the maximum distance. Additionally, we keep track of how
* many elements are in each partition. Once these three values have been returned for every
* partition, we can collect and operate locally. Locally, we can now adjust each distance by the
* appropriate constant (the cumulative sum of number of elements in the prior partitions divided by
* thedata set size). Finally, we take the maximum absolute value, and this is the statistic.
*/
private[stat] object KolmogorovSmirnovTest extends Logging {
// Null hypothesis for the type of KS test to be included in the result.
object NullHypothesis extends Enumeration {
type NullHypothesis = Value
val OneSampleTwoSided = Value("Sample follows theoretical distribution")
}
/**
* Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
* @param data `RDD[Double]` data on which to run test
* @param cdf `Double => Double` function to calculate the theoretical CDF
* @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] summarizing the test
* results (p-value, statistic, and null hypothesis)
*/
def testOneSample(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult = {
val n = data.count().toDouble
val localData = data.sortBy(x => x).mapPartitions { part =>
val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
searchOneSampleCandidates(partDiffs) // candidates: local extrema
}.collect()
val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
evalOneSampleP(ksStat, n.toLong)
}
/**
* Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
* @param data `RDD[Double]` data on which to run test
* @param distObj `RealDistribution` a theoretical distribution
* @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] summarizing the test
* results (p-value, statistic, and null hypothesis)
*/
def testOneSample(data: RDD[Double], distObj: RealDistribution): KolmogorovSmirnovTestResult = {
val cdf = (x: Double) => distObj.cumulativeProbability(x)
testOneSample(data, cdf)
}
/**
* Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a
* partition
* @param partData `Iterator[Double]` 1 partition of a sorted RDD
* @param n `Double` the total size of the RDD
* @param cdf `Double => Double` a function the calculates the theoretical CDF of a value
* @return `Iterator[(Double, Double)] `Unadjusted (ie. off by a constant) potential extrema
* in a partition. The first element corresponds to the (empirical CDF - 1/N) - CDF,
* the second element corresponds to empirical CDF - CDF. We can then search the resulting
* iterator for the minimum of the first and the maximum of the second element, and provide
* this as a partition's candidate extrema
*/
private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double)
: Iterator[(Double, Double)] = {
// zip data with index (within that partition)
// calculate local (unadjusted) empirical CDF and subtract CDF
partData.zipWithIndex.map { case (v, ix) =>
// dp and dl are later adjusted by constant, when global info is available
val dp = (ix + 1) / n
val dl = ix / n
val cdfVal = cdf(v)
(dl - cdfVal, dp - cdfVal)
}
}
/**
* Search the unadjusted differences in a partition and return the
* two extrema (furthest below and furthest above CDF), along with a count of elements in that
* partition
* @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF
* and CDFin a partition, which come as a tuple of
* (empirical CDF - 1/N - CDF, empirical CDF - CDF)
* @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements
*/
private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)])
: Iterator[(Double, Double, Double)] = {
val initAcc = (Double.MaxValue, Double.MinValue, 0.0)
val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) =>
(math.min(pMin, dl), math.max(pMax, dp), pCt + 1)
}
val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults)
results.iterator
}
/**
* Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after
* adjusting local extrema estimates from individual partitions with the amount of elements in
* preceding partitions
* @param localData `Array[(Double, Double, Double)]` A local array containing the collected
* results of `searchOneSampleCandidates` across all partitions
* @param n `Double`The size of the RDD
* @return The one-sample Kolmogorov Smirnov Statistic
*/
private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
: Double = {
val initAcc = (Double.MinValue, 0.0)
// adjust differences based on the number of elements preceding it, which should provide
// the correct distance between empirical CDF and CDF
val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
val adjConst = prevCt / n
val dist1 = math.abs(minCand + adjConst)
val dist2 = math.abs(maxCand + adjConst)
val maxVal = Array(prevMax, dist1, dist2).max
(maxVal, prevCt + ct)
}
results._1
}
/**
* A convenience function that allows running the KS test for 1 set of sample data against
* a named distribution
* @param data the sample data that we wish to evaluate
* @param distName the name of the theoretical distribution
* @param params Variable length parameter for distribution's parameters
* @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] summarizing the
* test results (p-value, statistic, and null hypothesis)
*/
@varargs
def testOneSample(data: RDD[Double], distName: String, params: Double*)
: KolmogorovSmirnovTestResult = {
val distObj =
distName match {
case "norm" => {
if (params.nonEmpty) {
// parameters are passed, then can only be 2
require(params.length == 2, "Normal distribution requires mean and standard " +
"deviation as parameters")
new NormalDistribution(params(0), params(1))
} else {
// if no parameters passed in initializes to standard normal
logInfo("No parameters specified for normal distribution," +
"initialized to standard normal (i.e. N(0, 1))")
new NormalDistribution(0, 1)
}
}
case _ => throw new UnsupportedOperationException(s"$distName not yet supported through" +
s" convenience method. Current options are:['norm'].")
}
testOneSample(data, distObj)
}
private def evalOneSampleP(ksStat: Double, n: Long): KolmogorovSmirnovTestResult = {
val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
}
}

View file

@ -90,3 +90,20 @@ class ChiSqTestResult private[stat] (override val pValue: Double,
super.toString super.toString
} }
} }
/**
* :: Experimental ::
* Object containing the test results for the Kolmogorov-Smirnov test.
*/
@Experimental
class KolmogorovSmirnovTestResult private[stat] (
override val pValue: Double,
override val statistic: Double,
override val nullHypothesis: String) extends TestResult[Int] {
override val degreesOfFreedom = 0
override def toString: String = {
"Kolmogorov-Smirnov test summary:\n" + super.toString
}
}

View file

@ -19,6 +19,10 @@ package org.apache.spark.mllib.stat
import java.util.Random import java.util.Random
import org.apache.commons.math3.distribution.{ExponentialDistribution,
NormalDistribution, UniformRealDistribution}
import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors} import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.regression.LabeledPoint
@ -153,4 +157,101 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
Statistics.chiSqTest(sc.parallelize(continuousFeature, 2)) Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
} }
} }
test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") {
// Create theoretical distributions
val stdNormalDist = new NormalDistribution(0, 1)
val expDist = new ExponentialDistribution(0.6)
val unifDist = new UniformRealDistribution()
// set seeds
val seed = 10L
stdNormalDist.reseedRandomGenerator(seed)
expDist.reseedRandomGenerator(seed)
unifDist.reseedRandomGenerator(seed)
// Sample data from the distributions and parallelize it
val n = 100000
val sampledNorm = sc.parallelize(stdNormalDist.sample(n), 10)
val sampledExp = sc.parallelize(expDist.sample(n), 10)
val sampledUnif = sc.parallelize(unifDist.sample(n), 10)
// Use a apache math commons local KS test to verify calculations
val ksTest = new KolmogorovSmirnovTest()
val pThreshold = 0.05
// Comparing a standard normal sample to a standard normal distribution
val result1 = Statistics.kolmogorovSmirnovTest(sampledNorm, "norm", 0, 1)
val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledNorm.collect())
val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n)
// Verify vs apache math commons ks test
assert(result1.statistic ~== referenceStat1 relTol 1e-4)
assert(result1.pValue ~== referencePVal1 relTol 1e-4)
// Cannot reject null hypothesis
assert(result1.pValue > pThreshold)
// Comparing an exponential sample to a standard normal distribution
val result2 = Statistics.kolmogorovSmirnovTest(sampledExp, "norm", 0, 1)
val referenceStat2 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledExp.collect())
val referencePVal2 = 1 - ksTest.cdf(referenceStat2, n)
// verify vs apache math commons ks test
assert(result2.statistic ~== referenceStat2 relTol 1e-4)
assert(result2.pValue ~== referencePVal2 relTol 1e-4)
// reject null hypothesis
assert(result2.pValue < pThreshold)
// Testing the use of a user provided CDF function
// Distribution is not serializable, so will have to create in the lambda
val expCDF = (x: Double) => new ExponentialDistribution(0.2).cumulativeProbability(x)
// Comparing an exponential sample with mean X to an exponential distribution with mean Y
// Where X != Y
val result3 = Statistics.kolmogorovSmirnovTest(sampledExp, expCDF)
val referenceStat3 = ksTest.kolmogorovSmirnovStatistic(new ExponentialDistribution(0.2),
sampledExp.collect())
val referencePVal3 = 1 - ksTest.cdf(referenceStat3, sampledNorm.count().toInt)
// verify vs apache math commons ks test
assert(result3.statistic ~== referenceStat3 relTol 1e-4)
assert(result3.pValue ~== referencePVal3 relTol 1e-4)
// reject null hypothesis
assert(result3.pValue < pThreshold)
}
test("1 sample Kolmogorov-Smirnov test: R implementation equivalence") {
/*
Comparing results with R's implementation of Kolmogorov-Smirnov for 1 sample
> sessionInfo()
R version 3.2.0 (2015-04-16)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
> set.seed(20)
> v <- rnorm(20)
> v
[1] 1.16268529 -0.58592447 1.78546500 -1.33259371 -0.44656677 0.56960612
[7] -2.88971761 -0.86901834 -0.46170268 -0.55554091 -0.02013537 -0.15038222
[13] -0.62812676 1.32322085 -1.52135057 -0.43742787 0.97057758 0.02822264
[19] -0.08578219 0.38921440
> ks.test(v, pnorm, alternative = "two.sided")
One-sample Kolmogorov-Smirnov test
data: v
D = 0.18874, p-value = 0.4223
alternative hypothesis: two-sided
*/
val rKSStat = 0.18874
val rKSPVal = 0.4223
val rData = sc.parallelize(
Array(
1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
-0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
-0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
-0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
)
)
val rCompResult = Statistics.kolmogorovSmirnovTest(rData, "norm", 0, 1)
assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
}
} }