#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Fuller unit tests for Python MLlib.
"""
import os
import sys
import tempfile
import array as pyarray
from numpy import array, array_equal
from py4j.protocol import Py4JJavaError
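
# unittest2 backports the unittest features required here to Python 2.6 and
# earlier; on newer Pythons the standard library module is sufficient.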
if sys.version_info[:2] <= (2, 6):
    try:
        import unittest2 as unittest
    except ImportError:
        sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
        sys.exit(1)
else:
    import unittest

from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
    DenseMatrix, Vectors, Matrices
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.stat import Statistics
from pyspark.serializers import PickleSerializer
from pyspark.sql import SQLContext
from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase

_have_scipy = False
try:
    import scipy.sparse
    _have_scipy = True
except ImportError:
    # No SciPy, but that's okay, we'll skip those tests
    pass
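
# Module-level pickle serializer shared by the serialization tests below.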
ser = PickleSerializer()
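

# Helper that calls squared_distance on whichever operand is an MLlib Vector,
# so the other operand may be any supported vector-like type.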
def _squared_distance(a, b):
    if isinstance(a, Vector):
        return a.squared_distance(b)
    else:
        return b.squared_distance(a)


class VectorTests(PySparkTestCase):
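
    # Round-trips a vector (and a list of vectors) between the Python pickler
    # and the JVM SerDe, checking equality after each conversion.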
    def _test_serialize(self, v):
        self.assertEqual(v, ser.loads(ser.dumps(v)))
        jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
        nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec)))
        self.assertEqual(v, nv)
        vs = [v] * 100
        jvecs = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs)))
        nvs = ser.loads(str(self.sc._jvm.SerDe.dumps(jvecs)))
        self.assertEqual(vs, nvs)

    def test_serialize(self):
        self._test_serialize(DenseVector(range(10)))
        self._test_serialize(DenseVector(array([1., 2., 3., 4.])))
        self._test_serialize(DenseVector(pyarray.array('d', range(10))))
        self._test_serialize(SparseVector(4, {1: 1, 3: 2}))
        self._test_serialize(SparseVector(3, {}))
        self._test_serialize(DenseMatrix(2, 3, range(6)))

    def test_dot(self):
        sv = SparseVector(4, {1: 1, 3: 2})
        dv = DenseVector(array([1., 2., 3., 4.]))
        lst = DenseVector([1, 2, 3, 4])
        mat = array([[1., 2., 3., 4.],
                     [1., 2., 3., 4.],
                     [1., 2., 3., 4.],
                     [1., 2., 3., 4.]])
        self.assertEqual(10.0, sv.dot(dv))
        self.assertTrue(array_equal(array([3., 6., 9., 12.]), sv.dot(mat)))
        self.assertEqual(30.0, dv.dot(dv))
        self.assertTrue(array_equal(array([10., 20., 30., 40.]), dv.dot(mat)))
        self.assertEqual(30.0, lst.dot(dv))
        self.assertTrue(array_equal(array([10., 20., 30., 40.]), lst.dot(mat)))

    def test_squared_distance(self):
        sv = SparseVector(4, {1: 1, 3: 2})
        dv = DenseVector(array([1., 2., 3., 4.]))
        lst = DenseVector([4, 3, 2, 1])
        self.assertEqual(15.0, _squared_distance(sv, dv))
        self.assertEqual(25.0, _squared_distance(sv, lst))
        self.assertEqual(20.0, _squared_distance(dv, lst))
        self.assertEqual(15.0, _squared_distance(dv, sv))
        self.assertEqual(25.0, _squared_distance(lst, sv))
        self.assertEqual(20.0, _squared_distance(lst, dv))
        self.assertEqual(0.0, _squared_distance(sv, sv))
        self.assertEqual(0.0, _squared_distance(dv, dv))
        self.assertEqual(0.0, _squared_distance(lst, lst))

    def test_conversion(self):
        # numpy arrays should be automatically upcast to float64
        # tests for fix of [SPARK-5089]
        v = array([1, 2, 3, 4], dtype='float64')
        dv = DenseVector(v)
        self.assertTrue(dv.array.dtype == 'float64')
        v = array([1, 2, 3, 4], dtype='float32')
        dv = DenseVector(v)
        self.assertTrue(dv.array.dtype == 'float64')

    def test_sparse_vector_indexing(self):
        sv = SparseVector(4, {1: 1, 3: 2})
        self.assertEqual(sv[0], 0.)
        self.assertEqual(sv[3], 2.)
        self.assertEqual(sv[1], 1.)
        self.assertEqual(sv[2], 0.)
        self.assertEqual(sv[-1], 2)
        self.assertEqual(sv[-2], 0)
        self.assertEqual(sv[-4], 0)
        for ind in [4, -5, 7.8]:
            self.assertRaises(ValueError, sv.__getitem__, ind)


class ListTests(PySparkTestCase):
"""
Test MLlib algorithms on plain lists, to make sure they're passed through
as NumPy arrays.
"""

    def test_kmeans(self):
        from pyspark.mllib.clustering import KMeans
        data = [
            [0, 1.1],
            [0, 1.2],
            [1.1, 0],
            [1.2, 0],
        ]
        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
        self.assertEqual(clusters.predict(data[0]), clusters.predict(data[1]))
        self.assertEqual(clusters.predict(data[2]), clusters.predict(data[3]))

    def test_kmeans_deterministic(self):
        from pyspark.mllib.clustering import KMeans
        X = range(0, 100, 10)
        Y = range(0, 100, 10)
        data = [[x, y] for x, y in zip(X, Y)]
        clusters1 = KMeans.train(self.sc.parallelize(data),
                                 3, initializationMode="k-means||", seed=42)
        clusters2 = KMeans.train(self.sc.parallelize(data),
                                 3, initializationMode="k-means||", seed=42)
        centers1 = clusters1.centers
        centers2 = clusters2.centers
        for c1, c2 in zip(centers1, centers2):
            # TODO: Allow small numeric difference.
            self.assertTrue(array_equal(c1, c2))

    def test_gmm(self):
        from pyspark.mllib.clustering import GaussianMixture
        data = self.sc.parallelize([
            [1, 2],
            [8, 9],
            [-4, -3],
            [-6, -7],
        ])
        clusters = GaussianMixture.train(data, 2, convergenceTol=0.001,
                                         maxIterations=100, seed=56)
        labels = clusters.predict(data).collect()
        self.assertEqual(labels[0], labels[1])
        self.assertEqual(labels[2], labels[3])

    def test_gmm_deterministic(self):
        from pyspark.mllib.clustering import GaussianMixture
        x = range(0, 100, 10)
        y = range(0, 100, 10)
        data = self.sc.parallelize([[a, b] for a, b in zip(x, y)])
        clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001,
                                          maxIterations=100, seed=63)
        clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001,
                                          maxIterations=100, seed=63)
        for c1, c2 in zip(clusters1.weights, clusters2.weights):
            self.assertEqual(round(c1, 7), round(c2, 7))

    def test_classification(self):
        from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
        from pyspark.mllib.tree import DecisionTree, DecisionTreeModel, RandomForest,\
            RandomForestModel, GradientBoostedTrees, GradientBoostedTreesModel
        data = [
            LabeledPoint(0.0, [1, 0, 0]),
            LabeledPoint(1.0, [0, 1, 1]),
            LabeledPoint(0.0, [2, 0, 0]),
            LabeledPoint(1.0, [0, 2, 1])
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]

        temp_dir = tempfile.mkdtemp()

        lr_model = LogisticRegressionWithSGD.train(rdd)
        self.assertTrue(lr_model.predict(features[0]) <= 0)
        self.assertTrue(lr_model.predict(features[1]) > 0)
        self.assertTrue(lr_model.predict(features[2]) <= 0)
        self.assertTrue(lr_model.predict(features[3]) > 0)

        svm_model = SVMWithSGD.train(rdd)
        self.assertTrue(svm_model.predict(features[0]) <= 0)
        self.assertTrue(svm_model.predict(features[1]) > 0)
        self.assertTrue(svm_model.predict(features[2]) <= 0)
        self.assertTrue(svm_model.predict(features[3]) > 0)

        nb_model = NaiveBayes.train(rdd)
        self.assertTrue(nb_model.predict(features[0]) <= 0)
        self.assertTrue(nb_model.predict(features[1]) > 0)
        self.assertTrue(nb_model.predict(features[2]) <= 0)
        self.assertTrue(nb_model.predict(features[3]) > 0)

        categoricalFeaturesInfo = {0: 3}  # feature 0 has 3 categories
        dt_model = DecisionTree.trainClassifier(
            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo)
        self.assertTrue(dt_model.predict(features[0]) <= 0)
        self.assertTrue(dt_model.predict(features[1]) > 0)
        self.assertTrue(dt_model.predict(features[2]) <= 0)
        self.assertTrue(dt_model.predict(features[3]) > 0)

        dt_model_dir = os.path.join(temp_dir, "dt")
        dt_model.save(self.sc, dt_model_dir)
        same_dt_model = DecisionTreeModel.load(self.sc, dt_model_dir)
        self.assertEqual(same_dt_model.toDebugString(), dt_model.toDebugString())

        rf_model = RandomForest.trainClassifier(
            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100)
        self.assertTrue(rf_model.predict(features[0]) <= 0)
        self.assertTrue(rf_model.predict(features[1]) > 0)
        self.assertTrue(rf_model.predict(features[2]) <= 0)
        self.assertTrue(rf_model.predict(features[3]) > 0)

        rf_model_dir = os.path.join(temp_dir, "rf")
        rf_model.save(self.sc, rf_model_dir)
        same_rf_model = RandomForestModel.load(self.sc, rf_model_dir)
        self.assertEqual(same_rf_model.toDebugString(), rf_model.toDebugString())

        gbt_model = GradientBoostedTrees.trainClassifier(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
        self.assertTrue(gbt_model.predict(features[0]) <= 0)
        self.assertTrue(gbt_model.predict(features[1]) > 0)
        self.assertTrue(gbt_model.predict(features[2]) <= 0)
        self.assertTrue(gbt_model.predict(features[3]) > 0)

        gbt_model_dir = os.path.join(temp_dir, "gbt")
        gbt_model.save(self.sc, gbt_model_dir)
        same_gbt_model = GradientBoostedTreesModel.load(self.sc, gbt_model_dir)
        self.assertEqual(same_gbt_model.toDebugString(), gbt_model.toDebugString())

        try:
            os.removedirs(temp_dir)
        except OSError:
            pass

    def test_regression(self):
        from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
            RidgeRegressionWithSGD
        from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
        data = [
            LabeledPoint(-1.0, [0, -1]),
            LabeledPoint(1.0, [0, 1]),
            LabeledPoint(-1.0, [0, -2]),
            LabeledPoint(1.0, [0, 2])
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]

        lr_model = LinearRegressionWithSGD.train(rdd)
        self.assertTrue(lr_model.predict(features[0]) <= 0)
        self.assertTrue(lr_model.predict(features[1]) > 0)
        self.assertTrue(lr_model.predict(features[2]) <= 0)
        self.assertTrue(lr_model.predict(features[3]) > 0)

        lasso_model = LassoWithSGD.train(rdd)
        self.assertTrue(lasso_model.predict(features[0]) <= 0)
        self.assertTrue(lasso_model.predict(features[1]) > 0)
        self.assertTrue(lasso_model.predict(features[2]) <= 0)
        self.assertTrue(lasso_model.predict(features[3]) > 0)

        rr_model = RidgeRegressionWithSGD.train(rdd)
        self.assertTrue(rr_model.predict(features[0]) <= 0)
        self.assertTrue(rr_model.predict(features[1]) > 0)
        self.assertTrue(rr_model.predict(features[2]) <= 0)
        self.assertTrue(rr_model.predict(features[3]) > 0)

        categoricalFeaturesInfo = {0: 2}  # feature 0 has 2 categories
        dt_model = DecisionTree.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
        self.assertTrue(dt_model.predict(features[0]) <= 0)
        self.assertTrue(dt_model.predict(features[1]) > 0)
        self.assertTrue(dt_model.predict(features[2]) <= 0)
        self.assertTrue(dt_model.predict(features[3]) > 0)

        rf_model = RandomForest.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100, seed=1)
        self.assertTrue(rf_model.predict(features[0]) <= 0)
        self.assertTrue(rf_model.predict(features[1]) > 0)
        self.assertTrue(rf_model.predict(features[2]) <= 0)
        self.assertTrue(rf_model.predict(features[3]) > 0)

        gbt_model = GradientBoostedTrees.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
        self.assertTrue(gbt_model.predict(features[0]) <= 0)
        self.assertTrue(gbt_model.predict(features[1]) > 0)
        self.assertTrue(gbt_model.predict(features[2]) <= 0)
        self.assertTrue(gbt_model.predict(features[3]) > 0)
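
        # SPARK-6421: initial weights passed as a numpy array must be handled
        # by _regression_train_wrapper without raising ValueError.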
        try:
            LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
            LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
            RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
        except ValueError:
            self.fail()


class StatTests(PySparkTestCase):
    # SPARK-4023
    def test_col_with_different_rdds(self):
        # numpy
        data = RandomRDDs.normalVectorRDD(self.sc, 1000, 10, 10)
        summary = Statistics.colStats(data)
        self.assertEqual(1000, summary.count())
        # list
        data = self.sc.parallelize([range(10)] * 10)
        summary = Statistics.colStats(data)
        self.assertEqual(10, summary.count())
        # array
        data = self.sc.parallelize([pyarray.array("d", range(10))] * 10)
        summary = Statistics.colStats(data)
        self.assertEqual(10, summary.count())


class VectorUDTTests(PySparkTestCase):
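    # VectorUDT is the Spark SQL user-defined type for MLlib vectors; the
    # fixtures below cover empty and non-empty dense and sparse variants.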
    dv0 = DenseVector([])
    dv1 = DenseVector([1.0, 2.0])
    sv0 = SparseVector(2, [], [])
    sv1 = SparseVector(2, [1], [2.0])
    udt = VectorUDT()

    def test_json_schema(self):
        self.assertEqual(VectorUDT.fromJson(self.udt.jsonValue()), self.udt)

    def test_serialization(self):
        for v in [self.dv0, self.dv1, self.sv0, self.sv1]:
            self.assertEqual(v, self.udt.deserialize(self.udt.serialize(v)))

    def test_infer_schema(self):
        sqlCtx = SQLContext(self.sc)
        rdd = self.sc.parallelize([LabeledPoint(1.0, self.dv1), LabeledPoint(0.0, self.sv1)])
        srdd = sqlCtx.inferSchema(rdd)
        schema = srdd.schema
        field = [f for f in schema.fields if f.name == "features"][0]
        self.assertEqual(field.dataType, self.udt)
        vectors = srdd.map(lambda p: p.features).collect()
        self.assertEqual(len(vectors), 2)
        for v in vectors:
            if isinstance(v, SparseVector):
                self.assertEqual(v, self.sv1)
            elif isinstance(v, DenseVector):
                self.assertEqual(v, self.dv1)
            else:
                raise ValueError("expecting a vector but got %r of type %r" % (v, type(v)))


@unittest.skipIf(not _have_scipy, "SciPy not installed")
class SciPyTests(PySparkTestCase):
"""
Test both vector operations and MLlib algorithms with SciPy sparse matrices,
if SciPy is available.
"""

    def test_serialize(self):
        from scipy.sparse import lil_matrix
        lil = lil_matrix((4, 1))
        lil[1, 0] = 1
        lil[3, 0] = 2
        sv = SparseVector(4, {1: 1, 3: 2})
        self.assertEqual(sv, _convert_to_vector(lil))
        self.assertEqual(sv, _convert_to_vector(lil.tocsc()))
        self.assertEqual(sv, _convert_to_vector(lil.tocoo()))
        self.assertEqual(sv, _convert_to_vector(lil.tocsr()))
        self.assertEqual(sv, _convert_to_vector(lil.todok()))

        def serialize(l):
            return ser.loads(ser.dumps(_convert_to_vector(l)))
        self.assertEqual(sv, serialize(lil))
        self.assertEqual(sv, serialize(lil.tocsc()))
        self.assertEqual(sv, serialize(lil.tocsr()))
        self.assertEqual(sv, serialize(lil.todok()))

    def test_dot(self):
        from scipy.sparse import lil_matrix
        lil = lil_matrix((4, 1))
        lil[1, 0] = 1
        lil[3, 0] = 2
        dv = DenseVector(array([1., 2., 3., 4.]))
        self.assertEqual(10.0, dv.dot(lil))

    def test_squared_distance(self):
        from scipy.sparse import lil_matrix
        lil = lil_matrix((4, 1))
        lil[1, 0] = 3
        lil[3, 0] = 2
        dv = DenseVector(array([1., 2., 3., 4.]))
        sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
        self.assertEqual(15.0, dv.squared_distance(lil))
        self.assertEqual(15.0, sv.squared_distance(lil))

    def scipy_matrix(self, size, values):
        """Create a column SciPy matrix from a dictionary of values"""
        from scipy.sparse import lil_matrix
        lil = lil_matrix((size, 1))
        for key, value in values.items():
            lil[key, 0] = value
        return lil

    def test_clustering(self):
        from pyspark.mllib.clustering import KMeans
        data = [
            self.scipy_matrix(3, {1: 1.0}),
            self.scipy_matrix(3, {1: 1.1}),
            self.scipy_matrix(3, {2: 1.0}),
            self.scipy_matrix(3, {2: 1.1})
        ]
        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
        self.assertEqual(clusters.predict(data[0]), clusters.predict(data[1]))
        self.assertEqual(clusters.predict(data[2]), clusters.predict(data[3]))

    def test_classification(self):
        from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
        from pyspark.mllib.tree import DecisionTree
        data = [
            LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})),
            LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
            LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})),
            LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features for p in data]

        lr_model = LogisticRegressionWithSGD.train(rdd)
        self.assertTrue(lr_model.predict(features[0]) <= 0)
        self.assertTrue(lr_model.predict(features[1]) > 0)
        self.assertTrue(lr_model.predict(features[2]) <= 0)
        self.assertTrue(lr_model.predict(features[3]) > 0)

        svm_model = SVMWithSGD.train(rdd)
        self.assertTrue(svm_model.predict(features[0]) <= 0)
        self.assertTrue(svm_model.predict(features[1]) > 0)
        self.assertTrue(svm_model.predict(features[2]) <= 0)
        self.assertTrue(svm_model.predict(features[3]) > 0)

        nb_model = NaiveBayes.train(rdd)
        self.assertTrue(nb_model.predict(features[0]) <= 0)
        self.assertTrue(nb_model.predict(features[1]) > 0)
        self.assertTrue(nb_model.predict(features[2]) <= 0)
        self.assertTrue(nb_model.predict(features[3]) > 0)

        categoricalFeaturesInfo = {0: 3}  # feature 0 has 3 categories
        dt_model = DecisionTree.trainClassifier(rdd, numClasses=2,
                                                categoricalFeaturesInfo=categoricalFeaturesInfo)
        self.assertTrue(dt_model.predict(features[0]) <= 0)
        self.assertTrue(dt_model.predict(features[1]) > 0)
        self.assertTrue(dt_model.predict(features[2]) <= 0)
        self.assertTrue(dt_model.predict(features[3]) > 0)

    def test_regression(self):
        from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
            RidgeRegressionWithSGD
        from pyspark.mllib.tree import DecisionTree
        data = [
            LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})),
            LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
            LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})),
            LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features for p in data]

        lr_model = LinearRegressionWithSGD.train(rdd)
        self.assertTrue(lr_model.predict(features[0]) <= 0)
        self.assertTrue(lr_model.predict(features[1]) > 0)
        self.assertTrue(lr_model.predict(features[2]) <= 0)
        self.assertTrue(lr_model.predict(features[3]) > 0)

        lasso_model = LassoWithSGD.train(rdd)
        self.assertTrue(lasso_model.predict(features[0]) <= 0)
        self.assertTrue(lasso_model.predict(features[1]) > 0)
        self.assertTrue(lasso_model.predict(features[2]) <= 0)
        self.assertTrue(lasso_model.predict(features[3]) > 0)

        rr_model = RidgeRegressionWithSGD.train(rdd)
        self.assertTrue(rr_model.predict(features[0]) <= 0)
        self.assertTrue(rr_model.predict(features[1]) > 0)
        self.assertTrue(rr_model.predict(features[2]) <= 0)
        self.assertTrue(rr_model.predict(features[3]) > 0)

        categoricalFeaturesInfo = {0: 2}  # feature 0 has 2 categories
        dt_model = DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
        self.assertTrue(dt_model.predict(features[0]) <= 0)
        self.assertTrue(dt_model.predict(features[1]) > 0)
        self.assertTrue(dt_model.predict(features[2]) <= 0)
        self.assertTrue(dt_model.predict(features[3]) > 0)


class ChiSqTestTests(PySparkTestCase):
    def test_goodness_of_fit(self):
        from numpy import inf

        observed = Vectors.dense([4, 6, 5])
        pearson = Statistics.chiSqTest(observed)

        # Validated against the R command `chisq.test(c(4, 6, 5), p=c(1/3, 1/3, 1/3))`
        self.assertEqual(pearson.statistic, 0.4)
        self.assertEqual(pearson.degreesOfFreedom, 2)
        self.assertAlmostEqual(pearson.pValue, 0.8187, 4)

        # Different expected and observed sum
        observed1 = Vectors.dense([21, 38, 43, 80])
        expected1 = Vectors.dense([3, 5, 7, 20])
        pearson1 = Statistics.chiSqTest(observed1, expected1)

        # Results validated against the R command
        # `chisq.test(c(21, 38, 43, 80), p=c(3/35, 1/7, 1/5, 4/7))`
        self.assertAlmostEqual(pearson1.statistic, 14.1429, 4)
        self.assertEqual(pearson1.degreesOfFreedom, 3)
        self.assertAlmostEqual(pearson1.pValue, 0.002717, 4)

        # Vectors with different sizes
        observed3 = Vectors.dense([1.0, 2.0, 3.0])
        expected3 = Vectors.dense([1.0, 2.0, 3.0, 4.0])
        self.assertRaises(ValueError, Statistics.chiSqTest, observed3, expected3)

        # Negative counts in observed
        neg_obs = Vectors.dense([1.0, 2.0, 3.0, -4.0])
        self.assertRaises(Py4JJavaError, Statistics.chiSqTest, neg_obs, expected1)

        # Count = 0.0 in expected but not observed
        zero_expected = Vectors.dense([1.0, 0.0, 3.0])
        pearson_inf = Statistics.chiSqTest(observed, zero_expected)
        self.assertEqual(pearson_inf.statistic, inf)
        self.assertEqual(pearson_inf.degreesOfFreedom, 2)
        self.assertEqual(pearson_inf.pValue, 0.0)

        # 0.0 in expected and observed simultaneously
        zero_observed = Vectors.dense([2.0, 0.0, 1.0])
        self.assertRaises(Py4JJavaError, Statistics.chiSqTest, zero_observed, zero_expected)

    def test_matrix_independence(self):
        data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
        chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))

        # Results validated against R command
        # `chisq.test(rbind(c(40, 56, 31, 30),c(24, 32, 10, 15), c(29, 42, 0, 12)))`
        self.assertAlmostEqual(chi.statistic, 21.9958, 4)
        self.assertEqual(chi.degreesOfFreedom, 6)
        self.assertAlmostEqual(chi.pValue, 0.001213, 4)

        # Negative counts
        neg_counts = Matrices.dense(2, 2, [4.0, 5.0, 3.0, -3.0])
        self.assertRaises(Py4JJavaError, Statistics.chiSqTest, neg_counts)

        # Row sum = 0.0
        row_zero = Matrices.dense(2, 2, [0.0, 1.0, 0.0, 2.0])
        self.assertRaises(Py4JJavaError, Statistics.chiSqTest, row_zero)

        # Column sum = 0.0
        col_zero = Matrices.dense(2, 2, [0.0, 0.0, 2.0, 2.0])
        self.assertRaises(Py4JJavaError, Statistics.chiSqTest, col_zero)

    def test_chi_sq_pearson(self):
        data = [
            LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
            LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
            LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
            LabeledPoint(0.0, Vectors.dense([3.5, 30.0])),
            LabeledPoint(0.0, Vectors.dense([3.5, 40.0])),
            LabeledPoint(1.0, Vectors.dense([3.5, 40.0]))
        ]

        for numParts in [2, 4, 6, 8]:
            chi = Statistics.chiSqTest(self.sc.parallelize(data, numParts))
            feature1 = chi[0]
            self.assertEqual(feature1.statistic, 0.75)
            self.assertEqual(feature1.degreesOfFreedom, 2)
            self.assertAlmostEqual(feature1.pValue, 0.6873, 4)

            feature2 = chi[1]
            self.assertEqual(feature2.statistic, 1.5)
            self.assertEqual(feature2.degreesOfFreedom, 3)
            self.assertAlmostEqual(feature2.pValue, 0.6823, 4)

    def test_right_number_of_results(self):
        num_cols = 1001
        sparse_data = [
            LabeledPoint(0.0, Vectors.sparse(num_cols, [(100, 2.0)])),
            LabeledPoint(0.1, Vectors.sparse(num_cols, [(200, 1.0)]))
        ]
        chi = Statistics.chiSqTest(self.sc.parallelize(sparse_data))
        self.assertEqual(len(chi), num_cols)
        self.assertIsNotNone(chi[1000])


if __name__ == "__main__":
    if not _have_scipy:
        print "NOTE: Skipping SciPy tests as it does not seem to be installed"
    unittest.main()
    if not _have_scipy:
        print "NOTE: SciPy tests were skipped as it does not seem to be installed"