spark-instrumented-optimizer/python/pyspark/mllib/linalg/distributed.py
Mike Dusenberry 34dcf10104 [SPARK-6486] [MLLIB] [PYTHON] Add BlockMatrix to PySpark.
mengxr This adds the `BlockMatrix` to PySpark.  I have the conversions to `IndexedRowMatrix` and `CoordinateMatrix` ready as well, so once PR #7554 is completed (which relies on PR #7746), this PR can be finished.

Author: Mike Dusenberry <mwdusenb@us.ibm.com>

Closes #7761 from dusenberrymw/SPARK-6486_Add_BlockMatrix_to_PySpark and squashes the following commits:

27195c2 [Mike Dusenberry] Adding one more check to _convert_to_matrix_block_tuple, and a few minor documentation changes.
ae50883 [Mike Dusenberry] Minor update: BlockMatrix should inherit from DistributedMatrix.
b8acc1c [Mike Dusenberry] Moving BlockMatrix to pyspark.mllib.linalg.distributed, updating the logic to match that of the other distributed matrices, adding conversions, and adding documentation.
c014002 [Mike Dusenberry] Using properties for better documentation.
3bda6ab [Mike Dusenberry] Adding documentation.
8fb3095 [Mike Dusenberry] Small cleanup.
e17af2e [Mike Dusenberry] Adding BlockMatrix to PySpark.
2015-08-05 07:40:50 -07:00

