---
layout: global
title: Data Types - MLlib
displayTitle: <a href="mllib-guide.html">MLlib</a> - Data Types
---

* Table of contents
{:toc}

MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by Breeze and jblas. A training example used in supervised learning is called a "labeled point" in MLlib.

## Local vector

A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.

The base class of local vectors is Vector, and we provide two implementations: DenseVector and SparseVector. We recommend using the factory methods implemented in Vectors to create local vectors.

{% highlight scala %}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
{% endhighlight %}

Note: Scala imports scala.collection.immutable.Vector by default, so you have to import org.apache.spark.mllib.linalg.Vector explicitly to use MLlib's Vector.

The base class of local vectors is Vector, and we provide two implementations: DenseVector and SparseVector. We recommend using the factory methods implemented in Vectors to create local vectors.

{% highlight java %}
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Create a dense vector (1.0, 0.0, 3.0).
Vector dv = Vectors.dense(1.0, 0.0, 3.0);
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});
{% endhighlight %}

MLlib recognizes the following types as dense vectors:

* NumPy's array
* Python's list, e.g., [1, 2, 3]

and the following as sparse vectors:

* MLlib's SparseVector
* SciPy's csc_matrix with a single column

We recommend using NumPy arrays over lists for efficiency, and using the factory methods implemented in Vectors to create sparse vectors.

{% highlight python %}
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))
{% endhighlight %}

## Labeled point

A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

A labeled point is represented by the case class LabeledPoint.

{% highlight scala %}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
{% endhighlight %}

A labeled point is represented by LabeledPoint.

{% highlight java %}
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Create a labeled point with a positive label and a dense feature vector.
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));

// Create a labeled point with a negative label and a sparse feature vector.
LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));
{% endhighlight %}

A labeled point is represented by LabeledPoint.

{% highlight python %}
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
{% endhighlight %}

***Sparse data***

It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format:

~~~
label index1:value1 index2:value2 ...
~~~

where the indices are one-based and in ascending order. After loading, the feature indices are converted to zero-based.
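For example (a hypothetical record, not one taken from the bundled sample file), the line

~~~
1 1:0.5 3:1.5
~~~

describes a positive example whose first and third features are nonzero; in a dataset with three features, it loads as a labeled point with label 1.0 and the sparse feature vector (3, [0, 2], [0.5, 1.5]) after the one-based indices are shifted to zero-based.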

[`MLUtils.loadLibSVMFile`](api/scala/index.html#org.apache.spark.mllib.util.MLUtils$) reads training examples stored in LIBSVM format.

{% highlight scala %}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
{% endhighlight %}

[`MLUtils.loadLibSVMFile`](api/java/org/apache/spark/mllib/util/MLUtils.html) reads training examples stored in LIBSVM format.

{% highlight java %}
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<LabeledPoint> examples =
  MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();
{% endhighlight %}

[`MLUtils.loadLibSVMFile`](api/python/pyspark.mllib.html#pyspark.mllib.util.MLUtils) reads training examples stored in LIBSVM format.

{% highlight python %}
from pyspark.mllib.util import MLUtils

examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
{% endhighlight %}

## Local matrix

A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order, and sparse matrices, whose non-zero entry values are stored in the Compressed Sparse Column (CSC) format in column-major order. For example, the following dense matrix \[ \begin{pmatrix} 1.0 & 2.0 \\ 3.0 & 4.0 \\ 5.0 & 6.0 \end{pmatrix} \] is stored in a one-dimensional array [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] with the matrix size (3, 2).

The base class of local matrices is Matrix, and we provide two implementations: DenseMatrix, and SparseMatrix. We recommend using the factory methods implemented in Matrices to create local matrices. Remember, local matrices in MLlib are stored in column-major order.

{% highlight scala %}
import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9.0, 6.0, 8.0))
{% endhighlight %}
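To make the CSC encoding concrete, here is a short sketch decoding the three array arguments used above (following the parameter names of Matrices.sparse):

{% highlight scala %}
import org.apache.spark.mllib.linalg.Matrices

// CSC layout of the 3 x 2 matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)):
//   colPtrs    = [0, 1, 3]     // column j owns stored entries colPtrs(j) until colPtrs(j + 1)
//   rowIndices = [0, 2, 1]     // the row index of each stored entry
//   values     = [9.0, 6.0, 8.0]
// So entry 0 is (row 0, col 0) -> 9.0, entry 1 is (row 2, col 1) -> 6.0,
// and entry 2 is (row 1, col 1) -> 8.0.
val sm = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9.0, 6.0, 8.0))
{% endhighlight %}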

The base class of local matrices is Matrix, and we provide two implementations: DenseMatrix, and SparseMatrix. We recommend using the factory methods implemented in Matrices to create local matrices. Remember, local matrices in MLlib are stored in column-major order.

{% highlight java %}
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Matrices;

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
Matrix dm = Matrices.dense(3, 2, new double[] {1.0, 3.0, 5.0, 2.0, 4.0, 6.0});

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
Matrix sm = Matrices.sparse(3, 2, new int[] {0, 1, 3}, new int[] {0, 2, 1}, new double[] {9, 6, 8});
{% endhighlight %}

The base class of local matrices is Matrix, and we provide two implementations: DenseMatrix, and SparseMatrix. We recommend using the factory methods implemented in Matrices to create local matrices. Remember, local matrices in MLlib are stored in column-major order.

{% highlight python %}
from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
{% endhighlight %}

## Distributed matrix

A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.

The basic type is called RowMatrix. A RowMatrix is a row-oriented distributed matrix without meaningful row indices, e.g., a collection of feature vectors. It is backed by an RDD of its rows, where each row is a local vector. We assume that the number of columns is not huge for a RowMatrix so that a single local vector can be reasonably communicated to the driver and can also be stored / operated on using a single node. An IndexedRowMatrix is similar to a RowMatrix but with row indices, which can be used for identifying rows and executing joins. A CoordinateMatrix is a distributed matrix stored in coordinate list (COO) format, backed by an RDD of its entries. A BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks, where a MatrixBlock is a tuple of ((Int, Int), Matrix).

***Note***

The underlying RDDs of a distributed matrix must be deterministic, because we cache the matrix size. In general the use of non-deterministic RDDs can lead to errors.

### RowMatrix

A RowMatrix is a row-oriented distributed matrix without meaningful row indices, backed by an RDD of its rows, where each row is a local vector. Since each row is represented by a local vector, the number of columns is limited by the integer range but it should be much smaller in practice.

A RowMatrix can be created from an RDD[Vector] instance. Then we can compute its column summary statistics.

{% highlight scala %}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()
{% endhighlight %}
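For example, the column summary statistics mentioned above can be computed as follows (a minimal sketch reusing mat from the example):

{% highlight scala %}
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary

// Compute column summary statistics for the RowMatrix created above.
val summary: MultivariateStatisticalSummary = mat.computeColumnSummaryStatistics()
println(summary.mean)        // a vector of column means
println(summary.variance)    // a vector of column variances
println(summary.numNonzeros) // the number of nonzero entries in each column
{% endhighlight %}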

A RowMatrix can be created from a JavaRDD<Vector> instance. Then we can compute its column summary statistics.

{% highlight java %}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<Vector> rows = ... // a JavaRDD of local vectors
// Create a RowMatrix from a JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();
{% endhighlight %}

A RowMatrix can be created from an RDD of vectors.

{% highlight python %}
from pyspark.mllib.linalg.distributed import RowMatrix

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
{% endhighlight %}

### IndexedRowMatrix

An IndexedRowMatrix is similar to a RowMatrix but with meaningful row indices. It is backed by an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local vector.

An IndexedRowMatrix can be created from an RDD[IndexedRow] instance, where IndexedRow is a wrapper over (Long, Vector). An IndexedRowMatrix can be converted to a RowMatrix by dropping its row indices.

{% highlight scala %}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()
{% endhighlight %}

An IndexedRowMatrix can be created from a JavaRDD<IndexedRow> instance, where IndexedRow is a wrapper over (long, Vector). An IndexedRowMatrix can be converted to a RowMatrix by dropping its row indices.

{% highlight java %}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<IndexedRow> rows = ... // a JavaRDD of indexed rows
// Create an IndexedRowMatrix from a JavaRDD<IndexedRow>.
IndexedRowMatrix mat = new IndexedRowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Drop its row indices.
RowMatrix rowMat = mat.toRowMatrix();
{% endhighlight %}

An IndexedRowMatrix can be created from an RDD of IndexedRows, where IndexedRow is a wrapper over (long, vector). An IndexedRowMatrix can be converted to a RowMatrix by dropping its row indices.

{% highlight python %}
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Create an RDD of indexed rows.
#   - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
#   - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])

# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows

# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()

# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()
{% endhighlight %}

### CoordinateMatrix

A CoordinateMatrix is a distributed matrix backed by an RDD of its entries. Each entry is a tuple of (i: Long, j: Long, value: Double), where i is the row index, j is the column index, and value is the entry value. A CoordinateMatrix should be used only when both dimensions of the matrix are huge and the matrix is very sparse.

A CoordinateMatrix can be created from an RDD[MatrixEntry] instance, where MatrixEntry is a wrapper over (Long, Long, Double). A CoordinateMatrix can be converted to an IndexedRowMatrix with sparse rows by calling toIndexedRowMatrix. Other computations for CoordinateMatrix are not currently supported.

{% highlight scala %}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Convert it to an IndexedRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()
{% endhighlight %}

A CoordinateMatrix can be created from a JavaRDD<MatrixEntry> instance, where MatrixEntry is a wrapper over (long, long, double). A CoordinateMatrix can be converted to an IndexedRowMatrix with sparse rows by calling toIndexedRowMatrix. Other computations for CoordinateMatrix are not currently supported.

{% highlight java %}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of matrix entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix mat = new CoordinateMatrix(entries.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Convert it to an IndexedRowMatrix whose rows are sparse vectors.
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();
{% endhighlight %}

A CoordinateMatrix can be created from an RDD of MatrixEntry entries, where MatrixEntry is a wrapper over (long, long, float). A CoordinateMatrix can be converted to a RowMatrix by calling toRowMatrix, or to an IndexedRowMatrix with sparse rows by calling toIndexedRowMatrix.

{% highlight python %}
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Create an RDD of coordinate entries.
#   - This can be done explicitly with the MatrixEntry class:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
#   - or using (long, long, float) tuples:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])

# Create a CoordinateMatrix from an RDD of MatrixEntries.
mat = CoordinateMatrix(entries)

# Get its size.
m = mat.numRows()  # 3
n = mat.numCols()  # 2

# Get the entries as an RDD of MatrixEntries.
entriesRDD = mat.entries

# Convert to a RowMatrix.
rowMat = mat.toRowMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
{% endhighlight %}

### BlockMatrix

A BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks, where a MatrixBlock is a tuple of ((Int, Int), Matrix), where the (Int, Int) is the index of the block, and Matrix is the sub-matrix at the given index with size rowsPerBlock x colsPerBlock. BlockMatrix supports methods such as add and multiply with another BlockMatrix. BlockMatrix also has a helper function validate which can be used to check whether the BlockMatrix is set up properly.

A BlockMatrix can be most easily created from an IndexedRowMatrix or CoordinateMatrix by calling toBlockMatrix. toBlockMatrix creates blocks of size 1024 x 1024 by default. Users may change the block size by supplying the values through toBlockMatrix(rowsPerBlock, colsPerBlock).

{% highlight scala %}
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()

// Calculate A^T A.
val ata = matA.transpose.multiply(matA)
{% endhighlight %}
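A BlockMatrix can also be constructed directly from an RDD of blocks; the following is an illustrative sketch with made-up block contents:

{% highlight scala %}
import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
import org.apache.spark.rdd.RDD

// Two 2 x 2 blocks stacked vertically, together forming a 4 x 2 matrix.
val blocks: RDD[((Int, Int), Matrix)] = sc.parallelize(Seq(
  ((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))),
  ((1, 0), Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0)))))
val matB: BlockMatrix = new BlockMatrix(blocks, 2, 2) // rowsPerBlock = 2, colsPerBlock = 2
matB.validate()
{% endhighlight %}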

A BlockMatrix can be most easily created from an IndexedRowMatrix or CoordinateMatrix by calling toBlockMatrix. toBlockMatrix creates blocks of size 1024 x 1024 by default. Users may change the block size by supplying the values through toBlockMatrix(rowsPerBlock, colsPerBlock).

{% highlight java %}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
// Transform the CoordinateMatrix to a BlockMatrix
BlockMatrix matA = coordMat.toBlockMatrix().cache();

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate();

// Calculate A^T A.
BlockMatrix ata = matA.transpose().multiply(matA);
{% endhighlight %}