[SPARK-14906][ML] Copy linalg in PySpark to new ML package

## What changes were proposed in this pull request?

Copy the linalg (Vector/Matrix and VectorUDT/MatrixUDT) in PySpark to new ML package.

## How was this patch tested?
Existing tests.

Author: Xiangrui Meng <meng@databricks.com>
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #13099 from viirya/move-pyspark-vector-matrix-udt4.
This commit is contained in:
Xiangrui Meng 2016-05-17 00:08:02 -07:00
parent 95f4fbae52
commit 8ad9f08c94
3 changed files with 1564 additions and 45 deletions

View file

@ -41,6 +41,14 @@ pyspark.ml.clustering module
:undoc-members:
:inherited-members:
pyspark.ml.linalg module
----------------------------
.. automodule:: pyspark.ml.linalg
:members:
:undoc-members:
:inherited-members:
pyspark.ml.recommendation module
--------------------------------

File diff suppressed because it is too large Load diff

View file

@ -18,7 +18,6 @@
"""
Unit tests for Spark ML Python APIs.
"""
import array
import sys
if sys.version > '3':
xrange = range
@ -40,15 +39,21 @@ else:
from shutil import rmtree
import tempfile
import array as pyarray
import numpy as np
from numpy import (
array, array_equal, zeros, inf, random, exp, dot, all, mean, abs, arange, tile, ones)
from numpy import sum as array_sum
import inspect
from pyspark import keyword_only
from pyspark import keyword_only, SparkContext
from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer
from pyspark.ml.classification import *
from pyspark.ml.clustering import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import *
from pyspark.ml.linalg import Vector, SparseVector, DenseVector, VectorUDT,\
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT, _convert_to_vector
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
from pyspark.ml.recommendation import ALS
@ -57,13 +62,28 @@ from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, \
from pyspark.ml.tuning import *
from pyspark.ml.wrapper import JavaParams
from pyspark.mllib.common import _java2py
from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector
from pyspark.mllib.linalg import SparseVector as OldSparseVector, DenseVector as OldDenseVector,\
DenseMatrix as OldDenseMatrix, MatrixUDT as OldMatrixUDT, SparseMatrix as OldSparseMatrix,\
Vectors as OldVectors, VectorUDT as OldVectorUDT
from pyspark.mllib.regression import LabeledPoint
from pyspark.serializers import PickleSerializer
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import rand
from pyspark.sql.utils import IllegalArgumentException
from pyspark.storagelevel import *
from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
ser = PickleSerializer()
class MLlibTestCase(unittest.TestCase):
def setUp(self):
self.sc = SparkContext('local[4]', "MLlib tests")
self.spark = SparkSession(self.sc)
def tearDown(self):
self.spark.stop()
class SparkSessionTestCase(PySparkTestCase):
@classmethod
@ -142,23 +162,23 @@ class ParamTypeConversionTests(PySparkTestCase):
def test_vector(self):
ewp = ElementwiseProduct(scalingVec=[1, 3])
self.assertEqual(ewp.getScalingVec(), DenseVector([1.0, 3.0]))
self.assertEqual(ewp.getScalingVec(), OldDenseVector([1.0, 3.0]))
ewp = ElementwiseProduct(scalingVec=np.array([1.2, 3.4]))
self.assertEqual(ewp.getScalingVec(), DenseVector([1.2, 3.4]))
self.assertEqual(ewp.getScalingVec(), OldDenseVector([1.2, 3.4]))
self.assertRaises(TypeError, lambda: ElementwiseProduct(scalingVec=["a", "b"]))
def test_list(self):
l = [0, 1]
for lst_like in [l, np.array(l), DenseVector(l), SparseVector(len(l), range(len(l)), l),
array.array('l', l), xrange(2), tuple(l)]:
for lst_like in [l, np.array(l), OldDenseVector(l), OldSparseVector(len(l),
range(len(l)), l), pyarray.array('l', l), xrange(2), tuple(l)]:
converted = TypeConverters.toList(lst_like)
self.assertEqual(type(converted), list)
self.assertListEqual(converted, l)
def test_list_int(self):
for indices in [[1.0, 2.0], np.array([1.0, 2.0]), DenseVector([1.0, 2.0]),
SparseVector(2, {0: 1.0, 1: 2.0}), xrange(1, 3), (1.0, 2.0),
array.array('d', [1.0, 2.0])]:
for indices in [[1.0, 2.0], np.array([1.0, 2.0]), OldDenseVector([1.0, 2.0]),
OldSparseVector(2, {0: 1.0, 1: 2.0}), xrange(1, 3), (1.0, 2.0),
pyarray.array('d', [1.0, 2.0])]:
vs = VectorSlicer(indices=indices)
self.assertListEqual(vs.getIndices(), [1, 2])
self.assertTrue(all([type(v) == int for v in vs.getIndices()]))
@ -390,9 +410,9 @@ class FeatureTests(SparkSessionTestCase):
def test_idf(self):
dataset = self.spark.createDataFrame([
(DenseVector([1.0, 2.0]),),
(DenseVector([0.0, 1.0]),),
(DenseVector([3.0, 0.2]),)], ["tf"])
(OldDenseVector([1.0, 2.0]),),
(OldDenseVector([0.0, 1.0]),),
(OldDenseVector([3.0, 0.2]),)], ["tf"])
idf0 = IDF(inputCol="tf")
self.assertListEqual(idf0.params, [idf0.inputCol, idf0.minDocFreq, idf0.outputCol])
idf0m = idf0.fit(dataset, {idf0.outputCol: "idf"})
@ -437,10 +457,10 @@ class FeatureTests(SparkSessionTestCase):
def test_count_vectorizer_with_binary(self):
dataset = self.spark.createDataFrame([
(0, "a a a b b c".split(' '), SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),),
(1, "a a".split(' '), SparseVector(3, {0: 1.0}),),
(2, "a b".split(' '), SparseVector(3, {0: 1.0, 1: 1.0}),),
(3, "c".split(' '), SparseVector(3, {2: 1.0}),)], ["id", "words", "expected"])
(0, "a a a b b c".split(' '), OldSparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),),
(1, "a a".split(' '), OldSparseVector(3, {0: 1.0}),),
(2, "a b".split(' '), OldSparseVector(3, {0: 1.0, 1: 1.0}),),
(3, "c".split(' '), OldSparseVector(3, {2: 1.0}),)], ["id", "words", "expected"])
cv = CountVectorizer(binary=True, inputCol="words", outputCol="features")
model = cv.fit(dataset)
@ -561,11 +581,11 @@ class CrossValidatorTests(SparkSessionTestCase):
# Save/load for CrossValidator will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
[(OldVectors.dense([0.0]), 0.0),
(OldVectors.dense([0.4]), 1.0),
(OldVectors.dense([0.5]), 0.0),
(OldVectors.dense([0.6]), 1.0),
(OldVectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
@ -634,11 +654,11 @@ class TrainValidationSplitTests(SparkSessionTestCase):
# Save/load for TrainValidationSplit will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
[(OldVectors.dense([0.0]), 0.0),
(OldVectors.dense([0.4]), 1.0),
(OldVectors.dense([0.5]), 0.0),
(OldVectors.dense([0.6]), 1.0),
(OldVectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
@ -837,8 +857,8 @@ class LDATest(SparkSessionTestCase):
def test_persistence(self):
# Test save/load for LDA, LocalLDAModel, DistributedLDAModel.
df = self.spark.createDataFrame([
[1, Vectors.dense([0.0, 1.0])],
[2, Vectors.sparse(2, {0: 1.0})],
[1, OldVectors.dense([0.0, 1.0])],
[2, OldVectors.sparse(2, {0: 1.0})],
], ["id", "features"])
# Fit model
lda = LDA(k=2, seed=1, optimizer="em")
@ -873,9 +893,8 @@ class LDATest(SparkSessionTestCase):
class TrainingSummaryTest(SparkSessionTestCase):
def test_linear_regression_summary(self):
from pyspark.mllib.linalg import Vectors
df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
(0.0, 2.0, Vectors.sparse(1, [], []))],
df = self.spark.createDataFrame([(1.0, 2.0, OldVectors.dense(1.0)),
(0.0, 2.0, OldVectors.sparse(1, [], []))],
["label", "weight", "features"])
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight",
fitIntercept=False)
@ -947,9 +966,8 @@ class TrainingSummaryTest(SparkSessionTestCase):
self.assertAlmostEqual(sameSummary.deviance, s.deviance)
def test_logistic_regression_summary(self):
from pyspark.mllib.linalg import Vectors
df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
(0.0, 2.0, Vectors.sparse(1, [], []))],
df = self.spark.createDataFrame([(1.0, 2.0, OldVectors.dense(1.0)),
(0.0, 2.0, OldVectors.sparse(1, [], []))],
["label", "weight", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False)
model = lr.fit(df)
@ -978,9 +996,9 @@ class TrainingSummaryTest(SparkSessionTestCase):
class OneVsRestTests(SparkSessionTestCase):
def test_copy(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
df = self.spark.createDataFrame([(0.0, OldVectors.dense(1.0, 0.8)),
(1.0, OldVectors.sparse(2, [], [])),
(2.0, OldVectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
@ -992,9 +1010,9 @@ class OneVsRestTests(SparkSessionTestCase):
self.assertEqual(model1.getPredictionCol(), "indexed")
def test_output_columns(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
df = self.spark.createDataFrame([(0.0, OldVectors.dense(1.0, 0.8)),
(1.0, OldVectors.sparse(2, [], [])),
(2.0, OldVectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
@ -1004,9 +1022,9 @@ class OneVsRestTests(SparkSessionTestCase):
def test_save_load(self):
temp_path = tempfile.mkdtemp()
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
df = self.spark.createDataFrame([(0.0, OldVectors.dense(1.0, 0.8)),
(1.0, OldVectors.sparse(2, [], [])),
(2.0, OldVectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
@ -1034,7 +1052,7 @@ class HashingTFTest(SparkSessionTestCase):
hashingTF.setInputCol("words").setOutputCol("features").setNumFeatures(n).setBinary(True)
output = hashingTF.transform(df)
features = output.select("features").first().features.toArray()
expected = Vectors.dense([1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]).toArray()
expected = OldVectors.dense([1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]).toArray()
for i in range(0, n):
self.assertAlmostEqual(features[i], expected[i], 14, "Error at " + str(i) +
": expected " + str(expected[i]) + ", got " + str(features[i]))
@ -1109,6 +1127,354 @@ class DefaultValuesTests(PySparkTestCase):
self.check_params(cls())
def _squared_distance(a, b):
if isinstance(a, Vector):
return a.squared_distance(b)
else:
return b.squared_distance(a)
class VectorTests(MLlibTestCase):
def _test_serialize(self, v):
self.assertEqual(v, ser.loads(ser.dumps(v)))
jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
nv = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvec)))
self.assertEqual(v, nv)
vs = [v] * 100
jvecs = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs)))
nvs = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvecs)))
self.assertEqual(vs, nvs)
def test_serialize(self):
# Because pickle path still uses old vector/matrix
# TODO: Change this to new vector/matrix when pickle for new vector/matrix is ready.
self._test_serialize(OldDenseVector(range(10)))
self._test_serialize(OldDenseVector(array([1., 2., 3., 4.])))
self._test_serialize(OldDenseVector(pyarray.array('d', range(10))))
self._test_serialize(OldSparseVector(4, {1: 1, 3: 2}))
self._test_serialize(OldSparseVector(3, {}))
self._test_serialize(OldDenseMatrix(2, 3, range(6)))
sm1 = OldSparseMatrix(
3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0])
self._test_serialize(sm1)
def test_dot(self):
sv = SparseVector(4, {1: 1, 3: 2})
dv = DenseVector(array([1., 2., 3., 4.]))
lst = DenseVector([1, 2, 3, 4])
mat = array([[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.]])
arr = pyarray.array('d', [0, 1, 2, 3])
self.assertEqual(10.0, sv.dot(dv))
self.assertTrue(array_equal(array([3., 6., 9., 12.]), sv.dot(mat)))
self.assertEqual(30.0, dv.dot(dv))
self.assertTrue(array_equal(array([10., 20., 30., 40.]), dv.dot(mat)))
self.assertEqual(30.0, lst.dot(dv))
self.assertTrue(array_equal(array([10., 20., 30., 40.]), lst.dot(mat)))
self.assertEqual(7.0, sv.dot(arr))
def test_squared_distance(self):
sv = SparseVector(4, {1: 1, 3: 2})
dv = DenseVector(array([1., 2., 3., 4.]))
lst = DenseVector([4, 3, 2, 1])
lst1 = [4, 3, 2, 1]
arr = pyarray.array('d', [0, 2, 1, 3])
narr = array([0, 2, 1, 3])
self.assertEqual(15.0, _squared_distance(sv, dv))
self.assertEqual(25.0, _squared_distance(sv, lst))
self.assertEqual(20.0, _squared_distance(dv, lst))
self.assertEqual(15.0, _squared_distance(dv, sv))
self.assertEqual(25.0, _squared_distance(lst, sv))
self.assertEqual(20.0, _squared_distance(lst, dv))
self.assertEqual(0.0, _squared_distance(sv, sv))
self.assertEqual(0.0, _squared_distance(dv, dv))
self.assertEqual(0.0, _squared_distance(lst, lst))
self.assertEqual(25.0, _squared_distance(sv, lst1))
self.assertEqual(3.0, _squared_distance(sv, arr))
self.assertEqual(3.0, _squared_distance(sv, narr))
def test_hash(self):
v1 = DenseVector([0.0, 1.0, 0.0, 5.5])
v2 = SparseVector(4, [(1, 1.0), (3, 5.5)])
v3 = DenseVector([0.0, 1.0, 0.0, 5.5])
v4 = SparseVector(4, [(1, 1.0), (3, 2.5)])
self.assertEqual(hash(v1), hash(v2))
self.assertEqual(hash(v1), hash(v3))
self.assertEqual(hash(v2), hash(v3))
self.assertFalse(hash(v1) == hash(v4))
self.assertFalse(hash(v2) == hash(v4))
def test_eq(self):
v1 = DenseVector([0.0, 1.0, 0.0, 5.5])
v2 = SparseVector(4, [(1, 1.0), (3, 5.5)])
v3 = DenseVector([0.0, 1.0, 0.0, 5.5])
v4 = SparseVector(6, [(1, 1.0), (3, 5.5)])
v5 = DenseVector([0.0, 1.0, 0.0, 2.5])
v6 = SparseVector(4, [(1, 1.0), (3, 2.5)])
self.assertEqual(v1, v2)
self.assertEqual(v1, v3)
self.assertFalse(v2 == v4)
self.assertFalse(v1 == v5)
self.assertFalse(v1 == v6)
def test_equals(self):
indices = [1, 2, 4]
values = [1., 3., 2.]
self.assertTrue(Vectors._equals(indices, values, list(range(5)), [0., 1., 3., 0., 2.]))
self.assertFalse(Vectors._equals(indices, values, list(range(5)), [0., 3., 1., 0., 2.]))
self.assertFalse(Vectors._equals(indices, values, list(range(5)), [0., 3., 0., 2.]))
self.assertFalse(Vectors._equals(indices, values, list(range(5)), [0., 1., 3., 2., 2.]))
def test_conversion(self):
# numpy arrays should be automatically upcast to float64
# tests for fix of [SPARK-5089]
v = array([1, 2, 3, 4], dtype='float64')
dv = DenseVector(v)
self.assertTrue(dv.array.dtype == 'float64')
v = array([1, 2, 3, 4], dtype='float32')
dv = DenseVector(v)
self.assertTrue(dv.array.dtype == 'float64')
def test_sparse_vector_indexing(self):
sv = SparseVector(5, {1: 1, 3: 2})
self.assertEqual(sv[0], 0.)
self.assertEqual(sv[3], 2.)
self.assertEqual(sv[1], 1.)
self.assertEqual(sv[2], 0.)
self.assertEqual(sv[4], 0.)
self.assertEqual(sv[-1], 0.)
self.assertEqual(sv[-2], 2.)
self.assertEqual(sv[-3], 0.)
self.assertEqual(sv[-5], 0.)
for ind in [5, -6]:
self.assertRaises(ValueError, sv.__getitem__, ind)
for ind in [7.8, '1']:
self.assertRaises(TypeError, sv.__getitem__, ind)
zeros = SparseVector(4, {})
self.assertEqual(zeros[0], 0.0)
self.assertEqual(zeros[3], 0.0)
for ind in [4, -5]:
self.assertRaises(ValueError, zeros.__getitem__, ind)
empty = SparseVector(0, {})
for ind in [-1, 0, 1]:
self.assertRaises(ValueError, empty.__getitem__, ind)
def test_matrix_indexing(self):
mat = DenseMatrix(3, 2, [0, 1, 4, 6, 8, 10])
expected = [[0, 6], [1, 8], [4, 10]]
for i in range(3):
for j in range(2):
self.assertEqual(mat[i, j], expected[i][j])
def test_repr_dense_matrix(self):
mat = DenseMatrix(3, 2, [0, 1, 4, 6, 8, 10])
self.assertTrue(
repr(mat),
'DenseMatrix(3, 2, [0.0, 1.0, 4.0, 6.0, 8.0, 10.0], False)')
mat = DenseMatrix(3, 2, [0, 1, 4, 6, 8, 10], True)
self.assertTrue(
repr(mat),
'DenseMatrix(3, 2, [0.0, 1.0, 4.0, 6.0, 8.0, 10.0], False)')
mat = DenseMatrix(6, 3, zeros(18))
self.assertTrue(
repr(mat),
'DenseMatrix(6, 3, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..., \
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], False)')
def test_repr_sparse_matrix(self):
sm1t = SparseMatrix(
3, 4, [0, 2, 3, 5], [0, 1, 2, 0, 2], [3.0, 2.0, 4.0, 9.0, 8.0],
isTransposed=True)
self.assertTrue(
repr(sm1t),
'SparseMatrix(3, 4, [0, 2, 3, 5], [0, 1, 2, 0, 2], [3.0, 2.0, 4.0, 9.0, 8.0], True)')
indices = tile(arange(6), 3)
values = ones(18)
sm = SparseMatrix(6, 3, [0, 6, 12, 18], indices, values)
self.assertTrue(
repr(sm), "SparseMatrix(6, 3, [0, 6, 12, 18], \
[0, 1, 2, 3, 4, 5, 0, 1, ..., 4, 5, 0, 1, 2, 3, 4, 5], \
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ..., \
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], False)")
self.assertTrue(
str(sm),
"6 X 3 CSCMatrix\n\
(0,0) 1.0\n(1,0) 1.0\n(2,0) 1.0\n(3,0) 1.0\n(4,0) 1.0\n(5,0) 1.0\n\
(0,1) 1.0\n(1,1) 1.0\n(2,1) 1.0\n(3,1) 1.0\n(4,1) 1.0\n(5,1) 1.0\n\
(0,2) 1.0\n(1,2) 1.0\n(2,2) 1.0\n(3,2) 1.0\n..\n..")
sm = SparseMatrix(1, 18, zeros(19), [], [])
self.assertTrue(
repr(sm),
'SparseMatrix(1, 18, \
[0, 0, 0, 0, 0, 0, 0, 0, ..., 0, 0, 0, 0, 0, 0, 0, 0], [], [], False)')
def test_sparse_matrix(self):
# Test sparse matrix creation.
sm1 = SparseMatrix(
3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0])
self.assertEqual(sm1.numRows, 3)
self.assertEqual(sm1.numCols, 4)
self.assertEqual(sm1.colPtrs.tolist(), [0, 2, 2, 4, 4])
self.assertEqual(sm1.rowIndices.tolist(), [1, 2, 1, 2])
self.assertEqual(sm1.values.tolist(), [1.0, 2.0, 4.0, 5.0])
self.assertTrue(
repr(sm1),
'SparseMatrix(3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0], False)')
# Test indexing
expected = [
[0, 0, 0, 0],
[1, 0, 4, 0],
[2, 0, 5, 0]]
for i in range(3):
for j in range(4):
self.assertEqual(expected[i][j], sm1[i, j])
self.assertTrue(array_equal(sm1.toArray(), expected))
# Test conversion to dense and sparse.
smnew = sm1.toDense().toSparse()
self.assertEqual(sm1.numRows, smnew.numRows)
self.assertEqual(sm1.numCols, smnew.numCols)
self.assertTrue(array_equal(sm1.colPtrs, smnew.colPtrs))
self.assertTrue(array_equal(sm1.rowIndices, smnew.rowIndices))
self.assertTrue(array_equal(sm1.values, smnew.values))
sm1t = SparseMatrix(
3, 4, [0, 2, 3, 5], [0, 1, 2, 0, 2], [3.0, 2.0, 4.0, 9.0, 8.0],
isTransposed=True)
self.assertEqual(sm1t.numRows, 3)
self.assertEqual(sm1t.numCols, 4)
self.assertEqual(sm1t.colPtrs.tolist(), [0, 2, 3, 5])
self.assertEqual(sm1t.rowIndices.tolist(), [0, 1, 2, 0, 2])
self.assertEqual(sm1t.values.tolist(), [3.0, 2.0, 4.0, 9.0, 8.0])
expected = [
[3, 2, 0, 0],
[0, 0, 4, 0],
[9, 0, 8, 0]]
for i in range(3):
for j in range(4):
self.assertEqual(expected[i][j], sm1t[i, j])
self.assertTrue(array_equal(sm1t.toArray(), expected))
def test_dense_matrix_is_transposed(self):
mat1 = DenseMatrix(3, 2, [0, 4, 1, 6, 3, 9], isTransposed=True)
mat = DenseMatrix(3, 2, [0, 1, 3, 4, 6, 9])
self.assertEqual(mat1, mat)
expected = [[0, 4], [1, 6], [3, 9]]
for i in range(3):
for j in range(2):
self.assertEqual(mat1[i, j], expected[i][j])
self.assertTrue(array_equal(mat1.toArray(), expected))
sm = mat1.toSparse()
self.assertTrue(array_equal(sm.rowIndices, [1, 2, 0, 1, 2]))
self.assertTrue(array_equal(sm.colPtrs, [0, 2, 5]))
self.assertTrue(array_equal(sm.values, [1, 3, 4, 6, 9]))
def test_norms(self):
a = DenseVector([0, 2, 3, -1])
self.assertAlmostEqual(a.norm(2), 3.742, 3)
self.assertTrue(a.norm(1), 6)
self.assertTrue(a.norm(inf), 3)
a = SparseVector(4, [0, 2], [3, -4])
self.assertAlmostEqual(a.norm(2), 5)
self.assertTrue(a.norm(1), 7)
self.assertTrue(a.norm(inf), 4)
tmp = SparseVector(4, [0, 2], [3, 0])
self.assertEqual(tmp.numNonzeros(), 1)
class VectorUDTTests(MLlibTestCase):
dv0 = DenseVector([])
dv1 = DenseVector([1.0, 2.0])
sv0 = SparseVector(2, [], [])
sv1 = SparseVector(2, [1], [2.0])
udt = VectorUDT()
old_dv0 = OldDenseVector([])
old_dv1 = OldDenseVector([1.0, 2.0])
old_sv0 = OldSparseVector(2, [], [])
old_sv1 = OldSparseVector(2, [1], [2.0])
old_udt = OldVectorUDT()
def test_json_schema(self):
self.assertEqual(VectorUDT.fromJson(self.udt.jsonValue()), self.udt)
def test_serialization(self):
for v in [self.dv0, self.dv1, self.sv0, self.sv1]:
self.assertEqual(v, self.udt.deserialize(self.udt.serialize(v)))
def test_infer_schema(self):
rdd = self.sc.parallelize([LabeledPoint(1.0, self.old_dv1),
LabeledPoint(0.0, self.old_sv1)])
df = rdd.toDF()
schema = df.schema
field = [f for f in schema.fields if f.name == "features"][0]
self.assertEqual(field.dataType, self.old_udt)
vectors = df.rdd.map(lambda p: p.features).collect()
self.assertEqual(len(vectors), 2)
for v in vectors:
if isinstance(v, OldSparseVector):
self.assertEqual(v, self.old_sv1)
elif isinstance(v, OldDenseVector):
self.assertEqual(v, self.old_dv1)
else:
raise TypeError("expecting a vector but got %r of type %r" % (v, type(v)))
class MatrixUDTTests(MLlibTestCase):
dm1 = DenseMatrix(3, 2, [0, 1, 4, 5, 9, 10])
dm2 = DenseMatrix(3, 2, [0, 1, 4, 5, 9, 10], isTransposed=True)
sm1 = SparseMatrix(1, 1, [0, 1], [0], [2.0])
sm2 = SparseMatrix(2, 1, [0, 0, 1], [0], [5.0], isTransposed=True)
udt = MatrixUDT()
old_dm1 = OldDenseMatrix(3, 2, [0, 1, 4, 5, 9, 10])
old_dm2 = OldDenseMatrix(3, 2, [0, 1, 4, 5, 9, 10], isTransposed=True)
old_sm1 = OldSparseMatrix(1, 1, [0, 1], [0], [2.0])
old_sm2 = OldSparseMatrix(2, 1, [0, 0, 1], [0], [5.0], isTransposed=True)
old_udt = OldMatrixUDT()
def test_json_schema(self):
self.assertEqual(MatrixUDT.fromJson(self.udt.jsonValue()), self.udt)
def test_serialization(self):
for m in [self.dm1, self.dm2, self.sm1, self.sm2]:
self.assertEqual(m, self.udt.deserialize(self.udt.serialize(m)))
def test_infer_schema(self):
rdd = self.sc.parallelize([("dense", self.old_dm1), ("sparse", self.old_sm1)])
df = rdd.toDF()
schema = df.schema
self.assertTrue(schema.fields[1].dataType, self.old_udt)
matrices = df.rdd.map(lambda x: x._2).collect()
self.assertEqual(len(matrices), 2)
for m in matrices:
if isinstance(m, OldDenseMatrix):
self.assertTrue(m, self.old_dm1)
elif isinstance(m, OldSparseMatrix):
self.assertTrue(m, self.old_sm1)
else:
raise ValueError("Expected a matrix but got type %r" % type(m))
if __name__ == "__main__":
from pyspark.ml.tests import *
if xmlrunner: