[SPARK-19368][MLLIB] BlockMatrix.toIndexedRowMatrix() optimization for sparse matrices

## What changes were proposed in this pull request?

Optimization [SPARK-12869] was made for dense matrices, but it caused a severe performance regression for sparse matrices because manipulating them element-by-element is very inefficient. When manipulating sparse matrices in Breeze, it is better to accumulate indices and values and build a sparse vector directly (e.g. via VectorBuilder).

## How was this patch tested?

Checked it against one of our use cases that, after moving to Spark 2, took 6.5 hours instead of 20 minutes. After this change it is back to 20 minutes.

Closes #16732 from uzadude/SparseVector_optimization.

Authored-by: oraviv <oraviv@paypal.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
oraviv 2018-11-22 15:48:01 -06:00 committed by Sean Owen
parent dd8c179c28
commit d81d95a7e8

View file

@ -17,10 +17,9 @@
package org.apache.spark.mllib.linalg.distributed
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM}
import scala.collection.mutable.ArrayBuffer
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM, SparseVector => BSV, Vector => BV}
import org.apache.spark.{Partitioner, SparkException}
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
@ -28,6 +27,7 @@ import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
/**
* A grid partitioner, which uses a regular grid to partition coordinates.
*
@ -273,24 +273,37 @@ class BlockMatrix @Since("1.3.0") (
require(cols < Int.MaxValue, s"The number of columns should be less than Int.MaxValue ($cols).")
val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) =>
mat.rowIter.zipWithIndex.map {
mat.rowIter.zipWithIndex.filter(_._1.size > 0).map {
case (vector, rowIdx) =>
blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector.asBreeze))
blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector))
}
}.groupByKey().map { case (rowIdx, vectors) =>
val numberNonZeroPerRow = vectors.map(_._2.activeSize).sum.toDouble / cols.toDouble
val numberNonZero = vectors.map(_._2.numActives).sum
val numberNonZeroPerRow = numberNonZero.toDouble / cols.toDouble
val wholeVector = if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
BSV.zeros[Double](cols)
} else {
BDV.zeros[Double](cols)
}
val wholeVector =
if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
val arrBufferIndices = new ArrayBuffer[Int](numberNonZero)
val arrBufferValues = new ArrayBuffer[Double](numberNonZero)
vectors.foreach { case (blockColIdx: Int, vec: BV[_]) =>
val offset = colsPerBlock * blockColIdx
wholeVector(offset until Math.min(cols, offset + colsPerBlock)) := vec
}
new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector))
vectors.foreach { case (blockColIdx: Int, vec: Vector) =>
val offset = colsPerBlock * blockColIdx
vec.foreachActive { case (colIdx: Int, value: Double) =>
arrBufferIndices += offset + colIdx
arrBufferValues += value
}
}
Vectors.sparse(cols, arrBufferIndices.toArray, arrBufferValues.toArray)
} else {
val wholeVectorBuf = BDV.zeros[Double](cols)
vectors.foreach { case (blockColIdx: Int, vec: Vector) =>
val offset = colsPerBlock * blockColIdx
wholeVectorBuf(offset until Math.min(cols, offset + colsPerBlock)) := vec.asBreeze
}
Vectors.fromBreeze(wholeVectorBuf)
}
IndexedRow(rowIdx, wholeVector)
}
new IndexedRowMatrix(rows)
}