spark-instrumented-optimizer/python/pyspark/mllib/common.py
Davies Liu c8abddc516 [SPARK-3964] [MLlib] [PySpark] add Hypothesis test Python API
```
pyspark.mllib.stat.Statistics.chiSqTest(observed, expected=None)
    :: Experimental ::

    If `observed` is a Vector, conduct Pearson's chi-squared goodness
    of fit test of the observed data against the expected distribution,
    or against the uniform distribution (by default), with each category
    having an expected frequency of `1 / len(observed)`.
    (Note: `observed` cannot contain negative values)

    If `observed` is a matrix, conduct Pearson's independence test on the
    input contingency matrix, which cannot contain negative entries or
    columns or rows that sum up to 0.

    If `observed` is an RDD of LabeledPoint, conduct Pearson's independence
    test for every feature against the label across the input RDD.
    For each feature, the (feature, label) pairs are converted into a
    contingency matrix for which the chi-squared statistic is computed.
    All label and feature values must be categorical.

    :param observed: it could be a vector containing the observed categorical
                     counts/relative frequencies, or the contingency matrix
                     (containing either counts or relative frequencies),
                     or an RDD of LabeledPoint containing the labeled dataset
                     with categorical features. Real-valued features will be
                     treated as categorical for each distinct value.
    :param expected: Vector containing the expected categorical counts/relative
                     frequencies. `expected` is rescaled if the `expected` sum
                     differs from the `observed` sum.
    :return: ChiSquaredTest object containing the test statistic, degrees
             of freedom, p-value, the method used, and the null hypothesis.
```
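
For illustration, a minimal usage sketch (not part of the commit message; it
assumes an active SparkContext and that `Vectors` and `Matrices` are available
from `pyspark.mllib.linalg`):

```
from pyspark.mllib.linalg import Matrices, Vectors
from pyspark.mllib.stat import Statistics

# goodness-of-fit test against the uniform distribution (the default)
observed = Vectors.dense([4.0, 6.0, 5.0])
gof = Statistics.chiSqTest(observed)
print(gof.pValue)

# independence test on a 3x2 contingency matrix (values are column-major)
mat = Matrices.dense(3, 2, [1.0, 3.0, 5.0, 2.0, 4.0, 6.0])
ind = Statistics.chiSqTest(mat)
print(ind.nullHypothesis)
```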

Author: Davies Liu <davies@databricks.com>

Closes #3091 from davies/his and squashes the following commits:

145d16c [Davies Liu] address comments
0ab0764 [Davies Liu] fix float
5097d54 [Davies Liu] add Hypothesis test Python API
2014-11-04 21:35:52 -08:00


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import py4j.protocol
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
from py4j.java_collections import MapConverter, ListConverter, JavaArray, JavaList

from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer


# Hack to support float('inf') in Py4j: Java expects 'Infinity'/'NaN',
# while Python renders these floats as 'inf'/'nan'.
_old_smart_decode = py4j.protocol.smart_decode

_float_str_mapping = {
    'nan': 'NaN',
    'inf': 'Infinity',
    '-inf': '-Infinity',
}


def _new_smart_decode(obj):
    if isinstance(obj, float):
        s = unicode(obj)
        return _float_str_mapping.get(s, s)
    return _old_smart_decode(obj)

py4j.protocol.smart_decode = _new_smart_decode


_picklable_classes = [
    'LinkedList',
    'SparseVector',
    'DenseVector',
    'DenseMatrix',
    'Rating',
    'LabeledPoint',
]


# this will call the MLlib version of pythonToJava()
def _to_java_object_rdd(rdd, cache=False):
    """ Return a JavaRDD of Object by unpickling

    It will convert each Python object into a Java object by Pyrolite,
    whether or not the RDD is serialized in batch mode.
    """
    rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    if cache:
        rdd.cache()
    return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True)


def _py2java(sc, obj):
    """ Convert a Python object into a Java object """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, dict):
        obj = MapConverter().convert(obj, sc._gateway._gateway_client)
    elif isinstance(obj, (list, tuple)):
        obj = ListConverter().convert(obj, sc._gateway._gateway_client)
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, basestring)):
        pass
    else:
        # fall back to pickling; the bytes are unpickled on the JVM side
        bytes = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.SerDe.loads(bytes)
    return obj


def _java2py(sc, r):
    if isinstance(r, JavaObject):
        clsName = r.getClass().getSimpleName()
        # convert RDD into JavaRDD
        if clsName != 'JavaRDD' and clsName.endswith("RDD"):
            r = r.toJavaRDD()
            clsName = 'JavaRDD'

        if clsName == 'JavaRDD':
            jrdd = sc._jvm.SerDe.javaToPython(r)
            return RDD(jrdd, sc)

        if clsName in _picklable_classes:
            r = sc._jvm.SerDe.dumps(r)
        elif isinstance(r, (JavaArray, JavaList)):
            try:
                r = sc._jvm.SerDe.dumps(r)
            except Py4JJavaError:
                pass  # not picklable

    if isinstance(r, bytearray):
        r = PickleSerializer().loads(str(r))
    return r


def callJavaFunc(sc, func, *args):
    """ Call Java Function """
    args = [_py2java(sc, a) for a in args]
    return _java2py(sc, func(*args))


def callMLlibFunc(name, *args):
    """ Call API in PythonMLLibAPI """
    sc = SparkContext._active_spark_context
    api = getattr(sc._jvm.PythonMLLibAPI(), name)
    return callJavaFunc(sc, api, *args)
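
# Illustrative note (not part of the original file): callers elsewhere in
# pyspark.mllib are expected to reach the JVM through callMLlibFunc; for
# example, Statistics.chiSqTest roughly does
#     callMLlibFunc("chiSqTest", _convert_to_vector(observed), expected)
# which resolves to PythonMLLibAPI.chiSqTest on the JVM side.
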
class JavaModelWrapper(object):
    """
    Wrapper for the model in JVM
    """
    def __init__(self, java_model):
        self._sc = SparkContext._active_spark_context
        self._java_model = java_model

    def __del__(self):
        self._sc._gateway.detach(self._java_model)

    def call(self, name, *a):
        """Call method of java_model"""
        return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
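
# Illustrative sketch (hypothetical subclass, not defined in this file):
# MLlib model wrappers subclass JavaModelWrapper and forward method calls to
# the underlying JVM model through self.call, e.g. (assuming
# _convert_to_vector is imported from pyspark.mllib.linalg):
#
#     class ExampleModel(JavaModelWrapper):
#         def predict(self, x):
#             return self.call("predict", _convert_to_vector(x))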