04e44b37cc
This PR update PySpark to support Python 3 (tested with 3.4). Known issue: unpickle array from Pyrolite is broken in Python 3, those tests are skipped. TODO: ec2/spark-ec2.py is not fully tested with python3. Author: Davies Liu <davies@databricks.com> Author: twneale <twneale@gmail.com> Author: Josh Rosen <joshrosen@databricks.com> Closes #5173 from davies/python3 and squashes the following commits: d7d6323 [Davies Liu] fix tests 6c52a98 [Davies Liu] fix mllib test 99e334f [Davies Liu] update timeout b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 cafd5ec [Davies Liu] adddress comments from @mengxr bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 179fc8d [Davies Liu] tuning flaky tests 8c8b957 [Davies Liu] fix ResourceWarning in Python 3 5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 4006829 [Davies Liu] fix test 2fc0066 [Davies Liu] add python3 path 71535e9 [Davies Liu] fix xrange and divide 5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ed498c8 [Davies Liu] fix compatibility with python 3 820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ad7c374 [Davies Liu] fix mllib test and warning ef1fc2f [Davies Liu] fix tests 4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 59bb492 [Davies Liu] fix tests 1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ca0fdd3 [Davies Liu] fix code style 9563a15 [Davies Liu] add imap back for python 2 0b1ec04 [Davies Liu] make python examples work with Python 3 d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 a716d34 [Davies 
Liu] test with python 3.4 f1700e8 [Davies Liu] fix test in python3 671b1db [Davies Liu] fix test in python3 692ff47 [Davies Liu] fix flaky test 7b9699f [Davies Liu] invalidate import cache for Python 3.3+ 9c58497 [Davies Liu] fix kill worker 309bfbf [Davies Liu] keep compatibility 5707476 [Davies Liu] cleanup, fix hash of string in 3.3+ 8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 f53e1f0 [Davies Liu] fix tests 70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3 a39167e [Davies Liu] support customize class in __main__ 814c77b [Davies Liu] run unittests with python 3 7f4476e [Davies Liu] mllib tests passed d737924 [Davies Liu] pass ml tests 375ea17 [Davies Liu] SQL tests pass 6cc42a9 [Davies Liu] rename 431a8de [Davies Liu] streaming tests pass 78901a7 [Davies Liu] fix hash of serializer in Python 3 24b2f2e [Davies Liu] pass all RDD tests 35f48fe [Davies Liu] run future again 1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py 6e3c21d [Davies Liu] make cloudpickle work with Python3 2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run 1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out 7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work. b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?). 
f40d925 [twneale] xrange --> range e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206 79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper 2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3 854be27 [Josh Rosen] Run `futurize` on Python code: 7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
255 lines
10 KiB
Python
255 lines
10 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
from pyspark.rdd import RDD, ignore_unicode_prefix
|
|
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
|
|
from pyspark.mllib.linalg import Matrix, _convert_to_vector
|
|
from pyspark.mllib.regression import LabeledPoint
|
|
from pyspark.mllib.stat.test import ChiSqTestResult
|
|
|
|
|
|
__all__ = ['MultivariateStatisticalSummary', 'Statistics']
|
|
|
|
|
|
class MultivariateStatisticalSummary(JavaModelWrapper):

    """
    Trait for multivariate statistical summary of a data matrix.
    """

    def _vector(self, name):
        # Delegate to the named method on the wrapped JVM summary object
        # and convert the returned MLlib Vector into a NumPy array.
        return self.call(name).toArray()

    def mean(self):
        """Column-wise mean of the data matrix."""
        return self._vector("mean")

    def variance(self):
        """Column-wise variance of the data matrix."""
        return self._vector("variance")

    def count(self):
        """Number of rows in the data matrix."""
        return int(self.call("count"))

    def numNonzeros(self):
        """Column-wise count of non-zero entries."""
        return self._vector("numNonzeros")

    def max(self):
        """Column-wise maximum of the data matrix."""
        return self._vector("max")

    def min(self):
        """Column-wise minimum of the data matrix."""
        return self._vector("min")

    def normL1(self):
        """Column-wise L1 norm of the data matrix."""
        return self._vector("normL1")

    def normL2(self):
        """Column-wise Euclidean (L2) norm of the data matrix."""
        return self._vector("normL2")
