spark-instrumented-optimizer/python/pyspark/mllib/linalg/distributed.py
Kai Jiang 1537e55604 [SPARK-12041][ML][PYSPARK] Add columnSimilarities to IndexedRowMatrix
Add `columnSimilarities` to IndexedRowMatrix for PySpark spark.mllib.linalg.

Author: Kai Jiang <jiangkai@gmail.com>

Closes #10158 from vectorijk/spark-12041.
2016-01-05 15:33:27 -08:00

936 lines
35 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Package for distributed linear algebra.
"""
import sys
if sys.version >= '3':
long = int
from py4j.java_gateway import JavaObject
from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector, Matrix
__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix',
'BlockMatrix']
class DistributedMatrix(object):
"""
.. note:: Experimental
Represents a distributively stored matrix backed by one or
more RDDs.
"""
def numRows(self):
"""Get or compute the number of rows."""
raise NotImplementedError
def numCols(self):
"""Get or compute the number of cols."""
raise NotImplementedError
class RowMatrix(DistributedMatrix):
"""
.. note:: Experimental
Represents a row-oriented distributed Matrix with no meaningful
row indices.
:param rows: An RDD of vectors.
:param numRows: Number of rows in the matrix. A non-positive
value means unknown, at which point the number
of rows will be determined by the number of
records in the `rows` RDD.
:param numCols: Number of columns in the matrix. A non-positive
value means unknown, at which point the number
of columns will be determined by the size of
the first row.
"""
def __init__(self, rows, numRows=0, numCols=0):
"""
Note: This docstring is not shown publicly.
Create a wrapper over a Java RowMatrix.
Publicly, we require that `rows` be an RDD. However, for
internal usage, `rows` can also be a Java RowMatrix
object, in which case we can wrap it directly. This
assists in clean matrix conversions.
>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
>>> mat = RowMatrix(rows)
>>> mat_diff = RowMatrix(rows)
>>> (mat_diff._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
False
>>> mat_same = RowMatrix(mat._java_matrix_wrapper._java_model)
>>> (mat_same._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
True
"""
if isinstance(rows, RDD):
rows = rows.map(_convert_to_vector)
java_matrix = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols))
elif (isinstance(rows, JavaObject)
and rows.getClass().getSimpleName() == "RowMatrix"):
java_matrix = rows
else:
raise TypeError("rows should be an RDD of vectors, got %s" % type(rows))
self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
@property
def rows(self):
"""
Rows of the RowMatrix stored as an RDD of vectors.
>>> mat = RowMatrix(sc.parallelize([[1, 2, 3], [4, 5, 6]]))
>>> rows = mat.rows
>>> rows.first()
DenseVector([1.0, 2.0, 3.0])
"""
return self._java_matrix_wrapper.call("rows")
def numRows(self):
"""
Get or compute the number of rows.
>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
... [7, 8, 9], [10, 11, 12]])
>>> mat = RowMatrix(rows)
>>> print(mat.numRows())
4
>>> mat = RowMatrix(rows, 7, 6)
>>> print(mat.numRows())
7
"""
return self._java_matrix_wrapper.call("numRows")
def numCols(self):
"""
Get or compute the number of cols.
>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
... [7, 8, 9], [10, 11, 12]])
>>> mat = RowMatrix(rows)
>>> print(mat.numCols())
3
>>> mat = RowMatrix(rows, 7, 6)
>>> print(mat.numCols())
6
"""
return self._java_matrix_wrapper.call("numCols")
class IndexedRow(object):
"""
.. note:: Experimental
Represents a row of an IndexedRowMatrix.
Just a wrapper over a (long, vector) tuple.
:param index: The index for the given row.
:param vector: The row in the matrix at the given index.
"""
def __init__(self, index, vector):
self.index = long(index)
self.vector = _convert_to_vector(vector)
def __repr__(self):
return "IndexedRow(%s, %s)" % (self.index, self.vector)
def _convert_to_indexed_row(row):
if isinstance(row, IndexedRow):
return row
elif isinstance(row, tuple) and len(row) == 2:
return IndexedRow(*row)
else:
raise TypeError("Cannot convert type %s into IndexedRow" % type(row))
class IndexedRowMatrix(DistributedMatrix):
"""
.. note:: Experimental
Represents a row-oriented distributed Matrix with indexed rows.
:param rows: An RDD of IndexedRows or (long, vector) tuples.
:param numRows: Number of rows in the matrix. A non-positive
value means unknown, at which point the number
of rows will be determined by the max row
index plus one.
:param numCols: Number of columns in the matrix. A non-positive
value means unknown, at which point the number
of columns will be determined by the size of
the first row.
"""
def __init__(self, rows, numRows=0, numCols=0):
"""
Note: This docstring is not shown publicly.
Create a wrapper over a Java IndexedRowMatrix.
Publicly, we require that `rows` be an RDD. However, for
internal usage, `rows` can also be a Java IndexedRowMatrix
object, in which case we can wrap it directly. This
assists in clean matrix conversions.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6])])
>>> mat = IndexedRowMatrix(rows)
>>> mat_diff = IndexedRowMatrix(rows)
>>> (mat_diff._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
False
>>> mat_same = IndexedRowMatrix(mat._java_matrix_wrapper._java_model)
>>> (mat_same._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
True
"""
if isinstance(rows, RDD):
rows = rows.map(_convert_to_indexed_row)
# We use DataFrames for serialization of IndexedRows from
# Python, so first convert the RDD to a DataFrame on this
# side. This will convert each IndexedRow to a Row
# containing the 'index' and 'vector' values, which can
# both be easily serialized. We will convert back to
# IndexedRows on the Scala side.
java_matrix = callMLlibFunc("createIndexedRowMatrix", rows.toDF(),
long(numRows), int(numCols))
elif (isinstance(rows, JavaObject)
and rows.getClass().getSimpleName() == "IndexedRowMatrix"):
java_matrix = rows
else:
raise TypeError("rows should be an RDD of IndexedRows or (long, vector) tuples, "
"got %s" % type(rows))
self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
@property
def rows(self):
"""
Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.
>>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6])]))
>>> rows = mat.rows
>>> rows.first()
IndexedRow(0, [1.0,2.0,3.0])
"""
# We use DataFrames for serialization of IndexedRows from
# Java, so we first convert the RDD of rows to a DataFrame
# on the Scala/Java side. Then we map each Row in the
# DataFrame back to an IndexedRow on this side.
rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
rows = rows_df.map(lambda row: IndexedRow(row[0], row[1]))
return rows
def numRows(self):
"""
Get or compute the number of rows.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6]),
... IndexedRow(2, [7, 8, 9]),
... IndexedRow(3, [10, 11, 12])])
>>> mat = IndexedRowMatrix(rows)
>>> print(mat.numRows())
4
>>> mat = IndexedRowMatrix(rows, 7, 6)
>>> print(mat.numRows())
7
"""
return self._java_matrix_wrapper.call("numRows")
def numCols(self):
"""
Get or compute the number of cols.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6]),
... IndexedRow(2, [7, 8, 9]),
... IndexedRow(3, [10, 11, 12])])
>>> mat = IndexedRowMatrix(rows)
>>> print(mat.numCols())
3
>>> mat = IndexedRowMatrix(rows, 7, 6)
>>> print(mat.numCols())
6
"""
return self._java_matrix_wrapper.call("numCols")
def columnSimilarities(self):
"""
Compute all cosine similarities between columns.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(6, [4, 5, 6])])
>>> mat = IndexedRowMatrix(rows)
>>> cs = mat.columnSimilarities()
>>> print(cs.numCols())
3
"""
java_coordinate_matrix = self._java_matrix_wrapper.call("columnSimilarities")
return CoordinateMatrix(java_coordinate_matrix)
def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(6, [4, 5, 6])])
>>> mat = IndexedRowMatrix(rows).toRowMatrix()
>>> mat.rows.collect()
[DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0])]
"""
java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
return RowMatrix(java_row_matrix)
def toCoordinateMatrix(self):
"""
Convert this matrix to a CoordinateMatrix.
>>> rows = sc.parallelize([IndexedRow(0, [1, 0]),
... IndexedRow(6, [0, 5])])
>>> mat = IndexedRowMatrix(rows).toCoordinateMatrix()
>>> mat.entries.take(3)
[MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 0.0), MatrixEntry(6, 0, 0.0)]
"""
java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
return CoordinateMatrix(java_coordinate_matrix)
def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
"""
Convert this matrix to a BlockMatrix.
:param rowsPerBlock: Number of rows that make up each block.
The blocks forming the final rows are not
required to have the given number of rows.
:param colsPerBlock: Number of columns that make up each block.
The blocks forming the final columns are not
required to have the given number of columns.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(6, [4, 5, 6])])
>>> mat = IndexedRowMatrix(rows).toBlockMatrix()
>>> # This IndexedRowMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # BlockMatrix will have 7 rows as well.
>>> print(mat.numRows())
7
>>> print(mat.numCols())
3
"""
java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
rowsPerBlock,
colsPerBlock)
return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
class MatrixEntry(object):
"""
.. note:: Experimental
Represents an entry of a CoordinateMatrix.
Just a wrapper over a (long, long, float) tuple.
:param i: The row index of the matrix.
:param j: The column index of the matrix.
:param value: The (i, j)th entry of the matrix, as a float.
"""
def __init__(self, i, j, value):
self.i = long(i)
self.j = long(j)
self.value = float(value)
def __repr__(self):
return "MatrixEntry(%s, %s, %s)" % (self.i, self.j, self.value)
def _convert_to_matrix_entry(entry):
if isinstance(entry, MatrixEntry):
return entry
elif isinstance(entry, tuple) and len(entry) == 3:
return MatrixEntry(*entry)
else:
raise TypeError("Cannot convert type %s into MatrixEntry" % type(entry))
class CoordinateMatrix(DistributedMatrix):
"""
.. note:: Experimental
Represents a matrix in coordinate format.
:param entries: An RDD of MatrixEntry inputs or
(long, long, float) tuples.
:param numRows: Number of rows in the matrix. A non-positive
value means unknown, at which point the number
of rows will be determined by the max row
index plus one.
:param numCols: Number of columns in the matrix. A non-positive
value means unknown, at which point the number
of columns will be determined by the max row
index plus one.
"""
def __init__(self, entries, numRows=0, numCols=0):
"""
Note: This docstring is not shown publicly.
Create a wrapper over a Java CoordinateMatrix.
Publicly, we require that `rows` be an RDD. However, for
internal usage, `rows` can also be a Java CoordinateMatrix
object, in which case we can wrap it directly. This
assists in clean matrix conversions.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries)
>>> mat_diff = CoordinateMatrix(entries)
>>> (mat_diff._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
False
>>> mat_same = CoordinateMatrix(mat._java_matrix_wrapper._java_model)
>>> (mat_same._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
True
"""
if isinstance(entries, RDD):
entries = entries.map(_convert_to_matrix_entry)
# We use DataFrames for serialization of MatrixEntry entries
# from Python, so first convert the RDD to a DataFrame on
# this side. This will convert each MatrixEntry to a Row
# containing the 'i', 'j', and 'value' values, which can
# each be easily serialized. We will convert back to
# MatrixEntry inputs on the Scala side.
java_matrix = callMLlibFunc("createCoordinateMatrix", entries.toDF(),
long(numRows), long(numCols))
elif (isinstance(entries, JavaObject)
and entries.getClass().getSimpleName() == "CoordinateMatrix"):
java_matrix = entries
else:
raise TypeError("entries should be an RDD of MatrixEntry entries or "
"(long, long, float) tuples, got %s" % type(entries))
self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
@property
def entries(self):
"""
Entries of the CoordinateMatrix stored as an RDD of
MatrixEntries.
>>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)]))
>>> entries = mat.entries
>>> entries.first()
MatrixEntry(0, 0, 1.2)
"""
# We use DataFrames for serialization of MatrixEntry entries
# from Java, so we first convert the RDD of entries to a
# DataFrame on the Scala/Java side. Then we map each Row in
# the DataFrame back to a MatrixEntry on this side.
entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
return entries
def numRows(self):
"""
Get or compute the number of rows.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(1, 0, 2),
... MatrixEntry(2, 1, 3.7)])
>>> mat = CoordinateMatrix(entries)
>>> print(mat.numRows())
3
>>> mat = CoordinateMatrix(entries, 7, 6)
>>> print(mat.numRows())
7
"""
return self._java_matrix_wrapper.call("numRows")
def numCols(self):
"""
Get or compute the number of cols.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(1, 0, 2),
... MatrixEntry(2, 1, 3.7)])
>>> mat = CoordinateMatrix(entries)
>>> print(mat.numCols())
2
>>> mat = CoordinateMatrix(entries, 7, 6)
>>> print(mat.numCols())
6
"""
return self._java_matrix_wrapper.call("numCols")
def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries).toRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, but the ensuing RowMatrix
>>> # will only have 2 rows since there are only entries on 2
>>> # unique rows.
>>> print(mat.numRows())
2
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing RowMatrix
>>> # will have 5 columns as well.
>>> print(mat.numCols())
5
"""
java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
return RowMatrix(java_row_matrix)
def toIndexedRowMatrix(self):
"""
Convert this matrix to an IndexedRowMatrix.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # IndexedRowMatrix will have 7 rows as well.
>>> print(mat.numRows())
7
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing
>>> # IndexedRowMatrix will have 5 columns as well.
>>> print(mat.numCols())
5
"""
java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
return IndexedRowMatrix(java_indexed_row_matrix)
def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
"""
Convert this matrix to a BlockMatrix.
:param rowsPerBlock: Number of rows that make up each block.
The blocks forming the final rows are not
required to have the given number of rows.
:param colsPerBlock: Number of columns that make up each block.
The blocks forming the final columns are not
required to have the given number of columns.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries).toBlockMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # BlockMatrix will have 7 rows as well.
>>> print(mat.numRows())
7
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing
>>> # BlockMatrix will have 5 columns as well.
>>> print(mat.numCols())
5
"""
java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
rowsPerBlock,
colsPerBlock)
return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
def _convert_to_matrix_block_tuple(block):
if (isinstance(block, tuple) and len(block) == 2
and isinstance(block[0], tuple) and len(block[0]) == 2
and isinstance(block[1], Matrix)):
blockRowIndex = int(block[0][0])
blockColIndex = int(block[0][1])
subMatrix = block[1]
return ((blockRowIndex, blockColIndex), subMatrix)
else:
raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block))
class BlockMatrix(DistributedMatrix):
"""
.. note:: Experimental
Represents a distributed matrix in blocks of local matrices.
:param blocks: An RDD of sub-matrix blocks
((blockRowIndex, blockColIndex), sub-matrix) that
form this distributed matrix. If multiple blocks
with the same index exist, the results for
operations like add and multiply will be
unpredictable.
:param rowsPerBlock: Number of rows that make up each block.
The blocks forming the final rows are not
required to have the given number of rows.
:param colsPerBlock: Number of columns that make up each block.
The blocks forming the final columns are not
required to have the given number of columns.
:param numRows: Number of rows of this matrix. If the supplied
value is less than or equal to zero, the number
of rows will be calculated when `numRows` is
invoked.
:param numCols: Number of columns of this matrix. If the supplied
value is less than or equal to zero, the number
of columns will be calculated when `numCols` is
invoked.
"""
def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0):
"""
Note: This docstring is not shown publicly.
Create a wrapper over a Java BlockMatrix.
Publicly, we require that `blocks` be an RDD. However, for
internal usage, `blocks` can also be a Java BlockMatrix
object, in which case we can wrap it directly. This
assists in clean matrix conversions.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat_diff = BlockMatrix(blocks, 3, 2)
>>> (mat_diff._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
False
>>> mat_same = BlockMatrix(mat._java_matrix_wrapper._java_model, 3, 2)
>>> (mat_same._java_matrix_wrapper._java_model ==
... mat._java_matrix_wrapper._java_model)
True
"""
if isinstance(blocks, RDD):
blocks = blocks.map(_convert_to_matrix_block_tuple)
# We use DataFrames for serialization of sub-matrix blocks
# from Python, so first convert the RDD to a DataFrame on
# this side. This will convert each sub-matrix block
# tuple to a Row containing the 'blockRowIndex',
# 'blockColIndex', and 'subMatrix' values, which can
# each be easily serialized. We will convert back to
# ((blockRowIndex, blockColIndex), sub-matrix) tuples on
# the Scala side.
java_matrix = callMLlibFunc("createBlockMatrix", blocks.toDF(),
int(rowsPerBlock), int(colsPerBlock),
long(numRows), long(numCols))
elif (isinstance(blocks, JavaObject)
and blocks.getClass().getSimpleName() == "BlockMatrix"):
java_matrix = blocks
else:
raise TypeError("blocks should be an RDD of sub-matrix blocks as "
"((int, int), matrix) tuples, got %s" % type(blocks))
self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
@property
def blocks(self):
"""
The RDD of sub-matrix blocks
((blockRowIndex, blockColIndex), sub-matrix) that form this
distributed matrix.
>>> mat = BlockMatrix(
... sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
>>> blocks = mat.blocks
>>> blocks.first()
((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
"""
# We use DataFrames for serialization of sub-matrix blocks
# from Java, so we first convert the RDD of blocks to a
# DataFrame on the Scala/Java side. Then we map each Row in
# the DataFrame back to a sub-matrix block on this side.
blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
return blocks
@property
def rowsPerBlock(self):
"""
Number of rows that make up each block.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.rowsPerBlock
3
"""
return self._java_matrix_wrapper.call("rowsPerBlock")
@property
def colsPerBlock(self):
"""
Number of columns that make up each block.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.colsPerBlock
2
"""
return self._java_matrix_wrapper.call("colsPerBlock")
@property
def numRowBlocks(self):
"""
Number of rows of blocks in the BlockMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.numRowBlocks
2
"""
return self._java_matrix_wrapper.call("numRowBlocks")
@property
def numColBlocks(self):
"""
Number of columns of blocks in the BlockMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.numColBlocks
1
"""
return self._java_matrix_wrapper.call("numColBlocks")
def numRows(self):
"""
Get or compute the number of rows.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> print(mat.numRows())
6
>>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
>>> print(mat.numRows())
7
"""
return self._java_matrix_wrapper.call("numRows")
def numCols(self):
"""
Get or compute the number of cols.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> print(mat.numCols())
2
>>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
>>> print(mat.numCols())
6
"""
return self._java_matrix_wrapper.call("numCols")
def add(self, other):
"""
Adds two block matrices together. The matrices must have the
same size and matching `rowsPerBlock` and `colsPerBlock` values.
If one of the sub matrix blocks that are being added is a
SparseMatrix, the resulting sub matrix block will also be a
SparseMatrix, even if it is being added to a DenseMatrix. If
two dense sub matrix blocks are added, the output block will
also be a DenseMatrix.
>>> dm1 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
>>> dm2 = Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12])
>>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [7, 11, 12])
>>> blocks1 = sc.parallelize([((0, 0), dm1), ((1, 0), dm2)])
>>> blocks2 = sc.parallelize([((0, 0), dm1), ((1, 0), dm2)])
>>> blocks3 = sc.parallelize([((0, 0), sm), ((1, 0), dm2)])
>>> mat1 = BlockMatrix(blocks1, 3, 2)
>>> mat2 = BlockMatrix(blocks2, 3, 2)
>>> mat3 = BlockMatrix(blocks3, 3, 2)
>>> mat1.add(mat2).toLocalMatrix()
DenseMatrix(6, 2, [2.0, 4.0, 6.0, 14.0, 16.0, 18.0, 8.0, 10.0, 12.0, 20.0, 22.0, 24.0], 0)
>>> mat1.add(mat3).toLocalMatrix()
DenseMatrix(6, 2, [8.0, 2.0, 3.0, 14.0, 16.0, 18.0, 4.0, 16.0, 18.0, 20.0, 22.0, 24.0], 0)
"""
if not isinstance(other, BlockMatrix):
raise TypeError("Other should be a BlockMatrix, got %s" % type(other))
other_java_block_matrix = other._java_matrix_wrapper._java_model
java_block_matrix = self._java_matrix_wrapper.call("add", other_java_block_matrix)
return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
def multiply(self, other):
"""
Left multiplies this BlockMatrix by `other`, another
BlockMatrix. The `colsPerBlock` of this matrix must equal the
`rowsPerBlock` of `other`. If `other` contains any SparseMatrix
blocks, they will have to be converted to DenseMatrix blocks.
The output BlockMatrix will only consist of DenseMatrix blocks.
This may cause some performance issues until support for
multiplying two sparse matrices is added.
>>> dm1 = Matrices.dense(2, 3, [1, 2, 3, 4, 5, 6])
>>> dm2 = Matrices.dense(2, 3, [7, 8, 9, 10, 11, 12])
>>> dm3 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
>>> dm4 = Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12])
>>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [7, 11, 12])
>>> blocks1 = sc.parallelize([((0, 0), dm1), ((0, 1), dm2)])
>>> blocks2 = sc.parallelize([((0, 0), dm3), ((1, 0), dm4)])
>>> blocks3 = sc.parallelize([((0, 0), sm), ((1, 0), dm4)])
>>> mat1 = BlockMatrix(blocks1, 2, 3)
>>> mat2 = BlockMatrix(blocks2, 3, 2)
>>> mat3 = BlockMatrix(blocks3, 3, 2)
>>> mat1.multiply(mat2).toLocalMatrix()
DenseMatrix(2, 2, [242.0, 272.0, 350.0, 398.0], 0)
>>> mat1.multiply(mat3).toLocalMatrix()
DenseMatrix(2, 2, [227.0, 258.0, 394.0, 450.0], 0)
"""
if not isinstance(other, BlockMatrix):
raise TypeError("Other should be a BlockMatrix, got %s" % type(other))
other_java_block_matrix = other._java_matrix_wrapper._java_model
java_block_matrix = self._java_matrix_wrapper.call("multiply", other_java_block_matrix)
return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
def toLocalMatrix(self):
"""
Collect the distributed matrix on the driver as a DenseMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix()
>>> # This BlockMatrix will have 6 effective rows, due to
>>> # having two sub-matrix blocks stacked, each with 3 rows.
>>> # The ensuing DenseMatrix will also have 6 rows.
>>> print(mat.numRows)
6
>>> # This BlockMatrix will have 2 effective columns, due to
>>> # having two sub-matrix blocks stacked, each with 2
>>> # columns. The ensuing DenseMatrix will also have 2 columns.
>>> print(mat.numCols)
2
"""
return self._java_matrix_wrapper.call("toLocalMatrix")
def toIndexedRowMatrix(self):
"""
Convert this matrix to an IndexedRowMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2).toIndexedRowMatrix()
>>> # This BlockMatrix will have 6 effective rows, due to
>>> # having two sub-matrix blocks stacked, each with 3 rows.
>>> # The ensuing IndexedRowMatrix will also have 6 rows.
>>> print(mat.numRows())
6
>>> # This BlockMatrix will have 2 effective columns, due to
>>> # having two sub-matrix blocks stacked, each with 2 columns.
>>> # The ensuing IndexedRowMatrix will also have 2 columns.
>>> print(mat.numCols())
2
"""
java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
return IndexedRowMatrix(java_indexed_row_matrix)
def toCoordinateMatrix(self):
"""
Convert this matrix to a CoordinateMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(1, 2, [1, 2])),
... ((1, 0), Matrices.dense(1, 2, [7, 8]))])
>>> mat = BlockMatrix(blocks, 1, 2).toCoordinateMatrix()
>>> mat.entries.take(3)
[MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0), MatrixEntry(1, 0, 7.0)]
"""
java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
return CoordinateMatrix(java_coordinate_matrix)
def _test():
import doctest
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Matrices
import pyspark.mllib.linalg.distributed
globs = pyspark.mllib.linalg.distributed.__dict__.copy()
globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
globs['sqlContext'] = SQLContext(globs['sc'])
globs['Matrices'] = Matrices
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
if __name__ == "__main__":
_test()