[SPARK-14734][ML][MLLIB] Added asML, fromML methods for all spark.mllib Vector, Matrix types

## What changes were proposed in this pull request?

To maintain wrappers around spark.mllib algorithms in spark.ml, it will be useful to have ```private[spark]``` methods for converting between the two linear algebra representations.
This PR adds asML and fromML methods for all spark.mllib Vector and Matrix types.
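
A sketch of the intended usage from Spark-internal code (hedged: ```asML``` and ```fromML``` are ```private[spark]```, so this compiles only inside the org.apache.spark package):

```scala
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.{linalg => oldlinalg}

val oldVec: oldlinalg.Vector = oldlinalg.Vectors.dense(1.0, 2.0, 3.5)

// spark.mllib -> spark.ml: a light copy that shares the underlying array.
val newVec: newlinalg.Vector = oldVec.asML

// spark.ml -> spark.mllib: likewise copies references, not data.
val roundTrip: oldlinalg.Vector = oldlinalg.Vectors.fromML(newVec)
assert(roundTrip.toArray.sameElements(oldVec.toArray))
```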

## How was this patch tested?

Unit tests for all conversions

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #12504 from jkbradley/linalg-conversions.
Joseph K. Bradley 2016-04-21 16:50:09 -07:00 committed by DB Tsai
parent e2b5647ab9
commit f25a3ea8d3
5 changed files with 139 additions and 2 deletions

mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala

@@ -24,7 +24,8 @@ import scala.collection.mutable.{ArrayBuffer, ArrayBuilder => MArrayBuilder, Has
import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.annotation.Since
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.GenericArrayData
@@ -158,6 +159,12 @@ sealed trait Matrix extends Serializable {
*/
@Since("1.5.0")
def numActives: Int
/**
* Convert this matrix to the new mllib-local representation.
* This does NOT copy the data; it copies references.
*/
private[spark] def asML: newlinalg.Matrix
}
private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
@@ -419,6 +426,10 @@ class DenseMatrix @Since("1.3.0") (
}
}
}
private[spark] override def asML: newlinalg.DenseMatrix = {
new newlinalg.DenseMatrix(numRows, numCols, values, isTransposed)
}
}
/**
@@ -515,6 +526,11 @@ object DenseMatrix {
}
matrix
}
/** Convert new linalg type to spark.mllib type. Light copy; only copies references */
private[spark] def fromML(m: newlinalg.DenseMatrix): DenseMatrix = {
new DenseMatrix(m.numRows, m.numCols, m.values, m.isTransposed)
}
}
/**
@@ -721,6 +737,10 @@ class SparseMatrix @Since("1.3.0") (
}
}
}
private[spark] override def asML: newlinalg.SparseMatrix = {
new newlinalg.SparseMatrix(numRows, numCols, colPtrs, rowIndices, values, isTransposed)
}
}
/**
@@ -895,6 +915,11 @@ object SparseMatrix {
SparseMatrix.fromCOO(n, n, nnzVals.map(v => (v._2, v._2, v._1)))
}
}
/** Convert new linalg type to spark.mllib type. Light copy; only copies references */
private[spark] def fromML(m: newlinalg.SparseMatrix): SparseMatrix = {
new SparseMatrix(m.numRows, m.numCols, m.colPtrs, m.rowIndices, m.values, m.isTransposed)
}
}
/**
@@ -1177,4 +1202,12 @@ object Matrices {
SparseMatrix.fromCOO(numRows, numCols, entries)
}
}
/** Convert new linalg type to spark.mllib type. Light copy; only copies references */
private[spark] def fromML(m: newlinalg.Matrix): Matrix = m match {
case dm: newlinalg.DenseMatrix =>
DenseMatrix.fromML(dm)
case sm: newlinalg.SparseMatrix =>
SparseMatrix.fromML(sm)
}
}
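
As a quick illustration of the no-copy guarantee documented in the asML scaladoc above (a hedged sketch; like the methods themselves, it assumes Spark-internal code):

```scala
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}

val dm = new DenseMatrix(3, 2, Array(0.0, 0.0, 1.0, 0.0, 2.0, 3.5))

// asML passes the same values array to the spark.ml constructor: no data copy.
val newDM: newlinalg.DenseMatrix = dm.asML
assert(newDM.values eq dm.values)

// Matrices.fromML dispatches on the runtime type, mirroring the match above.
val back: Matrix = Matrices.fromML(newDM)
assert(back.isInstanceOf[DenseMatrix])
```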

mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala

@@ -30,6 +30,7 @@ import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}
import org.apache.spark.SparkException
import org.apache.spark.annotation.{AlphaComponent, Since}
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.NumericParser
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
@@ -180,6 +181,12 @@ sealed trait Vector extends Serializable {
*/
@Since("1.6.0")
def toJson: String
/**
* Convert this vector to the new mllib-local representation.
* This does NOT copy the data; it copies references.
*/
private[spark] def asML: newlinalg.Vector
}
/**
@@ -573,6 +580,14 @@ object Vectors {
/** Max number of nonzero entries used in computing hash code. */
private[linalg] val MAX_HASH_NNZ = 128
/** Convert new linalg type to spark.mllib type. Light copy; only copies references */
private[spark] def fromML(v: newlinalg.Vector): Vector = v match {
case dv: newlinalg.DenseVector =>
DenseVector.fromML(dv)
case sv: newlinalg.SparseVector =>
SparseVector.fromML(sv)
}
}
/**
@@ -686,6 +701,10 @@ class DenseVector @Since("1.0.0") (
val jValue = ("type" -> 1) ~ ("values" -> values.toSeq)
compact(render(jValue))
}
private[spark] override def asML: newlinalg.DenseVector = {
new newlinalg.DenseVector(values)
}
}
@Since("1.3.0")
@@ -694,6 +713,11 @@ object DenseVector {
/** Extracts the value array from a dense vector. */
@Since("1.3.0")
def unapply(dv: DenseVector): Option[Array[Double]] = Some(dv.values)
/** Convert new linalg type to spark.mllib type. Light copy; only copies references */
private[spark] def fromML(v: newlinalg.DenseVector): DenseVector = {
new DenseVector(v.values)
}
}
/**
@@ -882,6 +906,10 @@ class SparseVector @Since("1.0.0") (
("values" -> values.toSeq)
compact(render(jValue))
}
private[spark] override def asML: newlinalg.SparseVector = {
new newlinalg.SparseVector(size, indices, values)
}
}
@Since("1.3.0")
@@ -889,4 +917,9 @@ object SparseVector {
@Since("1.3.0")
def unapply(sv: SparseVector): Option[(Int, Array[Int], Array[Double])] =
Some((sv.size, sv.indices, sv.values))
/** Convert new linalg type to spark.mllib type. Light copy; only copies references */
private[spark] def fromML(v: newlinalg.SparseVector): SparseVector = {
new SparseVector(v.size, v.indices, v.values)
}
}
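
And the analogous sketch for vectors, showing that the sparse conversion shares both the indices and values arrays, per the constructors shown above (again assuming Spark-internal code):

```scala
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}

val sv = new SparseVector(5, Array(1, 2, 4), Array(1.1, 2.2, 4.4))

// Reference copy only: both arrays are shared with the new vector.
val newSV: newlinalg.SparseVector = sv.asML
assert(newSV.indices eq sv.indices)
assert(newSV.values eq sv.values)

// Vectors.fromML recovers the concrete spark.mllib subtype via pattern match.
val back: Vector = Vectors.fromML(newSV)
assert(back.isInstanceOf[SparseVector])
```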

mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala

@@ -19,12 +19,14 @@ package org.apache.spark.mllib.linalg
import java.util.Random
import scala.collection.mutable.{Map => MutableMap}
import breeze.linalg.{CSCMatrix, Matrix => BM}
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar._
import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.TestingUtils._
class MatricesSuite extends SparkFunSuite {
@@ -523,4 +525,39 @@ class MatricesSuite extends SparkFunSuite {
assert(m.transpose.colIter.toSeq === rows)
}
}
test("conversions between new local linalg and mllib linalg") {
val dm: DenseMatrix = new DenseMatrix(3, 2, Array(0.0, 0.0, 1.0, 0.0, 2.0, 3.5))
val sm: SparseMatrix = dm.toSparse
val sm0: Matrix = sm.asInstanceOf[Matrix]
val dm0: Matrix = dm.asInstanceOf[Matrix]
def compare(oldM: Matrix, newM: newlinalg.Matrix): Unit = {
assert(oldM.toArray === newM.toArray)
assert(oldM.numCols === newM.numCols)
assert(oldM.numRows === newM.numRows)
}
val newSM: newlinalg.SparseMatrix = sm.asML
val newDM: newlinalg.DenseMatrix = dm.asML
val newSM0: newlinalg.Matrix = sm0.asML
val newDM0: newlinalg.Matrix = dm0.asML
assert(newSM0.isInstanceOf[newlinalg.SparseMatrix])
assert(newDM0.isInstanceOf[newlinalg.DenseMatrix])
compare(sm, newSM)
compare(dm, newDM)
compare(sm0, newSM0)
compare(dm0, newDM0)
val oldSM: SparseMatrix = SparseMatrix.fromML(newSM)
val oldDM: DenseMatrix = DenseMatrix.fromML(newDM)
val oldSM0: Matrix = Matrices.fromML(newSM0)
val oldDM0: Matrix = Matrices.fromML(newDM0)
assert(oldSM0.isInstanceOf[SparseMatrix])
assert(oldDM0.isInstanceOf[DenseMatrix])
compare(oldSM, newSM)
compare(oldDM, newDM)
compare(oldSM0, newSM0)
compare(oldDM0, newDM0)
}
}

mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala

@@ -24,6 +24,7 @@ import org.json4s.jackson.JsonMethods.{parse => parseJson}
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.TestingUtils._
class VectorsSuite extends SparkFunSuite with Logging {
@@ -392,4 +393,33 @@ class VectorsSuite extends SparkFunSuite with Logging {
assert(u === v, "toJson/fromJson should preserve vector values.")
}
}
test("conversions between new local linalg and mllib linalg") {
val dv: DenseVector = new DenseVector(Array(1.0, 2.0, 3.5))
val sv: SparseVector = new SparseVector(5, Array(1, 2, 4), Array(1.1, 2.2, 4.4))
val sv0: Vector = sv.asInstanceOf[Vector]
val dv0: Vector = dv.asInstanceOf[Vector]
val newSV: newlinalg.SparseVector = sv.asML
val newDV: newlinalg.DenseVector = dv.asML
val newSV0: newlinalg.Vector = sv0.asML
val newDV0: newlinalg.Vector = dv0.asML
assert(newSV0.isInstanceOf[newlinalg.SparseVector])
assert(newDV0.isInstanceOf[newlinalg.DenseVector])
assert(sv.toArray === newSV.toArray)
assert(dv.toArray === newDV.toArray)
assert(sv0.toArray === newSV0.toArray)
assert(dv0.toArray === newDV0.toArray)
val oldSV: SparseVector = SparseVector.fromML(newSV)
val oldDV: DenseVector = DenseVector.fromML(newDV)
val oldSV0: Vector = Vectors.fromML(newSV0)
val oldDV0: Vector = Vectors.fromML(newDV0)
assert(oldSV0.isInstanceOf[SparseVector])
assert(oldDV0.isInstanceOf[DenseVector])
assert(oldSV.toArray === newSV.toArray)
assert(oldDV.toArray === newDV.toArray)
assert(oldSV0.toArray === newSV0.toArray)
assert(oldDV0.toArray === newDV0.toArray)
}
}

project/MimaExcludes.scala

@@ -659,6 +659,10 @@ object MimaExcludes {
// [SPARK-14407] Hides HadoopFsRelation related data source API into execution package
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.OutputWriter"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.OutputWriterFactory")
) ++ Seq(
// SPARK-14734: Add conversions between mllib and ml Vector, Matrix types
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.mllib.linalg.Vector.asML"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.mllib.linalg.Matrix.asML")
) ++ Seq(
// SPARK-14704: Create accumulators in TaskMetrics
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.this"),