[SPARK-6227][MLLIB][PYSPARK] Implement PySpark wrappers for SVD and PCA (v2)
Add PCA and SVD to PySpark's wrappers for `RowMatrix` and `IndexedRowMatrix` (SVD only). Based on #7963, updated. ## How was this patch tested? New doc tests and unit tests. Ran all examples locally. Author: MechCoder <manojkumarsivaraj334@gmail.com> Author: Nick Pentreath <nickp@za.ibm.com> Closes #17621 from MLnick/SPARK-6227-pyspark-svd-pca.
This commit is contained in:
parent
6235132a8c
commit
db2fb84b4a
|
@ -76,13 +76,14 @@ Refer to the [`SingularValueDecomposition` Java docs](api/java/org/apache/spark/
|
|||
|
||||
The same code applies to `IndexedRowMatrix` if `U` is defined as an
|
||||
`IndexedRowMatrix`.
|
||||
</div>
|
||||
<div data-lang="python" markdown="1">
|
||||
Refer to the [`SingularValueDecomposition` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.SingularValueDecomposition) for details on the API.
|
||||
|
||||
In order to run the above application, follow the instructions
|
||||
provided in the [Self-Contained
|
||||
Applications](quick-start.html#self-contained-applications) section of the Spark
|
||||
quick-start guide. Be sure to also include *spark-mllib* to your build file as
|
||||
a dependency.
|
||||
{% include_example python/mllib/svd_example.py %}
|
||||
|
||||
The same code applies to `IndexedRowMatrix` if `U` is defined as an
|
||||
`IndexedRowMatrix`.
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
@ -118,17 +119,21 @@ Refer to the [`PCA` Scala docs](api/scala/index.html#org.apache.spark.mllib.feat
|
|||
|
||||
The following code demonstrates how to compute principal components on a `RowMatrix`
|
||||
and use them to project the vectors into a low-dimensional space.
|
||||
The number of columns should be small, e.g, less than 1000.
|
||||
|
||||
Refer to the [`RowMatrix` Java docs](api/java/org/apache/spark/mllib/linalg/distributed/RowMatrix.html) for details on the API.
|
||||
|
||||
{% include_example java/org/apache/spark/examples/mllib/JavaPCAExample.java %}
|
||||
|
||||
</div>
|
||||
</div>
|
||||
|
||||
In order to run the above application, follow the instructions
|
||||
provided in the [Self-Contained Applications](quick-start.html#self-contained-applications)
|
||||
section of the Spark
|
||||
quick-start guide. Be sure to also include *spark-mllib* to your build file as
|
||||
a dependency.
|
||||
<div data-lang="python" markdown="1">
|
||||
|
||||
The following code demonstrates how to compute principal components on a `RowMatrix`
|
||||
and use them to project the vectors into a low-dimensional space.
|
||||
|
||||
Refer to the [`RowMatrix` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix) for details on the API.
|
||||
|
||||
{% include_example python/mllib/pca_rowmatrix_example.py %}
|
||||
|
||||
</div>
|
||||
</div>
|
||||
|
|
|
@ -18,7 +18,8 @@
|
|||
package org.apache.spark.examples.mllib;
|
||||
|
||||
// $example on$
|
||||
import java.util.LinkedList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
// $example off$
|
||||
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -39,21 +40,25 @@ public class JavaPCAExample {
|
|||
public static void main(String[] args) {
|
||||
SparkConf conf = new SparkConf().setAppName("PCA Example");
|
||||
SparkContext sc = new SparkContext(conf);
|
||||
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
|
||||
|
||||
// $example on$
|
||||
double[][] array = {{1.12, 2.05, 3.12}, {5.56, 6.28, 8.94}, {10.2, 8.0, 20.5}};
|
||||
LinkedList<Vector> rowsList = new LinkedList<>();
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
Vector currentRow = Vectors.dense(array[i]);
|
||||
rowsList.add(currentRow);
|
||||
}
|
||||
JavaRDD<Vector> rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList);
|
||||
List<Vector> data = Arrays.asList(
|
||||
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
|
||||
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
|
||||
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
|
||||
);
|
||||
|
||||
JavaRDD<Vector> rows = jsc.parallelize(data);
|
||||
|
||||
// Create a RowMatrix from JavaRDD<Vector>.
|
||||
RowMatrix mat = new RowMatrix(rows.rdd());
|
||||
|
||||
// Compute the top 3 principal components.
|
||||
Matrix pc = mat.computePrincipalComponents(3);
|
||||
// Compute the top 4 principal components.
|
||||
// Principal components are stored in a local dense matrix.
|
||||
Matrix pc = mat.computePrincipalComponents(4);
|
||||
|
||||
// Project the rows to the linear space spanned by the top 4 principal components.
|
||||
RowMatrix projected = mat.multiply(pc);
|
||||
// $example off$
|
||||
Vector[] collectPartitions = (Vector[])projected.rows().collect();
|
||||
|
@ -61,6 +66,6 @@ public class JavaPCAExample {
|
|||
for (Vector vector : collectPartitions) {
|
||||
System.out.println("\t" + vector);
|
||||
}
|
||||
sc.stop();
|
||||
jsc.stop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,8 @@
|
|||
package org.apache.spark.examples.mllib;
|
||||
|
||||
// $example on$
|
||||
import java.util.LinkedList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
// $example off$
|
||||
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -43,22 +44,22 @@ public class JavaSVDExample {
|
|||
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
|
||||
|
||||
// $example on$
|
||||
double[][] array = {{1.12, 2.05, 3.12}, {5.56, 6.28, 8.94}, {10.2, 8.0, 20.5}};
|
||||
LinkedList<Vector> rowsList = new LinkedList<>();
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
Vector currentRow = Vectors.dense(array[i]);
|
||||
rowsList.add(currentRow);
|
||||
}
|
||||
JavaRDD<Vector> rows = jsc.parallelize(rowsList);
|
||||
List<Vector> data = Arrays.asList(
|
||||
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
|
||||
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
|
||||
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
|
||||
);
|
||||
|
||||
JavaRDD<Vector> rows = jsc.parallelize(data);
|
||||
|
||||
// Create a RowMatrix from JavaRDD<Vector>.
|
||||
RowMatrix mat = new RowMatrix(rows.rdd());
|
||||
|
||||
// Compute the top 3 singular values and corresponding singular vectors.
|
||||
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(3, true, 1.0E-9d);
|
||||
RowMatrix U = svd.U();
|
||||
Vector s = svd.s();
|
||||
Matrix V = svd.V();
|
||||
// Compute the top 5 singular values and corresponding singular vectors.
|
||||
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
|
||||
RowMatrix U = svd.U(); // The U factor is a RowMatrix.
|
||||
Vector s = svd.s(); // The singular values are stored in a local dense vector.
|
||||
Matrix V = svd.V(); // The V factor is a local dense matrix.
|
||||
// $example off$
|
||||
Vector[] collectPartitions = (Vector[]) U.rows().collect();
|
||||
System.out.println("U factor is:");
|
||||
|
|
46
examples/src/main/python/mllib/pca_rowmatrix_example.py
Normal file
46
examples/src/main/python/mllib/pca_rowmatrix_example.py
Normal file
|
@ -0,0 +1,46 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from pyspark import SparkContext
|
||||
# $example on$
|
||||
from pyspark.mllib.linalg import Vectors
|
||||
from pyspark.mllib.linalg.distributed import RowMatrix
|
||||
# $example off$
|
||||
|
||||
if __name__ == "__main__":
|
||||
sc = SparkContext(appName="PythonPCAOnRowMatrixExample")
|
||||
|
||||
# $example on$
|
||||
rows = sc.parallelize([
|
||||
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
|
||||
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
|
||||
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
|
||||
])
|
||||
|
||||
mat = RowMatrix(rows)
|
||||
# Compute the top 4 principal components.
|
||||
# Principal components are stored in a local dense matrix.
|
||||
pc = mat.computePrincipalComponents(4)
|
||||
|
||||
# Project the rows to the linear space spanned by the top 4 principal components.
|
||||
projected = mat.multiply(pc)
|
||||
# $example off$
|
||||
collected = projected.rows.collect()
|
||||
print("Projected Row Matrix of principal component:")
|
||||
for vector in collected:
|
||||
print(vector)
|
||||
sc.stop()
|
48
examples/src/main/python/mllib/svd_example.py
Normal file
48
examples/src/main/python/mllib/svd_example.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from pyspark import SparkContext
|
||||
# $example on$
|
||||
from pyspark.mllib.linalg import Vectors
|
||||
from pyspark.mllib.linalg.distributed import RowMatrix
|
||||
# $example off$
|
||||
|
||||
if __name__ == "__main__":
|
||||
sc = SparkContext(appName="PythonSVDExample")
|
||||
|
||||
# $example on$
|
||||
rows = sc.parallelize([
|
||||
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
|
||||
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
|
||||
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
|
||||
])
|
||||
|
||||
mat = RowMatrix(rows)
|
||||
|
||||
# Compute the top 5 singular values and corresponding singular vectors.
|
||||
svd = mat.computeSVD(5, computeU=True)
|
||||
U = svd.U # The U factor is a RowMatrix.
|
||||
s = svd.s # The singular values are stored in a local dense vector.
|
||||
V = svd.V # The V factor is a local dense matrix.
|
||||
# $example off$
|
||||
collected = U.rows.collect()
|
||||
print("U factor is:")
|
||||
for vector in collected:
|
||||
print(vector)
|
||||
print("Singular values are: %s" % s)
|
||||
print("V factor is:\n%s" % V)
|
||||
sc.stop()
|
|
@ -39,9 +39,9 @@ object PCAOnRowMatrixExample {
|
|||
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
|
||||
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
|
||||
|
||||
val dataRDD = sc.parallelize(data, 2)
|
||||
val rows = sc.parallelize(data)
|
||||
|
||||
val mat: RowMatrix = new RowMatrix(dataRDD)
|
||||
val mat: RowMatrix = new RowMatrix(rows)
|
||||
|
||||
// Compute the top 4 principal components.
|
||||
// Principal components are stored in a local dense matrix.
|
||||
|
|
|
@ -28,6 +28,9 @@ import org.apache.spark.mllib.linalg.Vectors
|
|||
import org.apache.spark.mllib.linalg.distributed.RowMatrix
|
||||
// $example off$
|
||||
|
||||
/**
|
||||
* Example for SingularValueDecomposition.
|
||||
*/
|
||||
object SVDExample {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
@ -41,15 +44,15 @@ object SVDExample {
|
|||
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
|
||||
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
|
||||
|
||||
val dataRDD = sc.parallelize(data, 2)
|
||||
val rows = sc.parallelize(data)
|
||||
|
||||
val mat: RowMatrix = new RowMatrix(dataRDD)
|
||||
val mat: RowMatrix = new RowMatrix(rows)
|
||||
|
||||
// Compute the top 5 singular values and corresponding singular vectors.
|
||||
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(5, computeU = true)
|
||||
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
|
||||
val s: Vector = svd.s // The singular values are stored in a local dense vector.
|
||||
val V: Matrix = svd.V // The V factor is a local dense matrix.
|
||||
val s: Vector = svd.s // The singular values are stored in a local dense vector.
|
||||
val V: Matrix = svd.V // The V factor is a local dense matrix.
|
||||
// $example off$
|
||||
val collect = U.rows.collect()
|
||||
println("U factor is:")
|
||||
|
|
|
@ -28,14 +28,13 @@ from py4j.java_gateway import JavaObject
|
|||
|
||||
from pyspark import RDD, since
|
||||
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
|
||||
from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition
|
||||
from pyspark.mllib.linalg import _convert_to_vector, DenseMatrix, Matrix, QRDecomposition
|
||||
from pyspark.mllib.stat import MultivariateStatisticalSummary
|
||||
from pyspark.storagelevel import StorageLevel
|
||||
|
||||
|
||||
__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
|
||||
'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix',
|
||||
'BlockMatrix']
|
||||
__all__ = ['BlockMatrix', 'CoordinateMatrix', 'DistributedMatrix', 'IndexedRow',
|
||||
'IndexedRowMatrix', 'MatrixEntry', 'RowMatrix', 'SingularValueDecomposition']
|
||||
|
||||
|
||||
class DistributedMatrix(object):
|
||||
|
@ -301,6 +300,136 @@ class RowMatrix(DistributedMatrix):
|
|||
R = decomp.call("R")
|
||||
return QRDecomposition(Q, R)
|
||||
|
||||
@since('2.2.0')
|
||||
def computeSVD(self, k, computeU=False, rCond=1e-9):
|
||||
"""
|
||||
Computes the singular value decomposition of the RowMatrix.
|
||||
|
||||
The given row matrix A of dimension (m X n) is decomposed into
|
||||
U * s * V'T where
|
||||
|
||||
* U: (m X k) (left singular vectors) is a RowMatrix whose
|
||||
columns are the eigenvectors of (A X A')
|
||||
* s: DenseVector consisting of square root of the eigenvalues
|
||||
(singular values) in descending order.
|
||||
* v: (n X k) (right singular vectors) is a Matrix whose columns
|
||||
are the eigenvectors of (A' X A)
|
||||
|
||||
For more specific details on implementation, please refer
|
||||
the Scala documentation.
|
||||
|
||||
:param k: Number of leading singular values to keep (`0 < k <= n`).
|
||||
It might return less than k if there are numerically zero singular values
|
||||
or there are not enough Ritz values converged before the maximum number of
|
||||
Arnoldi update iterations is reached (in case that matrix A is ill-conditioned).
|
||||
:param computeU: Whether or not to compute U. If set to be
|
||||
True, then U is computed by A * V * s^-1
|
||||
:param rCond: Reciprocal condition number. All singular values
|
||||
smaller than rCond * s[0] are treated as zero
|
||||
where s[0] is the largest singular value.
|
||||
:returns: :py:class:`SingularValueDecomposition`
|
||||
|
||||
>>> rows = sc.parallelize([[3, 1, 1], [-1, 3, 1]])
|
||||
>>> rm = RowMatrix(rows)
|
||||
|
||||
>>> svd_model = rm.computeSVD(2, True)
|
||||
>>> svd_model.U.rows.collect()
|
||||
[DenseVector([-0.7071, 0.7071]), DenseVector([-0.7071, -0.7071])]
|
||||
>>> svd_model.s
|
||||
DenseVector([3.4641, 3.1623])
|
||||
>>> svd_model.V
|
||||
DenseMatrix(3, 2, [-0.4082, -0.8165, -0.4082, 0.8944, -0.4472, 0.0], 0)
|
||||
"""
|
||||
j_model = self._java_matrix_wrapper.call(
|
||||
"computeSVD", int(k), bool(computeU), float(rCond))
|
||||
return SingularValueDecomposition(j_model)
|
||||
|
||||
@since('2.2.0')
|
||||
def computePrincipalComponents(self, k):
|
||||
"""
|
||||
Computes the k principal components of the given row matrix
|
||||
|
||||
.. note:: This cannot be computed on matrices with more than 65535 columns.
|
||||
|
||||
:param k: Number of principal components to keep.
|
||||
:returns: :py:class:`pyspark.mllib.linalg.DenseMatrix`
|
||||
|
||||
>>> rows = sc.parallelize([[1, 2, 3], [2, 4, 5], [3, 6, 1]])
|
||||
>>> rm = RowMatrix(rows)
|
||||
|
||||
>>> # Returns the two principal components of rm
|
||||
>>> pca = rm.computePrincipalComponents(2)
|
||||
>>> pca
|
||||
DenseMatrix(3, 2, [-0.349, -0.6981, 0.6252, -0.2796, -0.5592, -0.7805], 0)
|
||||
|
||||
>>> # Transform into new dimensions with the greatest variance.
|
||||
>>> rm.multiply(pca).rows.collect() # doctest: +NORMALIZE_WHITESPACE
|
||||
[DenseVector([0.1305, -3.7394]), DenseVector([-0.3642, -6.6983]), \
|
||||
DenseVector([-4.6102, -4.9745])]
|
||||
"""
|
||||
return self._java_matrix_wrapper.call("computePrincipalComponents", k)
|
||||
|
||||
@since('2.2.0')
|
||||
def multiply(self, matrix):
|
||||
"""
|
||||
Multiply this matrix by a local dense matrix on the right.
|
||||
|
||||
:param matrix: a local dense matrix whose number of rows must match the number of columns
|
||||
of this matrix
|
||||
:returns: :py:class:`RowMatrix`
|
||||
|
||||
>>> rm = RowMatrix(sc.parallelize([[0, 1], [2, 3]]))
|
||||
>>> rm.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.collect()
|
||||
[DenseVector([2.0, 3.0]), DenseVector([6.0, 11.0])]
|
||||
"""
|
||||
if not isinstance(matrix, DenseMatrix):
|
||||
raise ValueError("Only multiplication with DenseMatrix "
|
||||
"is supported.")
|
||||
j_model = self._java_matrix_wrapper.call("multiply", matrix)
|
||||
return RowMatrix(j_model)
|
||||
|
||||
|
||||
class SingularValueDecomposition(JavaModelWrapper):
|
||||
"""
|
||||
Represents singular value decomposition (SVD) factors.
|
||||
|
||||
.. versionadded:: 2.2.0
|
||||
"""
|
||||
|
||||
@property
|
||||
@since('2.2.0')
|
||||
def U(self):
|
||||
"""
|
||||
Returns a distributed matrix whose columns are the left
|
||||
singular vectors of the SingularValueDecomposition if computeU was set to be True.
|
||||
"""
|
||||
u = self.call("U")
|
||||
if u is not None:
|
||||
mat_name = u.getClass().getSimpleName()
|
||||
if mat_name == "RowMatrix":
|
||||
return RowMatrix(u)
|
||||
elif mat_name == "IndexedRowMatrix":
|
||||
return IndexedRowMatrix(u)
|
||||
else:
|
||||
raise TypeError("Expected RowMatrix/IndexedRowMatrix got %s" % mat_name)
|
||||
|
||||
@property
|
||||
@since('2.2.0')
|
||||
def s(self):
|
||||
"""
|
||||
Returns a DenseVector with singular values in descending order.
|
||||
"""
|
||||
return self.call("s")
|
||||
|
||||
@property
|
||||
@since('2.2.0')
|
||||
def V(self):
|
||||
"""
|
||||
Returns a DenseMatrix whose columns are the right singular
|
||||
vectors of the SingularValueDecomposition.
|
||||
"""
|
||||
return self.call("V")
|
||||
|
||||
|
||||
class IndexedRow(object):
|
||||
"""
|
||||
|
@ -528,6 +657,68 @@ class IndexedRowMatrix(DistributedMatrix):
|
|||
colsPerBlock)
|
||||
return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
|
||||
|
||||
@since('2.2.0')
|
||||
def computeSVD(self, k, computeU=False, rCond=1e-9):
|
||||
"""
|
||||
Computes the singular value decomposition of the IndexedRowMatrix.
|
||||
|
||||
The given row matrix A of dimension (m X n) is decomposed into
|
||||
U * s * V'T where
|
||||
|
||||
* U: (m X k) (left singular vectors) is a IndexedRowMatrix
|
||||
whose columns are the eigenvectors of (A X A')
|
||||
* s: DenseVector consisting of square root of the eigenvalues
|
||||
(singular values) in descending order.
|
||||
* v: (n X k) (right singular vectors) is a Matrix whose columns
|
||||
are the eigenvectors of (A' X A)
|
||||
|
||||
For more specific details on implementation, please refer
|
||||
the scala documentation.
|
||||
|
||||
:param k: Number of leading singular values to keep (`0 < k <= n`).
|
||||
It might return less than k if there are numerically zero singular values
|
||||
or there are not enough Ritz values converged before the maximum number of
|
||||
Arnoldi update iterations is reached (in case that matrix A is ill-conditioned).
|
||||
:param computeU: Whether or not to compute U. If set to be
|
||||
True, then U is computed by A * V * s^-1
|
||||
:param rCond: Reciprocal condition number. All singular values
|
||||
smaller than rCond * s[0] are treated as zero
|
||||
where s[0] is the largest singular value.
|
||||
:returns: SingularValueDecomposition object
|
||||
|
||||
>>> rows = [(0, (3, 1, 1)), (1, (-1, 3, 1))]
|
||||
>>> irm = IndexedRowMatrix(sc.parallelize(rows))
|
||||
>>> svd_model = irm.computeSVD(2, True)
|
||||
>>> svd_model.U.rows.collect() # doctest: +NORMALIZE_WHITESPACE
|
||||
[IndexedRow(0, [-0.707106781187,0.707106781187]),\
|
||||
IndexedRow(1, [-0.707106781187,-0.707106781187])]
|
||||
>>> svd_model.s
|
||||
DenseVector([3.4641, 3.1623])
|
||||
>>> svd_model.V
|
||||
DenseMatrix(3, 2, [-0.4082, -0.8165, -0.4082, 0.8944, -0.4472, 0.0], 0)
|
||||
"""
|
||||
j_model = self._java_matrix_wrapper.call(
|
||||
"computeSVD", int(k), bool(computeU), float(rCond))
|
||||
return SingularValueDecomposition(j_model)
|
||||
|
||||
@since('2.2.0')
|
||||
def multiply(self, matrix):
|
||||
"""
|
||||
Multiply this matrix by a local dense matrix on the right.
|
||||
|
||||
:param matrix: a local dense matrix whose number of rows must match the number of columns
|
||||
of this matrix
|
||||
:returns: :py:class:`IndexedRowMatrix`
|
||||
|
||||
>>> mat = IndexedRowMatrix(sc.parallelize([(0, (0, 1)), (1, (2, 3))]))
|
||||
>>> mat.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.collect()
|
||||
[IndexedRow(0, [2.0,3.0]), IndexedRow(1, [6.0,11.0])]
|
||||
"""
|
||||
if not isinstance(matrix, DenseMatrix):
|
||||
raise ValueError("Only multiplication with DenseMatrix "
|
||||
"is supported.")
|
||||
return IndexedRowMatrix(self._java_matrix_wrapper.call("multiply", matrix))
|
||||
|
||||
|
||||
class MatrixEntry(object):
|
||||
"""
|
||||
|
|
|
@ -23,6 +23,7 @@ import os
|
|||
import sys
|
||||
import tempfile
|
||||
import array as pyarray
|
||||
from math import sqrt
|
||||
from time import time, sleep
|
||||
from shutil import rmtree
|
||||
|
||||
|
@ -54,6 +55,7 @@ from pyspark.mllib.common import _to_java_object_rdd
|
|||
from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
|
||||
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
|
||||
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
|
||||
from pyspark.mllib.linalg.distributed import RowMatrix
|
||||
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
|
||||
from pyspark.mllib.recommendation import Rating
|
||||
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
|
||||
|
@ -1699,6 +1701,67 @@ class HashingTFTest(MLlibTestCase):
|
|||
": expected " + str(expected[i]) + ", got " + str(output[i]))
|
||||
|
||||
|
||||
class DimensionalityReductionTests(MLlibTestCase):
|
||||
|
||||
denseData = [
|
||||
Vectors.dense([0.0, 1.0, 2.0]),
|
||||
Vectors.dense([3.0, 4.0, 5.0]),
|
||||
Vectors.dense([6.0, 7.0, 8.0]),
|
||||
Vectors.dense([9.0, 0.0, 1.0])
|
||||
]
|
||||
sparseData = [
|
||||
Vectors.sparse(3, [(1, 1.0), (2, 2.0)]),
|
||||
Vectors.sparse(3, [(0, 3.0), (1, 4.0), (2, 5.0)]),
|
||||
Vectors.sparse(3, [(0, 6.0), (1, 7.0), (2, 8.0)]),
|
||||
Vectors.sparse(3, [(0, 9.0), (2, 1.0)])
|
||||
]
|
||||
|
||||
def assertEqualUpToSign(self, vecA, vecB):
|
||||
eq1 = vecA - vecB
|
||||
eq2 = vecA + vecB
|
||||
self.assertTrue(sum(abs(eq1)) < 1e-6 or sum(abs(eq2)) < 1e-6)
|
||||
|
||||
def test_svd(self):
|
||||
denseMat = RowMatrix(self.sc.parallelize(self.denseData))
|
||||
sparseMat = RowMatrix(self.sc.parallelize(self.sparseData))
|
||||
m = 4
|
||||
n = 3
|
||||
for mat in [denseMat, sparseMat]:
|
||||
for k in range(1, 4):
|
||||
rm = mat.computeSVD(k, computeU=True)
|
||||
self.assertEqual(rm.s.size, k)
|
||||
self.assertEqual(rm.U.numRows(), m)
|
||||
self.assertEqual(rm.U.numCols(), k)
|
||||
self.assertEqual(rm.V.numRows, n)
|
||||
self.assertEqual(rm.V.numCols, k)
|
||||
|
||||
# Test that U returned is None if computeU is set to False.
|
||||
self.assertEqual(mat.computeSVD(1).U, None)
|
||||
|
||||
# Test that low rank matrices cannot have number of singular values
|
||||
# greater than a limit.
|
||||
rm = RowMatrix(self.sc.parallelize(tile([1, 2, 3], (3, 1))))
|
||||
self.assertEqual(rm.computeSVD(3, False, 1e-6).s.size, 1)
|
||||
|
||||
def test_pca(self):
|
||||
expected_pcs = array([
|
||||
[0.0, 1.0, 0.0],
|
||||
[sqrt(2.0) / 2.0, 0.0, sqrt(2.0) / 2.0],
|
||||
[sqrt(2.0) / 2.0, 0.0, -sqrt(2.0) / 2.0]
|
||||
])
|
||||
n = 3
|
||||
denseMat = RowMatrix(self.sc.parallelize(self.denseData))
|
||||
sparseMat = RowMatrix(self.sc.parallelize(self.sparseData))
|
||||
for mat in [denseMat, sparseMat]:
|
||||
for k in range(1, 4):
|
||||
pcs = mat.computePrincipalComponents(k)
|
||||
self.assertEqual(pcs.numRows, n)
|
||||
self.assertEqual(pcs.numCols, k)
|
||||
|
||||
# We can just test the updated principal component for equality.
|
||||
self.assertEqualUpToSign(pcs.toArray()[:, k - 1], expected_pcs[:, k - 1])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pyspark.mllib.tests import *
|
||||
if not _have_scipy:
|
||||
|
|
Loading…
Reference in a new issue