[SPARK-5894] [ML] Add polynomial mapper

See [SPARK-5894](https://issues.apache.org/jira/browse/SPARK-5894).

Author: Xusen Yin <yinxusen@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #5245 from yinxusen/SPARK-5894 and squashes the following commits:

dc461a6 [Xusen Yin] merge polynomial expansion v2
6d0c3cc [Xusen Yin] Merge branch 'SPARK-5894' of https://github.com/mengxr/spark into mengxr-SPARK-5894
57bfdd5 [Xusen Yin] Merge branch 'master' into SPARK-5894
3d02a7d [Xusen Yin] Merge branch 'master' into SPARK-5894
a067da2 [Xiangrui Meng] a new approach for poly expansion
0789d81 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5894
4e9aed0 [Xusen Yin] fix test suite
95d8fb9 [Xusen Yin] fix sparse vector indices
8d39674 [Xusen Yin] fix sparse vector expansion error
5998dd6 [Xusen Yin] fix dense vector fillin
fa3ade3 [Xusen Yin] change the functional code into imperative one to speedup
b70e7e1 [Xusen Yin] remove useless case class
6fa236f [Xusen Yin] fix vector slice error
daff601 [Xusen Yin] fix index error of sparse vector
6bd0a10 [Xusen Yin] merge repeated features
419f8a2 [Xusen Yin] need to merge same columns
4ebf34e [Xusen Yin] add test suite of polynomial expansion
372227c [Xusen Yin] add polynomial expansion
Authored by Xusen Yin on 2015-04-24 00:39:29 -07:00; committed by Xiangrui Meng.
parent 4c722d77ae
commit 8509519d8b
2 changed files with 271 additions and 0 deletions

PolynomialExpansion.scala (new file)

@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

import scala.collection.mutable

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.mllib.linalg._
import org.apache.spark.sql.types.DataType

/**
 * :: AlphaComponent ::
 * Perform feature expansion in a polynomial space. As described in the Wikipedia article on
 * polynomial expansion ([[http://en.wikipedia.org/wiki/Polynomial_expansion]]), "In mathematics,
 * an expansion of a product of sums expresses it as a sum of products by using the fact that
 * multiplication distributes over addition". For example, expanding the 2-variable feature
 * vector `(x, y)` with degree 2 yields `(x, y, x * x, x * y, y * y)`.
 */
@AlphaComponent
class PolynomialExpansion extends UnaryTransformer[Vector, Vector, PolynomialExpansion] {

  /**
   * The polynomial degree to expand, which should be larger than 1.
   * @group param
   */
  val degree = new IntParam(this, "degree", "the polynomial degree to expand")

  setDefault(degree -> 2)

  /** @group getParam */
  def getDegree: Int = getOrDefault(degree)

  /** @group setParam */
  def setDegree(value: Int): this.type = set(degree, value)

  override protected def createTransformFunc(paramMap: ParamMap): Vector => Vector = { v =>
    val d = paramMap(degree)
    PolynomialExpansion.expand(v, d)
  }

  override protected def outputDataType: DataType = new VectorUDT()
}

/**
 * The expansion is done via recursion. Given n features and degree d, the size after expansion is
 * (n + d choose d) (including 1 and first-order values). For example, let f([a, b, c], 3) be the
 * function that expands [a, b, c] to their monomials of degree 3. We have the following recursion:
 *
 * {{{
 * f([a, b, c], 3) = f([a, b], 3) ++ f([a, b], 2) * c ++ f([a, b], 1) * c^2 ++ [c^3]
 * }}}
 *
 * To handle sparsity, if c is zero, we can skip all monomials that contain it. We remember the
 * current index and increment it properly for sparse input.
 */
object PolynomialExpansion {
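
  // Binomial coefficient C(n, k), computed with integer arithmetic; the result is assumed to
  // fit in an Int, which holds for the feature counts and degrees exercised here.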
  private def choose(n: Int, k: Int): Int = {
    Range(n, n - k, -1).product / Range(k, 1, -1).product
  }

  private def getPolySize(numFeatures: Int, degree: Int): Int = choose(numFeatures + degree, degree)
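
  // Recursively fills `polyValues` with every monomial over `values(0 .. lastIdx)` of degree at
  // most `degree`, scaled by `multiplier`, starting at `curPolyIdx`; returns the index just past
  // the block it is responsible for, so callers can continue from there.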
  private def expandDense(
      values: Array[Double],
      lastIdx: Int,
      degree: Int,
      multiplier: Double,
      polyValues: Array[Double],
      curPolyIdx: Int): Int = {
    if (multiplier == 0.0) {
      // do nothing
    } else if (degree == 0 || lastIdx < 0) {
      polyValues(curPolyIdx) = multiplier
    } else {
      val v = values(lastIdx)
      val lastIdx1 = lastIdx - 1
      var alpha = multiplier
      var i = 0
      var curStart = curPolyIdx
      while (i <= degree && alpha != 0.0) {
        curStart = expandDense(values, lastIdx1, degree - i, alpha, polyValues, curStart)
        i += 1
        alpha *= v
      }
    }
    curPolyIdx + getPolySize(lastIdx + 1, degree)
  }
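
  // Sparse analogue of expandDense: recurses over the non-zero entries only, while
  // `lastFeatureIdx` tracks the position in the full feature space so that the emitted
  // `polyIndices` line up with the dense expansion's layout.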
  private def expandSparse(
      indices: Array[Int],
      values: Array[Double],
      lastIdx: Int,
      lastFeatureIdx: Int,
      degree: Int,
      multiplier: Double,
      polyIndices: mutable.ArrayBuilder[Int],
      polyValues: mutable.ArrayBuilder[Double],
      curPolyIdx: Int): Int = {
    if (multiplier == 0.0) {
      // do nothing
    } else if (degree == 0 || lastIdx < 0) {
      polyIndices += curPolyIdx
      polyValues += multiplier
    } else {
      // Skip all zeros at the tail.
      val v = values(lastIdx)
      val lastIdx1 = lastIdx - 1
      val lastFeatureIdx1 = indices(lastIdx) - 1
      var alpha = multiplier
      var curStart = curPolyIdx
      var i = 0
      while (i <= degree && alpha != 0.0) {
        curStart = expandSparse(indices, values, lastIdx1, lastFeatureIdx1, degree - i, alpha,
          polyIndices, polyValues, curStart)
        i += 1
        alpha *= v
      }
    }
    curPolyIdx + getPolySize(lastFeatureIdx + 1, degree)
  }
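
  // Entry points for whole vectors: the dense path writes into a preallocated array of the
  // exact expanded size, while the sparse path sizes its builders by the non-zero count only.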
  private def expand(dv: DenseVector, degree: Int): DenseVector = {
    val n = dv.size
    val polySize = getPolySize(n, degree)
    val polyValues = new Array[Double](polySize)
    expandDense(dv.values, n - 1, degree, 1.0, polyValues, 0)
    new DenseVector(polyValues)
  }

  private def expand(sv: SparseVector, degree: Int): SparseVector = {
    val polySize = getPolySize(sv.size, degree)
    val nnz = sv.values.length
    val nnzPolySize = getPolySize(nnz, degree)
    val polyIndices = mutable.ArrayBuilder.make[Int]
    polyIndices.sizeHint(nnzPolySize)
    val polyValues = mutable.ArrayBuilder.make[Double]
    polyValues.sizeHint(nnzPolySize)
    expandSparse(
      sv.indices, sv.values, nnz - 1, sv.size - 1, degree, 1.0, polyIndices, polyValues, 0)
    new SparseVector(polySize, polyIndices.result(), polyValues.result())
  }

  def expand(v: Vector, degree: Int): Vector = {
    v match {
      case dv: DenseVector => expand(dv, degree)
      case sv: SparseVector => expand(sv, degree)
      case _ => throw new IllegalArgumentException
    }
  }
}
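
For context, a minimal usage sketch of the new transformer, mirroring the test suite below (hypothetical data; `sc` is assumed to be an existing SparkContext):

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// Two toy feature vectors to expand.
val df = sqlContext.createDataFrame(Seq(
  (0, Vectors.dense(-2.0, 2.3)),
  (1, Vectors.dense(0.6, -1.1))
)).toDF("id", "features")

val poly = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)

// Each output vector has C(2 + 3, 3) = 10 entries, including the constant term.
poly.transform(df).select("polyFeatures").show()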

PolynomialExpansionSuite.scala (new file)

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

import org.scalatest.FunSuite
import org.scalatest.exceptions.TestFailedException

import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.sql.{Row, SQLContext}

class PolynomialExpansionSuite extends FunSuite with MLlibTestSparkContext {

  @transient var sqlContext: SQLContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    sqlContext = new SQLContext(sc)
  }
test("Polynomial expansion with default parameter") {
val data = Array(
Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
Vectors.dense(-2.0, 2.3),
Vectors.dense(0.0, 0.0, 0.0),
Vectors.dense(0.6, -1.1, -3.0),
Vectors.sparse(3, Seq())
)
val twoDegreeExpansion: Array[Vector] = Array(
Vectors.sparse(10, Array(0, 1, 2, 3, 4, 5), Array(1.0, -2.0, 4.0, 2.3, -4.6, 5.29)),
Vectors.dense(1.0, -2.0, 4.0, 2.3, -4.6, 5.29),
Vectors.dense(Array(1.0) ++ Array.fill[Double](9)(0.0)),
Vectors.dense(1.0, 0.6, 0.36, -1.1, -0.66, 1.21, -3.0, -1.8, 3.3, 9.0),
Vectors.sparse(10, Array(0), Array(1.0)))
val df = sqlContext.createDataFrame(data.zip(twoDegreeExpansion)).toDF("features", "expected")
val polynomialExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
polynomialExpansion.transform(df).select("polyFeatures", "expected").collect().foreach {
case Row(expanded: DenseVector, expected: DenseVector) =>
assert(expanded ~== expected absTol 1e-1)
case Row(expanded: SparseVector, expected: SparseVector) =>
assert(expanded ~== expected absTol 1e-1)
case _ =>
throw new TestFailedException("Unmatched data types after polynomial expansion", 0)
}
}
test("Polynomial expansion with setter") {
val data = Array(
Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
Vectors.dense(-2.0, 2.3),
Vectors.dense(0.0, 0.0, 0.0),
Vectors.dense(0.6, -1.1, -3.0),
Vectors.sparse(3, Seq())
)
val threeDegreeExpansion: Array[Vector] = Array(
Vectors.sparse(20, Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
Array(1.0, -2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17)),
Vectors.dense(1.0, -2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17),
Vectors.dense(Array(1.0) ++ Array.fill[Double](19)(0.0)),
Vectors.dense(1.0, 0.6, 0.36, 0.216, -1.1, -0.66, -0.396, 1.21, 0.726, -1.331, -3.0, -1.8,
-1.08, 3.3, 1.98, -3.63, 9.0, 5.4, -9.9, -27.0),
Vectors.sparse(20, Array(0), Array(1.0)))
val df = sqlContext.createDataFrame(data.zip(threeDegreeExpansion)).toDF("features", "expected")
val polynomialExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
polynomialExpansion.transform(df).select("polyFeatures", "expected").collect().foreach {
case Row(expanded: DenseVector, expected: DenseVector) =>
assert(expanded ~== expected absTol 1e-1)
case Row(expanded: SparseVector, expected: SparseVector) =>
assert(expanded ~== expected absTol 1e-1)
case _ =>
throw new TestFailedException("Unmatched data types after polynomial expansion", 0)
}
}
}
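
As a sanity check on the expected sizes above (scratch code, not part of the patch; it just verifies the C(n + d, d) size formula from the scaladoc):

// The test inputs have n = 3 features, so the expanded vectors should have
// C(n + d, d) entries, matching the 10- and 20-dimensional expected vectors.
def choose(n: Int, k: Int): Int = Range(n, n - k, -1).product / Range(k, 1, -1).product
assert(choose(3 + 2, 2) == 10) // degree 2 (default)
assert(choose(3 + 3, 3) == 20) // degree 3 (via setDegree)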