854 lines
32 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Package for distributed linear algebra.
"""
import sys
if sys.version >= '3':
long = int
from py4j.java_gateway import JavaObject
from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector, Matrix
__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix',
'BlockMatrix']
class DistributedMatrix(object):
"""
.. note:: Experimental
Represents a distributively stored matrix backed by one or
more RDDs.
"""
def numRows(self):
"""Get or compute the number of rows."""
raise NotImplementedError
def numCols(self):
"""Get or compute the number of cols."""
raise NotImplementedError
class RowMatrix(DistributedMatrix):
"""
.. note:: Experimental
Represents a row-oriented distributed Matrix with no meaningful
row indices.
:param rows: An RDD of vectors.
:param numRows: Number of rows in the matrix. A non-positive
value means unknown, at which point the number
of rows will be determined by the number of
records in the `rows` RDD.
:param numCols: Number of columns in the matrix. A non-positive
value means unknown, at which point the number
of columns will be determined by the size of
the first row.
"""
def __init__(self, rows, numRows=0, numCols=0):
"""
Note: This docstring is not shown publicly.
Create a wrapper over a Java RowMatrix.
Publicly, we require that `rows` be an RDD. However, for
internal usage, `rows` can also be a Java RowMatrix
object, in which case we can wrap it directly. This
assists in clean matrix conversions.
>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
>>> mat = RowMatrix(rows)
>>> mat_diff = RowMatrix(rows)
>>> (mat_diff._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
False
>>> mat_same = RowMatrix(mat._java_matrix_wrapper._java_model)
>>> (mat_same._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
True
"""
if isinstance(rows, RDD):
rows = rows.map(_convert_to_vector)
java_matrix = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols))
elif (isinstance(rows, JavaObject)
and rows.getClass().getSimpleName() == "RowMatrix"):
java_matrix = rows
else:
raise TypeError("rows should be an RDD of vectors, got %s" % type(rows))
self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
@property
def rows(self):
"""
Rows of the RowMatrix stored as an RDD of vectors.
>>> mat = RowMatrix(sc.parallelize([[1, 2, 3], [4, 5, 6]]))
>>> rows = mat.rows
>>> rows.first()
DenseVector([1.0, 2.0, 3.0])
"""
return self._java_matrix_wrapper.call("rows")
def numRows(self):
"""
Get or compute the number of rows.
>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
... [7, 8, 9], [10, 11, 12]])
>>> mat = RowMatrix(rows)
>>> print(mat.numRows())
4
>>> mat = RowMatrix(rows, 7, 6)
>>> print(mat.numRows())
7
"""
return self._java_matrix_wrapper.call("numRows")
def numCols(self):
"""
Get or compute the number of cols.
>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
... [7, 8, 9], [10, 11, 12]])
>>> mat = RowMatrix(rows)
>>> print(mat.numCols())
3
>>> mat = RowMatrix(rows, 7, 6)
>>> print(mat.numCols())
6
"""
return self._java_matrix_wrapper.call("numCols")
class IndexedRow(object):
"""
.. note:: Experimental
Represents a row of an IndexedRowMatrix.
Just a wrapper over a (long, vector) tuple.
:param index: The index for the given row.
:param vector: The row in the matrix at the given index.
"""
def __init__(self, index, vector):
self.index = long(index)
self.vector = _convert_to_vector(vector)
def __repr__(self):
return "IndexedRow(%s, %s)" % (self.index, self.vector)
def _convert_to_indexed_row(row):
if isinstance(row, IndexedRow):
return row
elif isinstance(row, tuple) and len(row) == 2:
return IndexedRow(*row)
else:
raise TypeError("Cannot convert type %s into IndexedRow" % type(row))
class IndexedRowMatrix(DistributedMatrix):
"""
.. note:: Experimental
Represents a row-oriented distributed Matrix with indexed rows.
:param rows: An RDD of IndexedRows or (long, vector) tuples.
:param numRows: Number of rows in the matrix. A non-positive
value means unknown, at which point the number
of rows will be determined by the max row
index plus one.
:param numCols: Number of columns in the matrix. A non-positive
value means unknown, at which point the number
of columns will be determined by the size of
the first row.
"""
def __init__(self, rows, numRows=0, numCols=0):
"""
Note: This docstring is not shown publicly.
Create a wrapper over a Java IndexedRowMatrix.
Publicly, we require that `rows` be an RDD. However, for
internal usage, `rows` can also be a Java IndexedRowMatrix
object, in which case we can wrap it directly. This
assists in clean matrix conversions.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6])])
>>> mat = IndexedRowMatrix(rows)
>>> mat_diff = IndexedRowMatrix(rows)
>>> (mat_diff._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
False
>>> mat_same = IndexedRowMatrix(mat._java_matrix_wrapper._java_model)
>>> (mat_same._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
True
"""
if isinstance(rows, RDD):
rows = rows.map(_convert_to_indexed_row)
# We use DataFrames for serialization of IndexedRows from
# Python, so first convert the RDD to a DataFrame on this
# side. This will convert each IndexedRow to a Row
# containing the 'index' and 'vector' values, which can
# both be easily serialized. We will convert back to
# IndexedRows on the Scala side.
java_matrix = callMLlibFunc("createIndexedRowMatrix", rows.toDF(),
long(numRows), int(numCols))
elif (isinstance(rows, JavaObject)
and rows.getClass().getSimpleName() == "IndexedRowMatrix"):
java_matrix = rows
else:
raise TypeError("rows should be an RDD of IndexedRows or (long, vector) tuples, "
"got %s" % type(rows))
self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
@property
def rows(self):
"""
Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.
>>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6])]))
>>> rows = mat.rows
>>> rows.first()
IndexedRow(0, [1.0,2.0,3.0])
"""
# We use DataFrames for serialization of IndexedRows from
# Java, so we first convert the RDD of rows to a DataFrame
# on the Scala/Java side. Then we map each Row in the
# DataFrame back to an IndexedRow on this side.
rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
rows = rows_df.map(lambda row: IndexedRow(row[0], row[1]))
return rows
def numRows(self):
"""
Get or compute the number of rows.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6]),
... IndexedRow(2, [7, 8, 9]),
... IndexedRow(3, [10, 11, 12])])
>>> mat = IndexedRowMatrix(rows)
>>> print(mat.numRows())
4
>>> mat = IndexedRowMatrix(rows, 7, 6)
>>> print(mat.numRows())
7
"""
return self._java_matrix_wrapper.call("numRows")
def numCols(self):
"""
Get or compute the number of cols.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6]),
... IndexedRow(2, [7, 8, 9]),
... IndexedRow(3, [10, 11, 12])])
>>> mat = IndexedRowMatrix(rows)
>>> print(mat.numCols())
3
>>> mat = IndexedRowMatrix(rows, 7, 6)
>>> print(mat.numCols())
6
"""
return self._java_matrix_wrapper.call("numCols")
def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(6, [4, 5, 6])])
>>> mat = IndexedRowMatrix(rows).toRowMatrix()
>>> mat.rows.collect()
[DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0])]
"""
java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
return RowMatrix(java_row_matrix)
def toCoordinateMatrix(self):
"""
Convert this matrix to a CoordinateMatrix.
>>> rows = sc.parallelize([IndexedRow(0, [1, 0]),
... IndexedRow(6, [0, 5])])
>>> mat = IndexedRowMatrix(rows).toCoordinateMatrix()
>>> mat.entries.take(3)
[MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 0.0), MatrixEntry(6, 0, 0.0)]
"""
java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
return CoordinateMatrix(java_coordinate_matrix)
def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
"""
Convert this matrix to a BlockMatrix.
:param rowsPerBlock: Number of rows that make up each block.
The blocks forming the final rows are not
required to have the given number of rows.
:param colsPerBlock: Number of columns that make up each block.
The blocks forming the final columns are not
required to have the given number of columns.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(6, [4, 5, 6])])
>>> mat = IndexedRowMatrix(rows).toBlockMatrix()
>>> # This IndexedRowMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # BlockMatrix will have 7 rows as well.
>>> print(mat.numRows())
7
>>> print(mat.numCols())
3
"""
java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
rowsPerBlock,
colsPerBlock)
return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
class MatrixEntry(object):
"""
.. note:: Experimental
Represents an entry of a CoordinateMatrix.
Just a wrapper over a (long, long, float) tuple.
:param i: The row index of the matrix.
:param j: The column index of the matrix.
:param value: The (i, j)th entry of the matrix, as a float.
"""
def __init__(self, i, j, value):
self.i = long(i)
self.j = long(j)
self.value = float(value)
def __repr__(self):
return "MatrixEntry(%s, %s, %s)" % (self.i, self.j, self.value)
def _convert_to_matrix_entry(entry):
if isinstance(entry, MatrixEntry):
return entry
elif isinstance(entry, tuple) and len(entry) == 3:
return MatrixEntry(*entry)
else:
raise TypeError("Cannot convert type %s into MatrixEntry" % type(entry))
class CoordinateMatrix(DistributedMatrix):
"""
.. note:: Experimental
Represents a matrix in coordinate format.
:param entries: An RDD of MatrixEntry inputs or
(long, long, float) tuples.
:param numRows: Number of rows in the matrix. A non-positive
value means unknown, at which point the number
of rows will be determined by the max row
index plus one.
:param numCols: Number of columns in the matrix. A non-positive
value means unknown, at which point the number
of columns will be determined by the max row
index plus one.
"""
def __init__(self, entries, numRows=0, numCols=0):
"""
Note: This docstring is not shown publicly.
Create a wrapper over a Java CoordinateMatrix.
Publicly, we require that `rows` be an RDD. However, for
internal usage, `rows` can also be a Java CoordinateMatrix
object, in which case we can wrap it directly. This
assists in clean matrix conversions.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries)
>>> mat_diff = CoordinateMatrix(entries)
>>> (mat_diff._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
False
>>> mat_same = CoordinateMatrix(mat._java_matrix_wrapper._java_model)
>>> (mat_same._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
True
"""
if isinstance(entries, RDD):
entries = entries.map(_convert_to_matrix_entry)
# We use DataFrames for serialization of MatrixEntry entries
# from Python, so first convert the RDD to a DataFrame on
# this side. This will convert each MatrixEntry to a Row
# containing the 'i', 'j', and 'value' values, which can
# each be easily serialized. We will convert back to
# MatrixEntry inputs on the Scala side.
java_matrix = callMLlibFunc("createCoordinateMatrix", entries.toDF(),
long(numRows), long(numCols))
elif (isinstance(entries, JavaObject)
and entries.getClass().getSimpleName() == "CoordinateMatrix"):
java_matrix = entries
else:
raise TypeError("entries should be an RDD of MatrixEntry entries or "
"(long, long, float) tuples, got %s" % type(entries))
self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
@property
def entries(self):
"""
Entries of the CoordinateMatrix stored as an RDD of
MatrixEntries.
>>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)]))
>>> entries = mat.entries
>>> entries.first()
MatrixEntry(0, 0, 1.2)
"""
# We use DataFrames for serialization of MatrixEntry entries
# from Java, so we first convert the RDD of entries to a
# DataFrame on the Scala/Java side. Then we map each Row in
# the DataFrame back to a MatrixEntry on this side.
entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
return entries
def numRows(self):
"""
Get or compute the number of rows.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(1, 0, 2),
... MatrixEntry(2, 1, 3.7)])
>>> mat = CoordinateMatrix(entries)
>>> print(mat.numRows())
3
>>> mat = CoordinateMatrix(entries, 7, 6)
>>> print(mat.numRows())
7
"""
return self._java_matrix_wrapper.call("numRows")
def numCols(self):
"""
Get or compute the number of cols.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(1, 0, 2),
... MatrixEntry(2, 1, 3.7)])
>>> mat = CoordinateMatrix(entries)
>>> print(mat.numCols())
2
>>> mat = CoordinateMatrix(entries, 7, 6)
>>> print(mat.numCols())
6
"""
return self._java_matrix_wrapper.call("numCols")
def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries).toRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, but the ensuing RowMatrix
>>> # will only have 2 rows since there are only entries on 2
>>> # unique rows.
>>> print(mat.numRows())
2
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing RowMatrix
>>> # will have 5 columns as well.
>>> print(mat.numCols())
5
"""
java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
return RowMatrix(java_row_matrix)
def toIndexedRowMatrix(self):
"""
Convert this matrix to an IndexedRowMatrix.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # IndexedRowMatrix will have 7 rows as well.
>>> print(mat.numRows())
7
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing
>>> # IndexedRowMatrix will have 5 columns as well.
>>> print(mat.numCols())
5
"""
java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
return IndexedRowMatrix(java_indexed_row_matrix)
def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
"""
Convert this matrix to a BlockMatrix.
:param rowsPerBlock: Number of rows that make up each block.
The blocks forming the final rows are not
required to have the given number of rows.
:param colsPerBlock: Number of columns that make up each block.
The blocks forming the final columns are not
required to have the given number of columns.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries).toBlockMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # BlockMatrix will have 7 rows as well.
>>> print(mat.numRows())
7
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing
>>> # BlockMatrix will have 5 columns as well.
>>> print(mat.numCols())
5
"""
java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
rowsPerBlock,
colsPerBlock)
return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
def _convert_to_matrix_block_tuple(block):
if (isinstance(block, tuple) and len(block) == 2
and isinstance(block[0], tuple) and len(block[0]) == 2
and isinstance(block[1], Matrix)):
blockRowIndex = int(block[0][0])
blockColIndex = int(block[0][1])
subMatrix = block[1]
return ((blockRowIndex, blockColIndex), subMatrix)
else:
raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block))
class BlockMatrix(DistributedMatrix):
"""
.. note:: Experimental
Represents a distributed matrix in blocks of local matrices.
:param blocks: An RDD of sub-matrix blocks
((blockRowIndex, blockColIndex), sub-matrix) that
form this distributed matrix. If multiple blocks
with the same index exist, the results for
operations like add and multiply will be
unpredictable.
:param rowsPerBlock: Number of rows that make up each block.
The blocks forming the final rows are not
required to have the given number of rows.
:param colsPerBlock: Number of columns that make up each block.
The blocks forming the final columns are not
required to have the given number of columns.
:param numRows: Number of rows of this matrix. If the supplied
value is less than or equal to zero, the number
of rows will be calculated when `numRows` is
invoked.
:param numCols: Number of columns of this matrix. If the supplied
value is less than or equal to zero, the number
of columns will be calculated when `numCols` is
invoked.
"""
def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0):
"""
Note: This docstring is not shown publicly.
Create a wrapper over a Java BlockMatrix.
Publicly, we require that `blocks` be an RDD. However, for
internal usage, `blocks` can also be a Java BlockMatrix
object, in which case we can wrap it directly. This
assists in clean matrix conversions.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat_diff = BlockMatrix(blocks, 3, 2)
>>> (mat_diff._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
False
>>> mat_same = BlockMatrix(mat._java_matrix_wrapper._java_model, 3, 2)
>>> (mat_same._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
True
"""
if isinstance(blocks, RDD):
blocks = blocks.map(_convert_to_matrix_block_tuple)
# We use DataFrames for serialization of sub-matrix blocks
# from Python, so first convert the RDD to a DataFrame on
# this side. This will convert each sub-matrix block
# tuple to a Row containing the 'blockRowIndex',
# 'blockColIndex', and 'subMatrix' values, which can
# each be easily serialized. We will convert back to
# ((blockRowIndex, blockColIndex), sub-matrix) tuples on
# the Scala side.
java_matrix = callMLlibFunc("createBlockMatrix", blocks.toDF(),
int(rowsPerBlock), int(colsPerBlock),
long(numRows), long(numCols))
elif (isinstance(blocks, JavaObject)
and blocks.getClass().getSimpleName() == "BlockMatrix"):
java_matrix = blocks
else:
raise TypeError("blocks should be an RDD of sub-matrix blocks as "
"((int, int), matrix) tuples, got %s" % type(blocks))
self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
@property
def blocks(self):
"""
The RDD of sub-matrix blocks
((blockRowIndex, blockColIndex), sub-matrix) that form this
distributed matrix.
>>> mat = BlockMatrix(
... sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
>>> blocks = mat.blocks
>>> blocks.first()
((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
"""
# We use DataFrames for serialization of sub-matrix blocks
# from Java, so we first convert the RDD of blocks to a
# DataFrame on the Scala/Java side. Then we map each Row in
# the DataFrame back to a sub-matrix block on this side.
blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
return blocks
@property
def rowsPerBlock(self):
"""
Number of rows that make up each block.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.rowsPerBlock
3
"""
return self._java_matrix_wrapper.call("rowsPerBlock")
@property
def colsPerBlock(self):
"""
Number of columns that make up each block.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.colsPerBlock
2
"""
return self._java_matrix_wrapper.call("colsPerBlock")
@property
def numRowBlocks(self):
"""
Number of rows of blocks in the BlockMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.numRowBlocks
2
"""
return self._java_matrix_wrapper.call("numRowBlocks")
@property
def numColBlocks(self):
"""
Number of columns of blocks in the BlockMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.numColBlocks
1
"""
return self._java_matrix_wrapper.call("numColBlocks")
def numRows(self):
"""
Get or compute the number of rows.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> print(mat.numRows())
6
>>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
>>> print(mat.numRows())
7
"""
return self._java_matrix_wrapper.call("numRows")
def numCols(self):
"""
Get or compute the number of cols.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> print(mat.numCols())
2
>>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
>>> print(mat.numCols())
6
"""
return self._java_matrix_wrapper.call("numCols")
def toLocalMatrix(self):
"""
Collect the distributed matrix on the driver as a DenseMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix()
>>> # This BlockMatrix will have 6 effective rows, due to
>>> # having two sub-matrix blocks stacked, each with 3 rows.
>>> # The ensuing DenseMatrix will also have 6 rows.
>>> print(mat.numRows)
6
>>> # This BlockMatrix will have 2 effective columns, due to
>>> # having two sub-matrix blocks stacked, each with 2
>>> # columns. The ensuing DenseMatrix will also have 2 columns.
>>> print(mat.numCols)
2
"""
return self._java_matrix_wrapper.call("toLocalMatrix")
def toIndexedRowMatrix(self):
"""
Convert this matrix to an IndexedRowMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2).toIndexedRowMatrix()
>>> # This BlockMatrix will have 6 effective rows, due to
>>> # having two sub-matrix blocks stacked, each with 3 rows.
>>> # The ensuing IndexedRowMatrix will also have 6 rows.
>>> print(mat.numRows())
6
>>> # This BlockMatrix will have 2 effective columns, due to
>>> # having two sub-matrix blocks stacked, each with 2 columns.
>>> # The ensuing IndexedRowMatrix will also have 2 columns.
>>> print(mat.numCols())
2
"""
java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
return IndexedRowMatrix(java_indexed_row_matrix)
def toCoordinateMatrix(self):
"""
Convert this matrix to a CoordinateMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(1, 2, [1, 2])),
... ((1, 0), Matrices.dense(1, 2, [7, 8]))])
>>> mat = BlockMatrix(blocks, 1, 2).toCoordinateMatrix()
>>> mat.entries.take(3)
[MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0), MatrixEntry(1, 0, 7.0)]
"""
java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
return CoordinateMatrix(java_coordinate_matrix)
def _test():
import doctest
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Matrices
import pyspark.mllib.linalg.distributed
globs = pyspark.mllib.linalg.distributed.__dict__.copy()
globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
globs['sqlContext'] = SQLContext(globs['sc'])
globs['Matrices'] = Matrices
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
if __name__ == "__main__":
_test()