[SPARK-26158][MLLIB] fix covariance accuracy problem for DenseVector

## What changes were proposed in this pull request?
Enhance accuracy of the covariance logic in RowMatrix for function computeCovariance

## How was this patch tested?
Unit test
Accuracy test

Closes #23126 from KyleLi1985/master.

Authored-by: 李亮 <liang.li.work@outlook.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
李亮 2018-11-29 13:08:53 -06:00 committed by Sean Owen
parent 1144df3b5d
commit 9fdc7a840d
3 changed files with 90 additions and 24 deletions

View file

@ -128,6 +128,77 @@ class RowMatrix @Since("1.0.0") (
RowMatrix.triuToFull(n, GU.data)
}
private def computeDenseVectorCovariance(mean: Vector, n: Int, m: Long): Matrix = {
val bc = rows.context.broadcast(mean)
// Computes n*(n+1)/2, avoiding overflow in the multiplication.
// This succeeds when n <= 65535, which is checked above
val nt = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2))
val MU = rows.treeAggregate(new BDV[Double](nt))(
seqOp = (U, v) => {
val n = v.size
val na = Array.ofDim[Double](n)
val means = bc.value
val ta = v.toArray
for (index <- 0 until n) {
na(index) = ta(index) - means(index)
}
BLAS.spr(1.0, new DenseVector(na), U.data)
U
}, combOp = (U1, U2) => U1 += U2)
bc.destroy()
val M = RowMatrix.triuToFull(n, MU.data).asBreeze
var i = 0
var j = 0
val m1 = m - 1.0
while (i < n) {
j = i
while (j < n) {
val Mij = M(i, j) / m1
M(i, j) = Mij
M(j, i) = Mij
j += 1
}
i += 1
}
Matrices.fromBreeze(M)
}
private def computeSparseVectorCovariance(mean: Vector, n: Int, m: Long): Matrix = {
// We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
// large but Cov(X, Y) is small, but it is good for sparse computation.
// TODO: find a fast and stable way for sparse data.
val G = computeGramianMatrix().asBreeze
var i = 0
var j = 0
val m1 = m - 1.0
var alpha = 0.0
while (i < n) {
alpha = m / m1 * mean(i)
j = i
while (j < n) {
val Gij = G(i, j) / m1 - alpha * mean(j)
G(i, j) = Gij
G(j, i) = Gij
j += 1
}
i += 1
}
Matrices.fromBreeze(G)
}
private def checkNumColumns(cols: Int): Unit = {
if (cols > 65535) {
throw new IllegalArgumentException(s"Argument with more than 65535 cols: $cols")
@ -337,29 +408,11 @@ class RowMatrix @Since("1.0.0") (
" Cannot compute the covariance of a RowMatrix with <= 1 row.")
val mean = summary.mean
// We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
// large but Cov(X, Y) is small, but it is good for sparse computation.
// TODO: find a fast and stable way for sparse data.
val G = computeGramianMatrix().asBreeze
var i = 0
var j = 0
val m1 = m - 1.0
var alpha = 0.0
while (i < n) {
alpha = m / m1 * mean(i)
j = i
while (j < n) {
val Gij = G(i, j) / m1 - alpha * mean(j)
G(i, j) = Gij
G(j, i) = Gij
j += 1
}
i += 1
if (rows.first().isInstanceOf[DenseVector]) {
computeDenseVectorCovariance(mean, n, m)
} else {
computeSparseVectorCovariance(mean, n, m)
}
Matrices.fromBreeze(G)
}
/**

View file

@ -28,7 +28,6 @@ import org.apache.spark.SharedSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import org.apache.spark.sql.Dataset;
@ -67,7 +66,7 @@ public class JavaPCASuite extends SharedSparkSession {
JavaRDD<Vector> dataRDD = jsc.parallelize(points, 2);
RowMatrix mat = new RowMatrix(dataRDD.map(
(Vector vector) -> (org.apache.spark.mllib.linalg.Vector) new DenseVector(vector.toArray())
(Vector vector) -> org.apache.spark.mllib.linalg.Vectors.fromML(vector)
).rdd());
Matrix pc = mat.computePrincipalComponents(3);

View file

@ -266,6 +266,20 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
}
}
test("dense vector covariance accuracy (SPARK-26158)") {
val denseData = Seq(
Vectors.dense(100000.000004, 199999.999999),
Vectors.dense(100000.000012, 200000.000002),
Vectors.dense(99999.9999931, 200000.000003),
Vectors.dense(99999.9999977, 200000.000001)
)
val denseMat = new RowMatrix(sc.parallelize(denseData, 2))
val result = denseMat.computeCovariance()
val expected = breeze.linalg.cov(denseMat.toBreeze())
assert(closeToZero(abs(expected) - abs(result.asBreeze.asInstanceOf[BDM[Double]])))
}
test("compute covariance") {
for (mat <- Seq(denseMat, sparseMat)) {
val result = mat.computeCovariance()