spark-instrumented-optimizer/python/pyspark/mllib/tests.py
Xiangrui Meng 1a9c6cddad [SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD
Register MLlib's Vector as a SQL user-defined type (UDT) in both Scala and Python. With this PR, we can easily map an RDD[LabeledPoint] to a SchemaRDD, and then select columns or save to a Parquet file. Examples in Scala/Python are attached. The Scala code was copied from jkbradley.

~~This PR contains the changes from #3068 . I will rebase after #3068 is merged.~~

marmbrus jkbradley

Author: Xiangrui Meng <meng@databricks.com>

Closes #3070 from mengxr/SPARK-3573 and squashes the following commits:

3a0b6e5 [Xiangrui Meng] organize imports
236f0a0 [Xiangrui Meng] register vector as UDT and provide dataset examples
2014-11-03 22:29:48 -08:00

405 lines
16 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Fuller unit tests for Python MLlib.
"""
import sys
import array as pyarray
from numpy import array, array_equal
if sys.version_info[:2] <= (2, 6):
try:
import unittest2 as unittest
except ImportError:
sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
sys.exit(1)
else:
import unittest
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.stat import Statistics
from pyspark.serializers import PickleSerializer
from pyspark.sql import SQLContext
from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
# SciPy is optional: record whether it can be imported so that the SciPy-only
# tests can be skipped when it is unavailable.
_have_scipy = False
try:
    import scipy.sparse
    _have_scipy = True
except Exception:
    # No usable SciPy, but that's okay, we'll skip those tests. Catching
    # Exception (rather than using a bare ``except:``) keeps the probe
    # best-effort for broken installs while letting KeyboardInterrupt and
    # SystemExit propagate.
    pass
# Module-level pickle serializer shared by the tests in this file.
ser = PickleSerializer()
def _squared_distance(a, b):
    """Return the squared distance between `a` and `b`.

    The computation is delegated to the ``squared_distance`` method of `a`
    when `a` is a ``Vector``; otherwise it is delegated to the same method
    of `b`, with `a` passed as the argument.
    """
    owner, other = (a, b) if isinstance(a, Vector) else (b, a)
    return owner.squared_distance(other)
class VectorTests(PySparkTestCase):
    """Serialization and arithmetic checks for DenseVector and SparseVector."""

    def _test_serialize(self, v):
        """Round-trip `v` through ``self.sc._jvm.SerDe`` and check it is unchanged.

        The value is pickled with the module-level serializer, handed to
        ``SerDe.loads``, dumped again with ``SerDe.dumps`` and unpickled; the
        result must compare equal to the input. The same check is then
        repeated for a list holding 100 references to `v`.
        """
        jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
        nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec)))
        self.assertEqual(v, nv)
        vs = [v] * 100
        jvecs = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs)))
        nvs = ser.loads(str(self.sc._jvm.SerDe.dumps(jvecs)))
        self.assertEqual(vs, nvs)

    # Dense vectors built from a range, a NumPy array and an array.array, plus
    # one sparse vector, must all survive the round trip.
    def test_serialize(self):
        self._test_serialize(DenseVector(range(10)))
        self._test_serialize(DenseVector(array([1., 2., 3., 4.])))
        self._test_serialize(DenseVector(pyarray.array('d', range(10))))
        self._test_serialize(SparseVector(4, {1: 1, 3: 2}))

    # dot() is checked against another vector (scalar result) and against a
    # 4x4 NumPy matrix (array result) for every kind of left-hand operand.
    def test_dot(self):
        sv = SparseVector(4, {1: 1, 3: 2})
        dv = DenseVector(array([1., 2., 3., 4.]))
        lst = DenseVector([1, 2, 3, 4])
        mat = array([[1., 2., 3., 4.],
                     [1., 2., 3., 4.],
                     [1., 2., 3., 4.],
                     [1., 2., 3., 4.]])
        self.assertEqual(10.0, sv.dot(dv))
        self.assertTrue(array_equal(array([3., 6., 9., 12.]), sv.dot(mat)))
        self.assertEqual(30.0, dv.dot(dv))
        self.assertTrue(array_equal(array([10., 20., 30., 40.]), dv.dot(mat)))
        self.assertEqual(30.0, lst.dot(dv))
        self.assertTrue(array_equal(array([10., 20., 30., 40.]), lst.dot(mat)))

    # Every ordered pair of the three vectors is checked, so the distance is
    # verified to be symmetric and zero for identical operands.
    def test_squared_distance(self):
        sv = SparseVector(4, {1: 1, 3: 2})
        dv = DenseVector(array([1., 2., 3., 4.]))
        lst = DenseVector([4, 3, 2, 1])
        self.assertEqual(15.0, _squared_distance(sv, dv))
        self.assertEqual(25.0, _squared_distance(sv, lst))
        self.assertEqual(20.0, _squared_distance(dv, lst))
        self.assertEqual(15.0, _squared_distance(dv, sv))
        self.assertEqual(25.0, _squared_distance(lst, sv))
        self.assertEqual(20.0, _squared_distance(lst, dv))
        self.assertEqual(0.0, _squared_distance(sv, sv))
        self.assertEqual(0.0, _squared_distance(dv, dv))
        self.assertEqual(0.0, _squared_distance(lst, lst))
class ListTests(PySparkTestCase):

    """
    Test MLlib algorithms on plain lists, to make sure they're passed through
    as NumPy arrays.
    """

    def _assert_alternating_predictions(self, model, features):
        """Check `model` against the four-point fixtures used in this class.

        Points 0 and 2 must be predicted as <= 0, points 1 and 3 as > 0.
        """
        self.assertTrue(model.predict(features[0]) <= 0)
        self.assertTrue(model.predict(features[1]) > 0)
        self.assertTrue(model.predict(features[2]) <= 0)
        self.assertTrue(model.predict(features[3]) > 0)

    # Two pairs of nearby points: each pair must land in the same cluster.
    def test_clustering(self):
        from pyspark.mllib.clustering import KMeans
        data = [
            [0, 1.1],
            [0, 1.2],
            [1.1, 0],
            [1.2, 0],
        ]
        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
        self.assertEqual(clusters.predict(data[0]), clusters.predict(data[1]))
        self.assertEqual(clusters.predict(data[2]), clusters.predict(data[3]))

    # Each classifier is trained on the same RDD and then queried with the
    # feature vectors converted to plain Python lists.
    def test_classification(self):
        from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
        from pyspark.mllib.tree import DecisionTree
        data = [
            LabeledPoint(0.0, [1, 0, 0]),
            LabeledPoint(1.0, [0, 1, 1]),
            LabeledPoint(0.0, [2, 0, 0]),
            LabeledPoint(1.0, [0, 2, 1])
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]

        lr_model = LogisticRegressionWithSGD.train(rdd)
        self._assert_alternating_predictions(lr_model, features)

        svm_model = SVMWithSGD.train(rdd)
        self._assert_alternating_predictions(svm_model, features)

        nb_model = NaiveBayes.train(rdd)
        self._assert_alternating_predictions(nb_model, features)

        categoricalFeaturesInfo = {0: 3}  # feature 0 has 3 categories
        dt_model = \
            DecisionTree.trainClassifier(rdd, numClasses=2,
                                         categoricalFeaturesInfo=categoricalFeaturesInfo)
        self._assert_alternating_predictions(dt_model, features)

    # Same structure as test_classification, with regression models and
    # labels of -1.0 / 1.0.
    def test_regression(self):
        from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
            RidgeRegressionWithSGD
        from pyspark.mllib.tree import DecisionTree
        data = [
            LabeledPoint(-1.0, [0, -1]),
            LabeledPoint(1.0, [0, 1]),
            LabeledPoint(-1.0, [0, -2]),
            LabeledPoint(1.0, [0, 2])
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]

        lr_model = LinearRegressionWithSGD.train(rdd)
        self._assert_alternating_predictions(lr_model, features)

        lasso_model = LassoWithSGD.train(rdd)
        self._assert_alternating_predictions(lasso_model, features)

        rr_model = RidgeRegressionWithSGD.train(rdd)
        self._assert_alternating_predictions(rr_model, features)

        categoricalFeaturesInfo = {0: 2}  # feature 0 has 2 categories
        dt_model = \
            DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
        self._assert_alternating_predictions(dt_model, features)
class StatTests(PySparkTestCase):
    """Checks that ``Statistics.colStats`` accepts RDDs with different row types."""

    # SPARK-4023
    def test_col_with_different_rdds(self):
        # rows produced by RandomRDDs.normalVectorRDD
        random_rows = RandomRDDs.normalVectorRDD(self.sc, 1000, 10, 10)
        self.assertEqual(1000, Statistics.colStats(random_rows).count())
        # rows given first as range(10) values, then as array.array("d") values
        for row in (range(10), pyarray.array("d", range(10))):
            rows = self.sc.parallelize([row] * 10)
            self.assertEqual(10, Statistics.colStats(rows).count())
class VectorUDTTests(PySparkTestCase):
    """Checks for ``VectorUDT``: JSON schema, (de)serialization and schema inference."""

    # Fixture vectors shared by the tests, plus the UDT instance under test.
    dv0 = DenseVector([])
    dv1 = DenseVector([1.0, 2.0])
    sv0 = SparseVector(2, [], [])
    sv1 = SparseVector(2, [1], [2.0])
    udt = VectorUDT()

    # The UDT rebuilt from its own JSON value must equal the original.
    def test_json_schema(self):
        restored = VectorUDT.fromJson(self.udt.jsonValue())
        self.assertEqual(restored, self.udt)

    # serialize() followed by deserialize() must give back an equal vector.
    def test_serialization(self):
        for vec in (self.dv0, self.dv1, self.sv0, self.sv1):
            round_tripped = self.udt.deserialize(self.udt.serialize(vec))
            self.assertEqual(vec, round_tripped)

    # Inferring a schema from LabeledPoints must type the "features" column
    # as the vector UDT and hand back the original vectors.
    def test_infer_schema(self):
        sql_ctx = SQLContext(self.sc)
        points = self.sc.parallelize([LabeledPoint(1.0, self.dv1), LabeledPoint(0.0, self.sv1)])
        schema_rdd = sql_ctx.inferSchema(points)
        feature_fields = [fld for fld in schema_rdd.schema().fields if fld.name == "features"]
        self.assertEqual(feature_fields[0].dataType, self.udt)
        collected = schema_rdd.map(lambda row: row.features).collect()
        self.assertEqual(len(collected), 2)
        for vec in collected:
            if isinstance(vec, SparseVector):
                expected = self.sv1
            elif isinstance(vec, DenseVector):
                expected = self.dv1
            else:
                raise ValueError("expecting a vector but got %r of type %r" % (vec, type(vec)))
            self.assertEqual(vec, expected)
@unittest.skipIf(not _have_scipy, "SciPy not installed")
class SciPyTests(PySparkTestCase):

    """
    Test both vector operations and MLlib algorithms with SciPy sparse matrices,
    if SciPy is available.
    """

    def _assert_alternating_predictions(self, model, features):
        """Check `model` against the four-point fixtures used in this class.

        Points 0 and 2 must be predicted as <= 0, points 1 and 3 as > 0.
        """
        self.assertTrue(model.predict(features[0]) <= 0)
        self.assertTrue(model.predict(features[1]) > 0)
        self.assertTrue(model.predict(features[2]) <= 0)
        self.assertTrue(model.predict(features[3]) > 0)

    # A column matrix in several SciPy sparse formats must convert to the
    # same SparseVector, both directly and after a pickle round trip.
    def test_serialize(self):
        # SciPy is imported locally because the module must still load when
        # SciPy is absent (the class is then skipped).
        from scipy.sparse import lil_matrix
        lil = lil_matrix((4, 1))
        lil[1, 0] = 1
        lil[3, 0] = 2
        sv = SparseVector(4, {1: 1, 3: 2})
        self.assertEqual(sv, _convert_to_vector(lil))
        self.assertEqual(sv, _convert_to_vector(lil.tocsc()))
        self.assertEqual(sv, _convert_to_vector(lil.tocoo()))
        self.assertEqual(sv, _convert_to_vector(lil.tocsr()))
        self.assertEqual(sv, _convert_to_vector(lil.todok()))

        def serialize(matrix):
            """Convert `matrix` to a vector and pickle/unpickle it."""
            return ser.loads(ser.dumps(_convert_to_vector(matrix)))
        self.assertEqual(sv, serialize(lil))
        self.assertEqual(sv, serialize(lil.tocsc()))
        self.assertEqual(sv, serialize(lil.tocsr()))
        self.assertEqual(sv, serialize(lil.todok()))

    # DenseVector.dot must accept a SciPy column matrix as its argument.
    def test_dot(self):
        from scipy.sparse import lil_matrix
        lil = lil_matrix((4, 1))
        lil[1, 0] = 1
        lil[3, 0] = 2
        dv = DenseVector(array([1., 2., 3., 4.]))
        self.assertEqual(10.0, dv.dot(lil))

    # squared_distance must accept a SciPy column matrix for both dense and
    # sparse receivers.
    def test_squared_distance(self):
        from scipy.sparse import lil_matrix
        lil = lil_matrix((4, 1))
        lil[1, 0] = 3
        lil[3, 0] = 2
        dv = DenseVector(array([1., 2., 3., 4.]))
        sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
        self.assertEqual(15.0, dv.squared_distance(lil))
        self.assertEqual(15.0, sv.squared_distance(lil))

    def scipy_matrix(self, size, values):
        """Create a column SciPy matrix from a dictionary of values"""
        from scipy.sparse import lil_matrix
        lil = lil_matrix((size, 1))
        for key, value in values.items():
            lil[key, 0] = value
        return lil

    # Two pairs of nearby points: each pair must land in the same cluster.
    def test_clustering(self):
        from pyspark.mllib.clustering import KMeans
        data = [
            self.scipy_matrix(3, {1: 1.0}),
            self.scipy_matrix(3, {1: 1.1}),
            self.scipy_matrix(3, {2: 1.0}),
            self.scipy_matrix(3, {2: 1.1})
        ]
        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
        self.assertEqual(clusters.predict(data[0]), clusters.predict(data[1]))
        self.assertEqual(clusters.predict(data[2]), clusters.predict(data[3]))

    # Each classifier is trained on the same RDD and then queried with the
    # features taken from the LabeledPoint fixtures.
    def test_classification(self):
        from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
        from pyspark.mllib.tree import DecisionTree
        data = [
            LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})),
            LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
            LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})),
            LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features for p in data]

        lr_model = LogisticRegressionWithSGD.train(rdd)
        self._assert_alternating_predictions(lr_model, features)

        svm_model = SVMWithSGD.train(rdd)
        self._assert_alternating_predictions(svm_model, features)

        nb_model = NaiveBayes.train(rdd)
        self._assert_alternating_predictions(nb_model, features)

        categoricalFeaturesInfo = {0: 3}  # feature 0 has 3 categories
        dt_model = DecisionTree.trainClassifier(rdd, numClasses=2,
                                                categoricalFeaturesInfo=categoricalFeaturesInfo)
        self._assert_alternating_predictions(dt_model, features)

    # Same structure as test_classification, with regression models and
    # labels of -1.0 / 1.0.
    def test_regression(self):
        from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
            RidgeRegressionWithSGD
        from pyspark.mllib.tree import DecisionTree
        data = [
            LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})),
            LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
            LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})),
            LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features for p in data]

        lr_model = LinearRegressionWithSGD.train(rdd)
        self._assert_alternating_predictions(lr_model, features)

        lasso_model = LassoWithSGD.train(rdd)
        self._assert_alternating_predictions(lasso_model, features)

        rr_model = RidgeRegressionWithSGD.train(rdd)
        self._assert_alternating_predictions(rr_model, features)

        categoricalFeaturesInfo = {0: 2}  # feature 0 has 2 categories
        dt_model = DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
        self._assert_alternating_predictions(dt_model, features)
if __name__ == "__main__":
    # Script entry point: run every test in this module, telling the user
    # before and after the run when the SciPy tests are being skipped.
    if not _have_scipy:
        print("NOTE: Skipping SciPy tests as it does not seem to be installed")
    try:
        # unittest.main() finishes by raising SystemExit, so anything placed
        # after it would never run; the trailing note lives in ``finally`` and
        # the exit status still propagates unchanged.
        unittest.main()
    finally:
        if not _have_scipy:
            print("NOTE: SciPy tests were skipped as it does not seem to be installed")