d0b4e93f7e
Python API for the KS-test Statistics.kolmogorovSmirnovTest(data, distName, *params) I'm not quite sure how to support the callable function since it is not serializable. Author: MechCoder <manojkumarsivaraj334@gmail.com> Closes #7430 from MechCoder/spark-8996 and squashes the following commits: 2dd009d [MechCoder] minor 021d233 [MechCoder] Remove one wrapper and other minor stuff 49d07ab [MechCoder] [SPARK-8996] [MLlib] Python API for Kolmogorov-Smirnov Test
320 lines
13 KiB
Python
320 lines
13 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import sys
|
|
if sys.version >= '3':
|
|
basestring = str
|
|
|
|
from pyspark.rdd import RDD, ignore_unicode_prefix
|
|
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
|
|
from pyspark.mllib.linalg import Matrix, _convert_to_vector
|
|
from pyspark.mllib.regression import LabeledPoint
|
|
from pyspark.mllib.stat.test import ChiSqTestResult, KolmogorovSmirnovTestResult
|
|
|
|
|
|
__all__ = ['MultivariateStatisticalSummary', 'Statistics']
|
|
|
|
|
|
class MultivariateStatisticalSummary(JavaModelWrapper):
|
|
|
|
"""
|
|
Trait for multivariate statistical summary of a data matrix.
|
|
"""
|
|
|
|
def mean(self):
|
|
return self.call("mean").toArray()
|
|
|
|
def variance(self):
|
|
return self.call("variance").toArray()
|
|
|
|
def count(self):
|
|
return int(self.call("count"))
|
|
|
|
def numNonzeros(self):
|
|
return self.call("numNonzeros").toArray()
|
|
|
|
def max(self):
|
|
return self.call("max").toArray()
|
|
|
|
def min(self):
|
|
return self.call("min").toArray()
|
|
|
|
def normL1(self):
|
|
return self.call("normL1").toArray()
|
|
|
|
def normL2(self):
|
|
return self.call("normL2").toArray()
|
|
|
|
|
|
class Statistics(object):
|
|
|
|
@staticmethod
|
|
def colStats(rdd):
|
|
"""
|
|
Computes column-wise summary statistics for the input RDD[Vector].
|
|
|
|
:param rdd: an RDD[Vector] for which column-wise summary statistics
|
|
are to be computed.
|
|
:return: :class:`MultivariateStatisticalSummary` object containing
|
|
column-wise summary statistics.
|
|
|
|
>>> from pyspark.mllib.linalg import Vectors
|
|
>>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
|
|
... Vectors.dense([4, 5, 0, 3]),
|
|
... Vectors.dense([6, 7, 0, 8])])
|
|
>>> cStats = Statistics.colStats(rdd)
|
|
>>> cStats.mean()
|
|
array([ 4., 4., 0., 3.])
|
|
>>> cStats.variance()
|
|
array([ 4., 13., 0., 25.])
|
|
>>> cStats.count()
|
|
3
|
|
>>> cStats.numNonzeros()
|
|
array([ 3., 2., 0., 3.])
|
|
>>> cStats.max()
|
|
array([ 6., 7., 0., 8.])
|
|
>>> cStats.min()
|
|
array([ 2., 0., 0., -2.])
|
|
"""
|
|
cStats = callMLlibFunc("colStats", rdd.map(_convert_to_vector))
|
|
return MultivariateStatisticalSummary(cStats)
|
|
|
|
@staticmethod
|
|
def corr(x, y=None, method=None):
|
|
"""
|
|
Compute the correlation (matrix) for the input RDD(s) using the
|
|
specified method.
|
|
Methods currently supported: I{pearson (default), spearman}.
|
|
|
|
If a single RDD of Vectors is passed in, a correlation matrix
|
|
comparing the columns in the input RDD is returned. Use C{method=}
|
|
to specify the method to be used for single RDD inout.
|
|
If two RDDs of floats are passed in, a single float is returned.
|
|
|
|
:param x: an RDD of vector for which the correlation matrix is to be computed,
|
|
or an RDD of float of the same cardinality as y when y is specified.
|
|
:param y: an RDD of float of the same cardinality as x.
|
|
:param method: String specifying the method to use for computing correlation.
|
|
Supported: `pearson` (default), `spearman`
|
|
:return: Correlation matrix comparing columns in x.
|
|
|
|
>>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
|
|
>>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
|
|
>>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
|
|
>>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
|
|
True
|
|
>>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
|
|
True
|
|
>>> Statistics.corr(x, y, "spearman")
|
|
0.5
|
|
>>> from math import isnan
|
|
>>> isnan(Statistics.corr(x, zeros))
|
|
True
|
|
>>> from pyspark.mllib.linalg import Vectors
|
|
>>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
|
|
... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
|
|
>>> pearsonCorr = Statistics.corr(rdd)
|
|
>>> print(str(pearsonCorr).replace('nan', 'NaN'))
|
|
[[ 1. 0.05564149 NaN 0.40047142]
|
|
[ 0.05564149 1. NaN 0.91359586]
|
|
[ NaN NaN 1. NaN]
|
|
[ 0.40047142 0.91359586 NaN 1. ]]
|
|
>>> spearmanCorr = Statistics.corr(rdd, method="spearman")
|
|
>>> print(str(spearmanCorr).replace('nan', 'NaN'))
|
|
[[ 1. 0.10540926 NaN 0.4 ]
|
|
[ 0.10540926 1. NaN 0.9486833 ]
|
|
[ NaN NaN 1. NaN]
|
|
[ 0.4 0.9486833 NaN 1. ]]
|
|
>>> try:
|
|
... Statistics.corr(rdd, "spearman")
|
|
... print("Method name as second argument without 'method=' shouldn't be allowed.")
|
|
... except TypeError:
|
|
... pass
|
|
"""
|
|
# Check inputs to determine whether a single value or a matrix is needed for output.
|
|
# Since it's legal for users to use the method name as the second argument, we need to
|
|
# check if y is used to specify the method name instead.
|
|
if type(y) == str:
|
|
raise TypeError("Use 'method=' to specify method name.")
|
|
|
|
if not y:
|
|
return callMLlibFunc("corr", x.map(_convert_to_vector), method).toArray()
|
|
else:
|
|
return callMLlibFunc("corr", x.map(float), y.map(float), method)
|
|
|
|
@staticmethod
|
|
@ignore_unicode_prefix
|
|
def chiSqTest(observed, expected=None):
|
|
"""
|
|
.. note:: Experimental
|
|
|
|
If `observed` is Vector, conduct Pearson's chi-squared goodness
|
|
of fit test of the observed data against the expected distribution,
|
|
or againt the uniform distribution (by default), with each category
|
|
having an expected frequency of `1 / len(observed)`.
|
|
(Note: `observed` cannot contain negative values)
|
|
|
|
If `observed` is matrix, conduct Pearson's independence test on the
|
|
input contingency matrix, which cannot contain negative entries or
|
|
columns or rows that sum up to 0.
|
|
|
|
If `observed` is an RDD of LabeledPoint, conduct Pearson's independence
|
|
test for every feature against the label across the input RDD.
|
|
For each feature, the (feature, label) pairs are converted into a
|
|
contingency matrix for which the chi-squared statistic is computed.
|
|
All label and feature values must be categorical.
|
|
|
|
:param observed: it could be a vector containing the observed categorical
|
|
counts/relative frequencies, or the contingency matrix
|
|
(containing either counts or relative frequencies),
|
|
or an RDD of LabeledPoint containing the labeled dataset
|
|
with categorical features. Real-valued features will be
|
|
treated as categorical for each distinct value.
|
|
:param expected: Vector containing the expected categorical counts/relative
|
|
frequencies. `expected` is rescaled if the `expected` sum
|
|
differs from the `observed` sum.
|
|
:return: ChiSquaredTest object containing the test statistic, degrees
|
|
of freedom, p-value, the method used, and the null hypothesis.
|
|
|
|
>>> from pyspark.mllib.linalg import Vectors, Matrices
|
|
>>> observed = Vectors.dense([4, 6, 5])
|
|
>>> pearson = Statistics.chiSqTest(observed)
|
|
>>> print(pearson.statistic)
|
|
0.4
|
|
>>> pearson.degreesOfFreedom
|
|
2
|
|
>>> print(round(pearson.pValue, 4))
|
|
0.8187
|
|
>>> pearson.method
|
|
u'pearson'
|
|
>>> pearson.nullHypothesis
|
|
u'observed follows the same distribution as expected.'
|
|
|
|
>>> observed = Vectors.dense([21, 38, 43, 80])
|
|
>>> expected = Vectors.dense([3, 5, 7, 20])
|
|
>>> pearson = Statistics.chiSqTest(observed, expected)
|
|
>>> print(round(pearson.pValue, 4))
|
|
0.0027
|
|
|
|
>>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
|
|
>>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))
|
|
>>> print(round(chi.statistic, 4))
|
|
21.9958
|
|
|
|
>>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
|
|
... LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
|
|
... LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
|
|
... LabeledPoint(0.0, Vectors.dense([3.5, 30.0])),
|
|
... LabeledPoint(0.0, Vectors.dense([3.5, 40.0])),
|
|
... LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
|
|
>>> rdd = sc.parallelize(data, 4)
|
|
>>> chi = Statistics.chiSqTest(rdd)
|
|
>>> print(chi[0].statistic)
|
|
0.75
|
|
>>> print(chi[1].statistic)
|
|
1.5
|
|
"""
|
|
if isinstance(observed, RDD):
|
|
if not isinstance(observed.first(), LabeledPoint):
|
|
raise ValueError("observed should be an RDD of LabeledPoint")
|
|
jmodels = callMLlibFunc("chiSqTest", observed)
|
|
return [ChiSqTestResult(m) for m in jmodels]
|
|
|
|
if isinstance(observed, Matrix):
|
|
jmodel = callMLlibFunc("chiSqTest", observed)
|
|
else:
|
|
if expected and len(expected) != len(observed):
|
|
raise ValueError("`expected` should have same length with `observed`")
|
|
jmodel = callMLlibFunc("chiSqTest", _convert_to_vector(observed), expected)
|
|
return ChiSqTestResult(jmodel)
|
|
|
|
@staticmethod
|
|
@ignore_unicode_prefix
|
|
def kolmogorovSmirnovTest(data, distName="norm", *params):
|
|
"""
|
|
.. note:: Experimental
|
|
|
|
Performs the Kolmogorov-Smirnov (KS) test for data sampled from
|
|
a continuous distribution. It tests the null hypothesis that
|
|
the data is generated from a particular distribution.
|
|
|
|
The given data is sorted and the Empirical Cumulative
|
|
Distribution Function (ECDF) is calculated
|
|
which for a given point is the number of points having a CDF
|
|
value lesser than it divided by the total number of points.
|
|
|
|
Since the data is sorted, this is a step function
|
|
that rises by (1 / length of data) for every ordered point.
|
|
|
|
The KS statistic gives us the maximum distance between the
|
|
ECDF and the CDF. Intuitively if this statistic is large, the
|
|
probabilty that the null hypothesis is true becomes small.
|
|
For specific details of the implementation, please have a look
|
|
at the Scala documentation.
|
|
|
|
:param data: RDD, samples from the data
|
|
:param distName: string, currently only "norm" is supported.
|
|
(Normal distribution) to calculate the
|
|
theoretical distribution of the data.
|
|
:param params: additional values which need to be provided for
|
|
a certain distribution.
|
|
If not provided, the default values are used.
|
|
:return: KolmogorovSmirnovTestResult object containing the test
|
|
statistic, degrees of freedom, p-value,
|
|
the method used, and the null hypothesis.
|
|
|
|
>>> kstest = Statistics.kolmogorovSmirnovTest
|
|
>>> data = sc.parallelize([-1.0, 0.0, 1.0])
|
|
>>> ksmodel = kstest(data, "norm")
|
|
>>> print(round(ksmodel.pValue, 3))
|
|
1.0
|
|
>>> print(round(ksmodel.statistic, 3))
|
|
0.175
|
|
>>> ksmodel.nullHypothesis
|
|
u'Sample follows theoretical distribution'
|
|
|
|
>>> data = sc.parallelize([2.0, 3.0, 4.0])
|
|
>>> ksmodel = kstest(data, "norm", 3.0, 1.0)
|
|
>>> print(round(ksmodel.pValue, 3))
|
|
1.0
|
|
>>> print(round(ksmodel.statistic, 3))
|
|
0.175
|
|
"""
|
|
if not isinstance(data, RDD):
|
|
raise TypeError("data should be an RDD, got %s." % type(data))
|
|
if not isinstance(distName, basestring):
|
|
raise TypeError("distName should be a string, got %s." % type(distName))
|
|
|
|
params = [float(param) for param in params]
|
|
return KolmogorovSmirnovTestResult(
|
|
callMLlibFunc("kolmogorovSmirnovTest", data, distName, params))
|
|
|
|
|
|
def _test():
|
|
import doctest
|
|
from pyspark import SparkContext
|
|
globs = globals().copy()
|
|
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
|
|
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
|
|
globs['sc'].stop()
|
|
if failure_count:
|
|
exit(-1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
_test()
|