[SPARK-35423][ML] PCA results should be consistent, If the Matrix contains both Sparse and Dense vectors

### What changes were proposed in this pull request?
If the dataset contains mix of sparse and dense vectors output of PCA seems different. The issue here is we check only the first row's Vector type. If the first row is dense and rest all the row's are sparse, we compute PCA based on dense path. Similarly, if only first row in Sparse and rest all the rows are dense, we compute based on Sparse computation path.

Following datasets will produce different results with PCA, even though the data is same, except first row type is sparse.
```
val data1 = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
```

```
+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+

```
```
val data1 = Array(
  Vectors.dense(0.0, 1.0, 0.0, 7.0, 0.0 ),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
```

```
+------------------------------------------------------------+
|pcaFeatures                                                 |
+------------------------------------------------------------+
|[1.6485728230883814,-4.0132827005162985,-1.0091435193998504]|
|[-4.645104331781533,-1.1167972663619048,-1.0091435193998501]|
|[-6.428880535676488,-5.337951427775359,-1.009143519399851]  |
+------------------------------------------------------------+
```

### Why are the changes needed?
To fix inconsistent result if dataset contains both sparse and dense vectors. We need to treat the entire metrics as Sparse ONLY if all the rows are sparse. Otherwise we need to consider the matrix as dense. This PR can be a followup for the PR: https://github.com/apache/spark/pull/23126

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UTs

Closes #32734 from shahidki31/shahid/pca.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
This commit is contained in:
shahid 2021-06-09 10:23:46 -05:00 committed by Sean Owen
parent 313dc2d4ed
commit 519be238be
3 changed files with 50 additions and 2 deletions

View file

@ -250,6 +250,13 @@ sealed trait Vector extends Serializable {
*/
private[spark] def nonZeroIterator: Iterator[(Int, Double)] =
activeIterator.filter(_._2 != 0)
/**
* Returns the ratio of number of zeros by total number of values.
*/
private[spark] def sparsity(): Double = {
1.0 - numNonzeros.toDouble / size
}
}
/**

View file

@ -421,6 +421,11 @@ class RowMatrix @Since("1.0.0") (
}
}
// The matrix is sparse, if all the rows has sparsity more than 0.5.
private def isSparseMatrix: Boolean = {
rows.filter(row => row.sparsity() < 0.5).isEmpty()
}
/**
* Computes the covariance matrix, treating each row as an observation.
*
@ -438,8 +443,8 @@ class RowMatrix @Since("1.0.0") (
require(m > 1, s"RowMatrix.computeCovariance called on matrix with only $m rows." +
" Cannot compute the covariance of a RowMatrix with <= 1 row.")
val mean = Vectors.fromML(summary.mean)
if (rows.first().isInstanceOf[DenseVector]) {
// If all the rows are sparse vectors, then compute based on `computeSparseVectorCovariance`.
if (!isSparseMatrix) {
computeDenseVectorCovariance(mean, n, m)
} else {
computeSparseVectorCovariance(mean, n, m)

View file

@ -69,6 +69,42 @@ class PCASuite extends MLTest with DefaultReadWriteTest {
}
}
test("dataset with dense vectors and sparse vectors should produce same results") {
val data1 = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val data2 = Array(
Vectors.dense(0.0, 1.0, 0.0, 7.0, 0.0),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val data3 = data1.map(_.toSparse)
val data4 = data1.map(_.toDense)
val df1 = spark.createDataFrame(data1.map(Tuple1.apply)).toDF("features")
val df2 = spark.createDataFrame(data2.map(Tuple1.apply)).toDF("features")
val df3 = spark.createDataFrame(data3.map(Tuple1.apply)).toDF("features")
val df4 = spark.createDataFrame(data4.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
val pcaModel1 = pca.fit(df1)
val pcaModel2 = pca.fit(df2)
val pcaModel3 = pca.fit(df3)
val pcaModel4 = pca.fit(df4)
assert(pcaModel1.explainedVariance == pcaModel2.explainedVariance)
assert(pcaModel1.explainedVariance == pcaModel3.explainedVariance)
assert(pcaModel1.explainedVariance == pcaModel4.explainedVariance)
assert(pcaModel1.pc === pcaModel2.pc)
assert(pcaModel1.pc === pcaModel3.pc)
assert(pcaModel1.pc === pcaModel4.pc)
}
test("PCA read/write") {
val t = new PCA()
.setInputCol("myInputCol")