|
|
|
|
|
|
class Statistics(object):

    @staticmethod
    def colStats(rdd):
        """
        Computes column-wise summary statistics for the input RDD[Vector].

        :param rdd: an RDD[Vector] for which column-wise summary statistics
                    are to be computed.
        :return: :class:`MultivariateStatisticalSummary` object containing
                 column-wise summary statistics.

        >>> from pyspark.mllib.linalg import Vectors
        >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
        ...                       Vectors.dense([4, 5, 0, 3]),
        ...                       Vectors.dense([6, 7, 0, 8])])
        >>> cStats = Statistics.colStats(rdd)
        >>> cStats.mean()
        array([ 4., 4., 0., 3.])
        >>> cStats.variance()
        array([ 4., 13., 0., 25.])
        >>> cStats.count()
        3
        >>> cStats.numNonzeros()
        array([ 3., 2., 0., 3.])
        >>> cStats.max()
        array([ 6., 7., 0., 8.])
        >>> cStats.min()
        array([ 2., 0., 0., -2.])
        """
        # Coerce arbitrary vector-like rows to MLlib Vectors before handing
        # the RDD to the JVM implementation.
        cStats = callMLlibFunc("colStats", rdd.map(_convert_to_vector))
        return MultivariateStatisticalSummary(cStats)

    @staticmethod
    def corr(x, y=None, method=None):
        """
        Compute the correlation (matrix) for the input RDD(s) using the
        specified method.
        Methods currently supported: I{pearson (default), spearman}.

        If a single RDD of Vectors is passed in, a correlation matrix
        comparing the columns in the input RDD is returned. Use C{method=}
        to specify the method to be used for single RDD input.
        If two RDDs of floats are passed in, a single float is returned.

        :param x: an RDD of vector for which the correlation matrix is to be computed,
                  or an RDD of float of the same cardinality as y when y is specified.
        :param y: an RDD of float of the same cardinality as x.
        :param method: String specifying the method to use for computing correlation.
                       Supported: `pearson` (default), `spearman`
        :return: Correlation matrix comparing columns in x.

        >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
        >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
        >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
        >>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
        True
        >>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
        True
        >>> Statistics.corr(x, y, "spearman")
        0.5
        >>> from math import isnan
        >>> isnan(Statistics.corr(x, zeros))
        True
        >>> from pyspark.mllib.linalg import Vectors
        >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
        ...                       Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
        >>> pearsonCorr = Statistics.corr(rdd)
        >>> print(str(pearsonCorr).replace('nan', 'NaN'))
        [[ 1. 0.05564149 NaN 0.40047142]
        [ 0.05564149 1. NaN 0.91359586]
        [ NaN NaN 1. NaN]
        [ 0.40047142 0.91359586 NaN 1. ]]
        >>> spearmanCorr = Statistics.corr(rdd, method="spearman")
        >>> print(str(spearmanCorr).replace('nan', 'NaN'))
        [[ 1. 0.10540926 NaN 0.4 ]
        [ 0.10540926 1. NaN 0.9486833 ]
        [ NaN NaN 1. NaN]
        [ 0.4 0.9486833 NaN 1. ]]
        >>> try:
        ...     Statistics.corr(rdd, "spearman")
        ...     print("Method name as second argument without 'method=' shouldn't be allowed.")
        ... except TypeError:
        ...     pass
        """
        # Check inputs to determine whether a single value or a matrix is needed for output.
        # Since it's legal for users to use the method name as the second argument, we need to
        # check if y is used to specify the method name instead.
        if isinstance(y, str):
            raise TypeError("Use 'method=' to specify method name.")

        # An RDD has no __bool__, so the only falsy value y can take here is
        # None (i.e. the caller passed a single RDD of Vectors).
        if y is None:
            return callMLlibFunc("corr", x.map(_convert_to_vector), method).toArray()
        else:
            return callMLlibFunc("corr", x.map(float), y.map(float), method)

    @staticmethod
    @ignore_unicode_prefix
    def chiSqTest(observed, expected=None):
        """
        .. note:: Experimental

        If `observed` is Vector, conduct Pearson's chi-squared goodness
        of fit test of the observed data against the expected distribution,
        or against the uniform distribution (by default), with each category
        having an expected frequency of `1 / len(observed)`.
        (Note: `observed` cannot contain negative values)

        If `observed` is matrix, conduct Pearson's independence test on the
        input contingency matrix, which cannot contain negative entries or
        columns or rows that sum up to 0.

        If `observed` is an RDD of LabeledPoint, conduct Pearson's independence
        test for every feature against the label across the input RDD.
        For each feature, the (feature, label) pairs are converted into a
        contingency matrix for which the chi-squared statistic is computed.
        All label and feature values must be categorical.

        :param observed: it could be a vector containing the observed categorical
                         counts/relative frequencies, or the contingency matrix
                         (containing either counts or relative frequencies),
                         or an RDD of LabeledPoint containing the labeled dataset
                         with categorical features. Real-valued features will be
                         treated as categorical for each distinct value.
        :param expected: Vector containing the expected categorical counts/relative
                         frequencies. `expected` is rescaled if the `expected` sum
                         differs from the `observed` sum.
        :return: ChiSquaredTest object containing the test statistic, degrees
                 of freedom, p-value, the method used, and the null hypothesis.

        >>> from pyspark.mllib.linalg import Vectors, Matrices
        >>> observed = Vectors.dense([4, 6, 5])
        >>> pearson = Statistics.chiSqTest(observed)
        >>> print(pearson.statistic)
        0.4
        >>> pearson.degreesOfFreedom
        2
        >>> print(round(pearson.pValue, 4))
        0.8187
        >>> pearson.method
        u'pearson'
        >>> pearson.nullHypothesis
        u'observed follows the same distribution as expected.'

        >>> observed = Vectors.dense([21, 38, 43, 80])
        >>> expected = Vectors.dense([3, 5, 7, 20])
        >>> pearson = Statistics.chiSqTest(observed, expected)
        >>> print(round(pearson.pValue, 4))
        0.0027

        >>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
        >>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))
        >>> print(round(chi.statistic, 4))
        21.9958

        >>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
        ...         LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
        ...         LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
        ...         LabeledPoint(0.0, Vectors.dense([3.5, 30.0])),
        ...         LabeledPoint(0.0, Vectors.dense([3.5, 40.0])),
        ...         LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
        >>> rdd = sc.parallelize(data, 4)
        >>> chi = Statistics.chiSqTest(rdd)
        >>> print(chi[0].statistic)
        0.75
        >>> print(chi[1].statistic)
        1.5
        """
        if isinstance(observed, RDD):
            # RDD input means the independence test per feature; cheaply
            # validate the element type against the first record only.
            if not isinstance(observed.first(), LabeledPoint):
                raise ValueError("observed should be an RDD of LabeledPoint")
            jmodels = callMLlibFunc("chiSqTest", observed)
            return [ChiSqTestResult(m) for m in jmodels]

        if isinstance(observed, Matrix):
            jmodel = callMLlibFunc("chiSqTest", observed)
        else:
            # Goodness-of-fit test over a Vector; `expected` defaults to the
            # uniform distribution on the JVM side when None.
            if expected and len(expected) != len(observed):
                raise ValueError("`expected` should have same length with `observed`")
            jmodel = callMLlibFunc("chiSqTest", _convert_to_vector(observed), expected)
        return ChiSqTestResult(jmodel)
|
|
|
|
|
|
def _test():
    """Run this module's doctests against a local 4-thread SparkContext.

    Exits the process with a non-zero status if any doctest fails.
    """
    import doctest
    import sys
    from pyspark import SparkContext
    globs = globals().copy()
    # Doctests reference `sc`; small batch size exercises serialization.
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        # sys.exit instead of the builtin exit(): the latter is an
        # interactive helper injected by the `site` module and is not
        # guaranteed to exist (e.g. under `python -S`).
        sys.exit(-1)


if __name__ == "__main__":
    _test()