[SPARK-3977] Conversion methods for BlockMatrix to other Distributed Matrices
The conversion methods for `BlockMatrix`. Conversions go through `CoordinateMatrix` in order to cause a shuffle so that intermediate operations will be stored on disk and the expensive initial computation will be mitigated. Author: Burak Yavuz <brkyvz@gmail.com> Closes #4256 from brkyvz/SPARK-3977PR and squashes the following commits: 4df37fe [Burak Yavuz] moved TODO inside code block b049c07 [Burak Yavuz] addressed code review feedback v1 66cb755 [Burak Yavuz] added default toBlockMatrix conversion 851f2a2 [Burak Yavuz] added better comments and checks cdb9895 [Burak Yavuz] [SPARK-3977] Conversion methods for BlockMatrix to other Distributed Matrices
This commit is contained in:
parent
5b9760de8d
commit
a63be1a18f
|
@ -17,10 +17,12 @@
|
|||
|
||||
package org.apache.spark.mllib.linalg.distributed
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import breeze.linalg.{DenseMatrix => BDM}
|
||||
|
||||
import org.apache.spark.{Logging, Partitioner}
|
||||
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
|
||||
import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
||||
|
@ -182,6 +184,28 @@ class BlockMatrix(
|
|||
this
|
||||
}
|
||||
|
||||
/** Converts to CoordinateMatrix. */
|
||||
def toCoordinateMatrix(): CoordinateMatrix = {
|
||||
val entryRDD = blocks.flatMap { case ((blockRowIndex, blockColIndex), mat) =>
|
||||
val rowStart = blockRowIndex.toLong * rowsPerBlock
|
||||
val colStart = blockColIndex.toLong * colsPerBlock
|
||||
val entryValues = new ArrayBuffer[MatrixEntry]()
|
||||
mat.foreachActive { (i, j, v) =>
|
||||
if (v != 0.0) entryValues.append(new MatrixEntry(rowStart + i, colStart + j, v))
|
||||
}
|
||||
entryValues
|
||||
}
|
||||
new CoordinateMatrix(entryRDD, numRows(), numCols())
|
||||
}
|
||||
|
||||
/** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
|
||||
def toIndexedRowMatrix(): IndexedRowMatrix = {
|
||||
require(numCols() < Int.MaxValue, "The number of columns must be within the integer range. " +
|
||||
s"numCols: ${numCols()}")
|
||||
// TODO: This implementation may be optimized
|
||||
toCoordinateMatrix().toIndexedRowMatrix()
|
||||
}
|
||||
|
||||
/** Collect the distributed matrix on the driver as a `DenseMatrix`. */
|
||||
def toLocalMatrix(): Matrix = {
|
||||
require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +
|
||||
|
|
|
@ -21,8 +21,7 @@ import breeze.linalg.{DenseMatrix => BDM}
|
|||
|
||||
import org.apache.spark.annotation.Experimental
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.SparkContext._
|
||||
import org.apache.spark.mllib.linalg.Vectors
|
||||
import org.apache.spark.mllib.linalg.{Matrix, SparseMatrix, Vectors}
|
||||
|
||||
/**
|
||||
* :: Experimental ::
|
||||
|
@ -98,6 +97,46 @@ class CoordinateMatrix(
|
|||
toIndexedRowMatrix().toRowMatrix()
|
||||
}
|
||||
|
||||
/** Converts to BlockMatrix. Creates blocks of [[SparseMatrix]] with size 1024 x 1024. */
|
||||
def toBlockMatrix(): BlockMatrix = {
|
||||
toBlockMatrix(1024, 1024)
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts to BlockMatrix. Creates blocks of [[SparseMatrix]].
|
||||
* @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have
|
||||
* a smaller value. Must be an integer value greater than 0.
|
||||
* @param colsPerBlock The number of columns of each block. The blocks at the right edge may have
|
||||
* a smaller value. Must be an integer value greater than 0.
|
||||
* @return a [[BlockMatrix]]
|
||||
*/
|
||||
def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
|
||||
require(rowsPerBlock > 0,
|
||||
s"rowsPerBlock needs to be greater than 0. rowsPerBlock: $rowsPerBlock")
|
||||
require(colsPerBlock > 0,
|
||||
s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock")
|
||||
val m = numRows()
|
||||
val n = numCols()
|
||||
val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
|
||||
val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt
|
||||
val partitioner = GridPartitioner(numRowBlocks, numColBlocks, entries.partitions.length)
|
||||
|
||||
val blocks: RDD[((Int, Int), Matrix)] = entries.map { entry =>
|
||||
val blockRowIndex = (entry.i / rowsPerBlock).toInt
|
||||
val blockColIndex = (entry.j / colsPerBlock).toInt
|
||||
|
||||
val rowId = entry.i % rowsPerBlock
|
||||
val colId = entry.j % colsPerBlock
|
||||
|
||||
((blockRowIndex, blockColIndex), (rowId.toInt, colId.toInt, entry.value))
|
||||
}.groupByKey(partitioner).map { case ((blockRowIndex, blockColIndex), entry) =>
|
||||
val effRows = math.min(m - blockRowIndex.toLong * rowsPerBlock, rowsPerBlock).toInt
|
||||
val effCols = math.min(n - blockColIndex.toLong * colsPerBlock, colsPerBlock).toInt
|
||||
((blockRowIndex, blockColIndex), SparseMatrix.fromCOO(effRows, effCols, entry))
|
||||
}
|
||||
new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, m, n)
|
||||
}
|
||||
|
||||
/** Determines the size by computing the max row/column index. */
|
||||
private def computeSize() {
|
||||
// Reduce will throw an exception if `entries` is empty.
|
||||
|
|
|
@ -75,6 +75,24 @@ class IndexedRowMatrix(
|
|||
new RowMatrix(rows.map(_.vector), 0L, nCols)
|
||||
}
|
||||
|
||||
/** Converts to BlockMatrix. Creates blocks of [[SparseMatrix]] with size 1024 x 1024. */
|
||||
def toBlockMatrix(): BlockMatrix = {
|
||||
toBlockMatrix(1024, 1024)
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts to BlockMatrix. Creates blocks of [[SparseMatrix]].
|
||||
* @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have
|
||||
* a smaller value. Must be an integer value greater than 0.
|
||||
* @param colsPerBlock The number of columns of each block. The blocks at the right edge may have
|
||||
* a smaller value. Must be an integer value greater than 0.
|
||||
* @return a [[BlockMatrix]]
|
||||
*/
|
||||
def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
|
||||
// TODO: This implementation may be optimized
|
||||
toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock)
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts this matrix to a
|
||||
* [[org.apache.spark.mllib.linalg.distributed.CoordinateMatrix]].
|
||||
|
|
|
@ -120,6 +120,20 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
|
|||
}
|
||||
}
|
||||
|
||||
test("toCoordinateMatrix") {
|
||||
val coordMat = gridBasedMat.toCoordinateMatrix()
|
||||
assert(coordMat.numRows() === m)
|
||||
assert(coordMat.numCols() === n)
|
||||
assert(coordMat.toBreeze() === gridBasedMat.toBreeze())
|
||||
}
|
||||
|
||||
test("toIndexedRowMatrix") {
|
||||
val rowMat = gridBasedMat.toIndexedRowMatrix()
|
||||
assert(rowMat.numRows() === m)
|
||||
assert(rowMat.numCols() === n)
|
||||
assert(rowMat.toBreeze() === gridBasedMat.toBreeze())
|
||||
}
|
||||
|
||||
test("toBreeze and toLocalMatrix") {
|
||||
val expected = BDM(
|
||||
(1.0, 0.0, 0.0, 0.0),
|
||||
|
|
|
@ -100,4 +100,18 @@ class CoordinateMatrixSuite extends FunSuite with MLlibTestSparkContext {
|
|||
Vectors.dense(0.0, 9.0, 0.0, 0.0))
|
||||
assert(rows === expected)
|
||||
}
|
||||
|
||||
test("toBlockMatrix") {
|
||||
val blockMat = mat.toBlockMatrix(2, 2)
|
||||
assert(blockMat.numRows() === m)
|
||||
assert(blockMat.numCols() === n)
|
||||
assert(blockMat.toBreeze() === mat.toBreeze())
|
||||
|
||||
intercept[IllegalArgumentException] {
|
||||
mat.toBlockMatrix(-1, 2)
|
||||
}
|
||||
intercept[IllegalArgumentException] {
|
||||
mat.toBlockMatrix(2, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,6 +88,21 @@ class IndexedRowMatrixSuite extends FunSuite with MLlibTestSparkContext {
|
|||
assert(coordMat.toBreeze() === idxRowMat.toBreeze())
|
||||
}
|
||||
|
||||
test("toBlockMatrix") {
|
||||
val idxRowMat = new IndexedRowMatrix(indexedRows)
|
||||
val blockMat = idxRowMat.toBlockMatrix(2, 2)
|
||||
assert(blockMat.numRows() === m)
|
||||
assert(blockMat.numCols() === n)
|
||||
assert(blockMat.toBreeze() === idxRowMat.toBreeze())
|
||||
|
||||
intercept[IllegalArgumentException] {
|
||||
idxRowMat.toBlockMatrix(-1, 2)
|
||||
}
|
||||
intercept[IllegalArgumentException] {
|
||||
idxRowMat.toBlockMatrix(2, 0)
|
||||
}
|
||||
}
|
||||
|
||||
test("multiply a local matrix") {
|
||||
val A = new IndexedRowMatrix(indexedRows)
|
||||
val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
|
||||
|
|
Loading…
Reference in a new issue