[SPARK-5521] PCA wrapper for easy transform vectors

This adds a simple PCA wrapper for easily transforming vectors by PCA, for example the features of a LabeledPoint or of another more complicated structure. Example of usage:

```scala
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.PCA

val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

val pca = PCA.create(training.first().features.size/2, data.map(_.features))
val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))
val test_pca = test.map(p => p.copy(features = pca.transform(p.features)))

val numIterations = 100
val model = LinearRegressionWithSGD.train(training, numIterations)
val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)

val valuesAndPreds = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

val valuesAndPreds_pca = test_pca.map { point =>
  val score = model_pca.predict(point.features)
  (score, point.label)
}

val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()
val MSE_pca = valuesAndPreds_pca.map { case (v, p) => math.pow(v - p, 2) }.mean()

println("Mean Squared Error = " + MSE)
println("PCA Mean Squared Error = " + MSE_pca)
```

Author: Kirill A. Korinskiy <catap@catap.ru>
Author: Joseph K. Bradley <joseph@databricks.com>

Closes #4304 from catap/pca and squashes the following commits:

501bcd9 [Joseph K. Bradley] Small updates: removed k from Java-friendly PCA fit(). In PCASuite, converted results to set for comparison. Added an error message for bad k in PCA.
9dcc02b [Kirill A. Korinskiy] [SPARK-5521] fix scala style
1892a06 [Kirill A. Korinskiy] [SPARK-5521] PCA wrapper for easy transform vectors
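The evaluation step at the end of the example above is just a mean of squared residuals over (prediction, label) pairs. A minimal plain-Scala sketch of the same computation (no Spark; the sample pairs below are made up for illustration):

```scala
// Mean squared error over (prediction, label) pairs, mirroring the RDD
// version above. Plain Scala collections stand in for the RDDs.
def meanSquaredError(valuesAndPreds: Seq[(Double, Double)]): Double =
  valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.sum / valuesAndPreds.size

// Hypothetical (prediction, label) pairs.
val sample = Seq((1.0, 1.5), (2.0, 1.0), (3.0, 3.5))
val mse = meanSquaredError(sample)  // (0.25 + 1.0 + 0.25) / 3 = 0.5
```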
This commit is contained in:
parent 3038443e58
commit 8c07c75c98
```diff
@@ -137,7 +137,7 @@ statistical method to find a rotation such that the first coordinate has the lar
 possible, and each succeeding coordinate in turn has the largest variance possible. The columns of
 the rotation matrix are called principal components. PCA is used widely in dimensionality reduction.
 
-MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format.
+MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format and any Vectors.
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
```
```diff
@@ -157,6 +157,23 @@ val pc: Matrix = mat.computePrincipalComponents(10) // Principal components are
 // Project the rows to the linear space spanned by the top 10 principal components.
 val projected: RowMatrix = mat.multiply(pc)
 {% endhighlight %}
+
+The following code demonstrates how to compute principal components on source vectors
+and use them to project the vectors into a low-dimensional space while keeping associated labels:
+
+{% highlight scala %}
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.feature.PCA
+
+val data: RDD[LabeledPoint] = ...
+
+// Compute the top 10 principal components.
+val pca = new PCA(10).fit(data.map(_.features))
+
+// Project vectors to the linear space spanned by the top 10 principal components, keeping the label
+val projected = data.map(p => p.copy(features = pca.transform(p.features)))
+{% endhighlight %}
+
 </div>
 
 <div data-lang="java" markdown="1">
```
```diff
@@ -507,7 +507,6 @@ v_N
 
 This example below demonstrates how to load a simple vectors file, extract a set of vectors, then transform those vectors using a transforming vector value.
 
-
 <div class="codetabs">
 <div data-lang="scala">
 {% highlight scala %}
```
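The transformer in the hunk above applies a fixed "transforming vector value" to each input vector. Assuming that operation is an element-wise (Hadamard) product, a minimal plain-Scala sketch of it, independent of the MLlib API, is:

```scala
// Element-wise (Hadamard) product of a data vector with a fixed transforming
// vector -- a sketch of what the transformer applies to every input vector.
def elementwiseProduct(scalingVec: Array[Double])(v: Array[Double]): Array[Double] = {
  require(v.length == scalingVec.length, "vector sizes must match")
  v.zip(scalingVec).map { case (x, w) => x * w }
}

val scalingVec = Array(0.0, 1.0, 2.0)  // made-up transforming vector
val transformed = elementwiseProduct(scalingVec)(Array(1.0, 2.0, 3.0))  // Array(0.0, 2.0, 6.0)
```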
```diff
@@ -531,3 +530,57 @@ val transformedData2 = parsedData.map(x => transformer.transform(x))
 </div>
+
+## PCA
+
+A feature transformer that projects vectors to a low-dimensional space using PCA.
+For details, see [dimensionality reduction](mllib-dimensionality-reduction.html).
+
+### Example
+
+The following code demonstrates how to compute principal components on a `Vector`
+and use them to project the vectors into a low-dimensional space while keeping associated labels,
+for calculating a [Linear Regression](mllib-linear-methods.html):
+
+<div class="codetabs">
+<div data-lang="scala">
+{% highlight scala %}
+import org.apache.spark.mllib.regression.LinearRegressionWithSGD
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.feature.PCA
+
+val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line =>
+  val parts = line.split(',')
+  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
+}.cache()
+
+val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
+val training = splits(0).cache()
+val test = splits(1)
+
+val pca = new PCA(training.first().features.size / 2).fit(data.map(_.features))
+val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))
+val test_pca = test.map(p => p.copy(features = pca.transform(p.features)))
+
+val numIterations = 100
+val model = LinearRegressionWithSGD.train(training, numIterations)
+val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)
+
+val valuesAndPreds = test.map { point =>
+  val score = model.predict(point.features)
+  (score, point.label)
+}
+
+val valuesAndPreds_pca = test_pca.map { point =>
+  val score = model_pca.predict(point.features)
+  (score, point.label)
+}
+
+val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()
+val MSE_pca = valuesAndPreds_pca.map { case (v, p) => math.pow(v - p, 2) }.mean()
+
+println("Mean Squared Error = " + MSE)
+println("PCA Mean Squared Error = " + MSE_pca)
+{% endhighlight %}
+</div>
+</div>
```
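The key pattern in the documentation example above is projecting features while keeping each label attached via `p.copy(features = ...)`. A minimal plain-Scala sketch of that copy-and-transform step (no Spark; the `Point` case class and the stand-in "projection", which simply keeps the first k entries, are made up for illustration):

```scala
// Keep the label while replacing features with a lower-dimensional version,
// mirroring p.copy(features = pca.transform(p.features)) above.
case class Point(label: Double, features: Array[Double])

// Stand-in for a real projection: keep the first k coordinates.
def truncate(k: Int)(v: Array[Double]): Array[Double] = v.take(k)

val data = Seq(
  Point(1.0, Array(1.0, 2.0, 3.0, 4.0)),
  Point(0.0, Array(5.0, 6.0, 7.0, 8.0))
)
val projected = data.map(p => p.copy(features = truncate(2)(p.features)))
// Labels survive unchanged; features are now 2-dimensional.
```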
```diff
@@ -0,0 +1,93 @@
```

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.feature

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD

/**
 * A feature transformer that projects vectors to a low-dimensional space using PCA.
 *
 * @param k number of principal components
 */
class PCA(val k: Int) {
  require(k >= 1, s"PCA requires a number of principal components k >= 1 but was given $k")

  /**
   * Computes a [[PCAModel]] that contains the principal components of the input vectors.
   *
   * @param sources source vectors
   */
  def fit(sources: RDD[Vector]): PCAModel = {
    require(k <= sources.first().size,
      s"source vector size ${sources.first().size} must be at least k=$k")

    val mat = new RowMatrix(sources)
    val pc = mat.computePrincipalComponents(k) match {
      case dm: DenseMatrix =>
        dm
      case sm: SparseMatrix =>
        /* Convert a sparse matrix to dense.
         *
         * RowMatrix.computePrincipalComponents always returns a dense matrix.
         * The following code is a safeguard.
         */
        sm.toDense
      case m =>
        throw new IllegalArgumentException("Unsupported matrix format. Expected " +
          s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}")
    }
    new PCAModel(k, pc)
  }

  /** Java-friendly version of [[fit()]] */
  def fit(sources: JavaRDD[Vector]): PCAModel = fit(sources.rdd)
}

/**
 * Model fitted by [[PCA]] that can project vectors to a low-dimensional space using PCA.
 *
 * @param k number of principal components.
 * @param pc a principal components Matrix. Each column is one principal component.
 */
class PCAModel private[mllib] (val k: Int, val pc: DenseMatrix) extends VectorTransformer {
  /**
   * Transform a vector by computed Principal Components.
   *
   * @param vector vector to be transformed.
   *               Vector must be the same length as the source vectors given to [[PCA.fit()]].
   * @return transformed vector. Vector will be of length k.
   */
  override def transform(vector: Vector): Vector = {
    vector match {
      case dv: DenseVector =>
        pc.transpose.multiply(dv)
      case SparseVector(size, indices, values) =>
        /* SparseVector -> single row SparseMatrix */
        val sm = Matrices.sparse(size, 1, Array(0, indices.length), indices, values).transpose
        val projection = sm.multiply(pc)
        Vectors.dense(projection.values)
      case _ =>
        throw new IllegalArgumentException("Unsupported vector format. Expected " +
          s"SparseVector or DenseVector. Instead got: ${vector.getClass}")
    }
  }
}
```
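The dense-vector branch of `PCAModel.transform` above reduces to a matrix-vector product: the transformed vector is `pc.transpose * v`, one dot product per principal component. A minimal plain-Scala sketch of that projection step, with a made-up orthonormal `pc` (each inner array is one principal component, stored column-wise):

```scala
// Project a vector onto k principal components: result(i) = pc(i) dot v,
// i.e. the matrix-vector product pc.transpose * v from transform() above.
def project(pc: Array[Array[Double]], v: Array[Double]): Array[Double] =
  pc.map(component => component.zip(v).map { case (c, x) => c * x }.sum)

// Hypothetical 2 components over 3-dimensional input: pick out coordinates 0 and 2.
val pc = Array(Array(1.0, 0.0, 0.0), Array(0.0, 0.0, 1.0))
val projected = project(pc, Array(3.0, 4.0, 5.0))  // Array(3.0, 5.0)
```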
```diff
@@ -0,0 +1,48 @@
```

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.feature

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.util.MLlibTestSparkContext

class PCASuite extends FunSuite with MLlibTestSparkContext {

  private val data = Array(
    Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
  )

  private lazy val dataRDD = sc.parallelize(data, 2)

  test("Correct computation using a PCA wrapper") {
    val k = dataRDD.count().toInt
    val pca = new PCA(k).fit(dataRDD)

    val mat = new RowMatrix(dataRDD)
    val pc = mat.computePrincipalComponents(k)

    val pca_transform = pca.transform(dataRDD).collect()
    val mat_multiply = mat.multiply(pc).rows.collect()

    assert(pca_transform.toSet === mat_multiply.toSet)
  }
}
```