spark-instrumented-optimizer/python/pyspark/mllib/stat.py
Davies Liu fce5e251d6 [SPARK-3491] [MLlib] [PySpark] use pickle to serialize data in MLlib
Currently, we serialize the data between JVM and Python case by case manually, this cannot scale to support so many APIs in MLlib.

This patch will try to address this problem by serialize the data using pickle protocol, using Pyrolite library to serialize/deserialize in JVM. Pickle protocol can be easily extended to support customized class.

All the modules are refactored to use this protocol.

Known issues: There will be some performance regression (both CPU and memory, the serialized data increased)

Author: Davies Liu <davies.liu@gmail.com>

Closes #2378 from davies/pickle_mllib and squashes the following commits:

dffbba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into pickle_mllib
810f97f [Davies Liu] fix equal of matrix
032cd62 [Davies Liu] add more type check and conversion for user_product
bd738ab [Davies Liu] address comments
e431377 [Davies Liu] fix cache of rdd, refactor
19d0967 [Davies Liu] refactor Picklers
2511e76 [Davies Liu] cleanup
1fccf1a [Davies Liu] address comments
a2cc855 [Davies Liu] fix tests
9ceff73 [Davies Liu] test size of serialized Rating
44e0551 [Davies Liu] fix cache
a379a81 [Davies Liu] fix pickle array in python2.7
df625c7 [Davies Liu] Merge commit '154d141' into pickle_mllib
154d141 [Davies Liu] fix autobatchedpickler
44736d7 [Davies Liu] speed up pickling array in Python 2.7
e1d1bfc [Davies Liu] refactor
708dc02 [Davies Liu] fix tests
9dcfb63 [Davies Liu] fix style
88034f0 [Davies Liu] rafactor, address comments
46a501e [Davies Liu] choose batch size automatically
df19464 [Davies Liu] memorize the module and class name during pickleing
f3506c5 [Davies Liu] Merge branch 'master' into pickle_mllib
722dd96 [Davies Liu] cleanup _common.py
0ee1525 [Davies Liu] remove outdated tests
b02e34f [Davies Liu] remove _common.py
84c721d [Davies Liu] Merge branch 'master' into pickle_mllib
4d7963e [Davies Liu] remove muanlly serialization
6d26b03 [Davies Liu] fix tests
c383544 [Davies Liu] classification
f2a0856 [Davies Liu] mllib/regression
d9f691f [Davies Liu] mllib/util
cccb8b1 [Davies Liu] mllib/tree
8fe166a [Davies Liu] Merge branch 'pickle' into pickle_mllib
aa2287e [Davies Liu] random
f1544c4 [Davies Liu] refactor clustering
52d1350 [Davies Liu] use new protocol in mllib/stat
b30ef35 [Davies Liu] use pickle to serialize data for mllib/recommendation
f44f771 [Davies Liu] enable tests about array
3908f5c [Davies Liu] Merge branch 'master' into pickle
c77c87b [Davies Liu] cleanup debugging code
60e4e2f [Davies Liu] support unpickle array.array for Python 2.6
2014-09-19 15:01:11 -07:00

189 lines
6.2 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Python package for statistical functions in MLlib.
"""
from functools import wraps
from pyspark import PickleSerializer
__all__ = ['MultivariateStatisticalSummary', 'Statistics']
def serialize(f):
ser = PickleSerializer()
@wraps(f)
def func(self):
jvec = f(self)
bytes = self._sc._jvm.SerDe.dumps(jvec)
return ser.loads(str(bytes)).toArray()
return func
class MultivariateStatisticalSummary(object):
"""
Trait for multivariate statistical summary of a data matrix.
"""
def __init__(self, sc, java_summary):
"""
:param sc: Spark context
:param java_summary: Handle to Java summary object
"""
self._sc = sc
self._java_summary = java_summary
def __del__(self):
self._sc._gateway.detach(self._java_summary)
@serialize
def mean(self):
return self._java_summary.mean()
@serialize
def variance(self):
return self._java_summary.variance()
def count(self):
return self._java_summary.count()
@serialize
def numNonzeros(self):
return self._java_summary.numNonzeros()
@serialize
def max(self):
return self._java_summary.max()
@serialize
def min(self):
return self._java_summary.min()
class Statistics(object):
@staticmethod
def colStats(rdd):
"""
Computes column-wise summary statistics for the input RDD[Vector].
>>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
... Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8])])
>>> cStats = Statistics.colStats(rdd)
>>> cStats.mean()
array([ 4., 4., 0., 3.])
>>> cStats.variance()
array([ 4., 13., 0., 25.])
>>> cStats.count()
3L
>>> cStats.numNonzeros()
array([ 3., 2., 0., 3.])
>>> cStats.max()
array([ 6., 7., 0., 8.])
>>> cStats.min()
array([ 2., 0., 0., -2.])
"""
sc = rdd.ctx
jrdd = rdd._to_java_object_rdd()
cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd)
return MultivariateStatisticalSummary(sc, cStats)
@staticmethod
def corr(x, y=None, method=None):
"""
Compute the correlation (matrix) for the input RDD(s) using the
specified method.
Methods currently supported: I{pearson (default), spearman}.
If a single RDD of Vectors is passed in, a correlation matrix
comparing the columns in the input RDD is returned. Use C{method=}
to specify the method to be used for single RDD inout.
If two RDDs of floats are passed in, a single float is returned.
>>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
>>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
>>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
>>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
True
>>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
True
>>> Statistics.corr(x, y, "spearman")
0.5
>>> from math import isnan
>>> isnan(Statistics.corr(x, zeros))
True
>>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
>>> pearsonCorr = Statistics.corr(rdd)
>>> print str(pearsonCorr).replace('nan', 'NaN')
[[ 1. 0.05564149 NaN 0.40047142]
[ 0.05564149 1. NaN 0.91359586]
[ NaN NaN 1. NaN]
[ 0.40047142 0.91359586 NaN 1. ]]
>>> spearmanCorr = Statistics.corr(rdd, method="spearman")
>>> print str(spearmanCorr).replace('nan', 'NaN')
[[ 1. 0.10540926 NaN 0.4 ]
[ 0.10540926 1. NaN 0.9486833 ]
[ NaN NaN 1. NaN]
[ 0.4 0.9486833 NaN 1. ]]
>>> try:
... Statistics.corr(rdd, "spearman")
... print "Method name as second argument without 'method=' shouldn't be allowed."
... except TypeError:
... pass
"""
sc = x.ctx
# Check inputs to determine whether a single value or a matrix is needed for output.
# Since it's legal for users to use the method name as the second argument, we need to
# check if y is used to specify the method name instead.
if type(y) == str:
raise TypeError("Use 'method=' to specify method name.")
jx = x._to_java_object_rdd()
if not y:
resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method)
bytes = sc._jvm.SerDe.dumps(resultMat)
ser = PickleSerializer()
return ser.loads(str(bytes)).toArray()
else:
jy = y._to_java_object_rdd()
return sc._jvm.PythonMLLibAPI().corr(jx, jy, method)
def _test():
import doctest
from pyspark import SparkContext
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
if __name__ == "__main__":
_test()