[SPARK-35024][ML] Refactor LinearSVC - support virtual centering
### What changes were proposed in this pull request? 1, remove existing agg, and use a new agg supporting virtual centering 2, add related testsuites ### Why are the changes needed? centering vectors should accelerate convergence, and generate solution more close to R ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? updated testsuites and added testsuites Closes #32124 from zhengruifeng/svc_agg_refactor. Authored-by: Ruifeng Zheng <ruifengz@foxmail.com> Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
This commit is contained in:
parent
bcac733bf1
commit
1f150b9392
|
@ -38,14 +38,14 @@ test_that("spark.svmLinear", {
|
|||
expect_true(class(summary$coefficients[, 1]) == "numeric")
|
||||
|
||||
coefs <- summary$coefficients[, "Estimate"]
|
||||
expected_coefs <- c(-0.06004978, -0.1563083, -0.460648, 0.2276626, 1.055085)
|
||||
expected_coefs <- c(-6.8823988, -0.6154984, -1.5135447, 1.9694126, 3.3736856)
|
||||
expect_true(all(abs(coefs - expected_coefs) < 0.1))
|
||||
|
||||
# Test prediction with string label
|
||||
prediction <- predict(model, training)
|
||||
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
|
||||
expected <- c("versicolor", "versicolor", "versicolor", "virginica", "virginica",
|
||||
"virginica", "virginica", "virginica", "virginica", "virginica")
|
||||
expected <- c("versicolor", "versicolor", "versicolor", "versicolor", "versicolor",
|
||||
"versicolor", "versicolor", "versicolor", "versicolor", "versicolor")
|
||||
expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected)
|
||||
|
||||
# Test model save and load
|
||||
|
|
|
@ -222,6 +222,7 @@ class LinearSVC @Since("2.2.0") (
|
|||
}
|
||||
|
||||
val featuresStd = summarizer.std.toArray
|
||||
val featuresMean = summarizer.mean.toArray
|
||||
val getFeaturesStd = (j: Int) => featuresStd(j)
|
||||
val regularization = if ($(regParam) != 0.0) {
|
||||
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
|
||||
|
@ -239,7 +240,8 @@ class LinearSVC @Since("2.2.0") (
|
|||
as a result, no scaling is needed.
|
||||
*/
|
||||
val (rawCoefficients, objectiveHistory) =
|
||||
trainImpl(instances, actualBlockSizeInMB, featuresStd, regularization, optimizer)
|
||||
trainImpl(instances, actualBlockSizeInMB, featuresStd, featuresMean,
|
||||
regularization, optimizer)
|
||||
|
||||
if (rawCoefficients == null) {
|
||||
val msg = s"${optimizer.getClass.getName} failed."
|
||||
|
@ -277,16 +279,19 @@ class LinearSVC @Since("2.2.0") (
|
|||
instances: RDD[Instance],
|
||||
actualBlockSizeInMB: Double,
|
||||
featuresStd: Array[Double],
|
||||
featuresMean: Array[Double],
|
||||
regularization: Option[L2Regularization],
|
||||
optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
|
||||
val numFeatures = featuresStd.length
|
||||
val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
|
||||
|
||||
val bcFeaturesStd = instances.context.broadcast(featuresStd)
|
||||
val inverseStd = featuresStd.map(std => if (std != 0) 1.0 / std else 0.0)
|
||||
val scaledMean = Array.tabulate(numFeatures)(i => inverseStd(i) * featuresMean(i))
|
||||
val bcInverseStd = instances.context.broadcast(inverseStd)
|
||||
val bcScaledMean = instances.context.broadcast(scaledMean)
|
||||
|
||||
val standardized = instances.mapPartitions { iter =>
|
||||
val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 }
|
||||
val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
|
||||
val func = StandardScalerModel.getTransformFunc(Array.empty, bcInverseStd.value, false, true)
|
||||
iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
|
||||
}
|
||||
|
||||
|
@ -295,13 +300,24 @@ class LinearSVC @Since("2.2.0") (
|
|||
.persist(StorageLevel.MEMORY_AND_DISK)
|
||||
.setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")
|
||||
|
||||
val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
|
||||
val getAggregatorFunc = new HingeBlockAggregator(bcInverseStd, bcScaledMean,
|
||||
$(fitIntercept))(_)
|
||||
val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
|
||||
regularization, $(aggregationDepth))
|
||||
|
||||
val states = optimizer.iterations(new CachedDiffFunction(costFun),
|
||||
Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
|
||||
val initialSolution = Array.ofDim[Double](numFeaturesPlusIntercept)
|
||||
if ($(fitIntercept)) {
|
||||
// orginal `initialSolution` is for problem:
|
||||
// y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept)
|
||||
// we should adjust it to the initial solution for problem:
|
||||
// y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept)
|
||||
// NOTE: this is NOOP before we finally support model initialization
|
||||
val adapt = BLAS.javaBLAS.ddot(numFeatures, initialSolution, 1, scaledMean, 1)
|
||||
initialSolution(numFeatures) += adapt
|
||||
}
|
||||
|
||||
val states = optimizer.iterations(new CachedDiffFunction(costFun),
|
||||
new BDV[Double](initialSolution))
|
||||
val arrayBuilder = mutable.ArrayBuilder.make[Double]
|
||||
var state: optimizer.State = null
|
||||
while (states.hasNext) {
|
||||
|
@ -309,9 +325,19 @@ class LinearSVC @Since("2.2.0") (
|
|||
arrayBuilder += state.adjustedValue
|
||||
}
|
||||
blocks.unpersist()
|
||||
bcFeaturesStd.destroy()
|
||||
bcInverseStd.destroy()
|
||||
bcScaledMean.destroy()
|
||||
|
||||
(if (state != null) state.x.toArray else null, arrayBuilder.result)
|
||||
val solution = if (state == null) null else state.x.toArray
|
||||
if ($(fitIntercept) && solution != null) {
|
||||
// the final solution is for problem:
|
||||
// y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept)
|
||||
// we should adjust it back for original problem:
|
||||
// y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept)
|
||||
val adapt = BLAS.javaBLAS.ddot(numFeatures, solution, 1, scaledMean, 1)
|
||||
solution(numFeatures) -= adapt
|
||||
}
|
||||
(solution, arrayBuilder.result)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -982,14 +982,14 @@ class LogisticRegression @Since("1.2.0") (
|
|||
val adapt = Array.ofDim[Double](numClasses)
|
||||
BLAS.javaBLAS.dgemv("N", numClasses, numFeatures, 1.0,
|
||||
initialSolution, numClasses, scaledMean, 1, 0.0, adapt, 1)
|
||||
BLAS.getBLAS(numFeatures).daxpy(numClasses, 1.0, adapt, 0, 1,
|
||||
BLAS.javaBLAS.daxpy(numClasses, 1.0, adapt, 0, 1,
|
||||
initialSolution, numClasses * numFeatures, 1)
|
||||
} else {
|
||||
// orginal `initialCoefWithInterceptArray` is for problem:
|
||||
// original `initialSolution` is for problem:
|
||||
// y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept)
|
||||
// we should adjust it to the initial solution for problem:
|
||||
// y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept)
|
||||
val adapt = BLAS.getBLAS(numFeatures).ddot(numFeatures, initialSolution, 1, scaledMean, 1)
|
||||
val adapt = BLAS.javaBLAS.ddot(numFeatures, initialSolution, 1, scaledMean, 1)
|
||||
initialSolution(numFeatures) += adapt
|
||||
}
|
||||
}
|
||||
|
@ -1018,14 +1018,14 @@ class LogisticRegression @Since("1.2.0") (
|
|||
val adapt = Array.ofDim[Double](numClasses)
|
||||
BLAS.javaBLAS.dgemv("N", numClasses, numFeatures, 1.0,
|
||||
solution, numClasses, scaledMean, 1, 0.0, adapt, 1)
|
||||
BLAS.getBLAS(numFeatures).daxpy(numClasses, -1.0, adapt, 0, 1,
|
||||
BLAS.javaBLAS.daxpy(numClasses, -1.0, adapt, 0, 1,
|
||||
solution, numClasses * numFeatures, 1)
|
||||
} else {
|
||||
// the final solution is for problem:
|
||||
// y = f(w1 * (x1 - avg_x1) / std_x1, w2 * (x2 - avg_x2) / std_x2, ..., intercept)
|
||||
// we should adjust it back for original problem:
|
||||
// y = f(w1 * x1 / std_x1, w2 * x2 / std_x2, ..., intercept)
|
||||
val adapt = BLAS.getBLAS(numFeatures).ddot(numFeatures, solution, 1, scaledMean, 1)
|
||||
val adapt = BLAS.javaBLAS.ddot(numFeatures, solution, 1, scaledMean, 1)
|
||||
solution(numFeatures) -= adapt
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ private[ml] class BinaryLogisticBlockAggregator(
|
|||
// deal with non-zero values in prediction.
|
||||
private val marginOffset = if (fitWithMean) {
|
||||
coefficientsArray.last -
|
||||
BLAS.getBLAS(numFeatures).ddot(numFeatures, coefficientsArray, 1, bcScaledMean.value, 1)
|
||||
BLAS.javaBLAS.ddot(numFeatures, coefficientsArray, 1, bcScaledMean.value, 1)
|
||||
} else {
|
||||
Double.NaN
|
||||
}
|
||||
|
@ -142,7 +142,7 @@ private[ml] class BinaryLogisticBlockAggregator(
|
|||
case sm: SparseMatrix if fitIntercept =>
|
||||
val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
|
||||
BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
|
||||
BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
|
||||
BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
|
||||
gradientSumArray, 1)
|
||||
|
||||
case sm: SparseMatrix if !fitIntercept =>
|
||||
|
@ -156,7 +156,7 @@ private[ml] class BinaryLogisticBlockAggregator(
|
|||
if (fitWithMean) {
|
||||
// above update of the linear part of gradientSumArray does NOT take the centering
|
||||
// into account, here we need to adjust this part.
|
||||
BLAS.getBLAS(numFeatures).daxpy(numFeatures, -multiplierSum, bcScaledMean.value, 1,
|
||||
BLAS.javaBLAS.daxpy(numFeatures, -multiplierSum, bcScaledMean.value, 1,
|
||||
gradientSumArray, 1)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,212 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.ml.optim.aggregator
|
||||
|
||||
import org.apache.spark.broadcast.Broadcast
|
||||
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
|
||||
import org.apache.spark.ml.linalg._
|
||||
|
||||
/**
|
||||
* HingeAggregator computes the gradient and loss for Hinge loss function as used in
|
||||
* binary classification for instances in sparse or dense vector in an online fashion.
|
||||
*
|
||||
* Two HingeAggregators can be merged together to have a summary of loss and gradient of
|
||||
* the corresponding joint dataset.
|
||||
*
|
||||
* This class standardizes feature values during computation using bcFeaturesStd.
|
||||
*
|
||||
* @param bcCoefficients The coefficients corresponding to the features.
|
||||
* @param fitIntercept Whether to fit an intercept term.
|
||||
* @param bcFeaturesStd The standard deviation values of the features.
|
||||
*/
|
||||
private[ml] class HingeAggregator(
|
||||
bcFeaturesStd: Broadcast[Array[Double]],
|
||||
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
|
||||
extends DifferentiableLossAggregator[Instance, HingeAggregator] {
|
||||
|
||||
private val numFeatures = bcFeaturesStd.value.length
|
||||
private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
|
||||
@transient private lazy val coefficientsArray = bcCoefficients.value match {
|
||||
case DenseVector(values) => values
|
||||
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
|
||||
s" but got type ${bcCoefficients.value.getClass}.")
|
||||
}
|
||||
protected override val dim: Int = numFeaturesPlusIntercept
|
||||
|
||||
/**
|
||||
* Add a new training instance to this HingeAggregator, and update the loss and gradient
|
||||
* of the objective function.
|
||||
*
|
||||
* @param instance The instance of data point to be added.
|
||||
* @return This HingeAggregator object.
|
||||
*/
|
||||
def add(instance: Instance): this.type = {
|
||||
instance match { case Instance(label, weight, features) =>
|
||||
require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
|
||||
s" Expecting $numFeatures but got ${features.size}.")
|
||||
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
|
||||
|
||||
if (weight == 0.0) return this
|
||||
val localFeaturesStd = bcFeaturesStd.value
|
||||
val localCoefficients = coefficientsArray
|
||||
val localGradientSumArray = gradientSumArray
|
||||
|
||||
val dotProduct = {
|
||||
var sum = 0.0
|
||||
features.foreachNonZero { (index, value) =>
|
||||
if (localFeaturesStd(index) != 0.0) {
|
||||
sum += localCoefficients(index) * value / localFeaturesStd(index)
|
||||
}
|
||||
}
|
||||
if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
|
||||
sum
|
||||
}
|
||||
// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
|
||||
// Therefore the gradient is -(2y - 1)*x
|
||||
val labelScaled = 2 * label - 1.0
|
||||
val loss = if (1.0 > labelScaled * dotProduct) {
|
||||
(1.0 - labelScaled * dotProduct) * weight
|
||||
} else {
|
||||
0.0
|
||||
}
|
||||
|
||||
if (1.0 > labelScaled * dotProduct) {
|
||||
val gradientScale = -labelScaled * weight
|
||||
features.foreachNonZero { (index, value) =>
|
||||
if (localFeaturesStd(index) != 0.0) {
|
||||
localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
|
||||
}
|
||||
}
|
||||
if (fitIntercept) {
|
||||
localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
|
||||
}
|
||||
}
|
||||
|
||||
lossSum += loss
|
||||
weightSum += weight
|
||||
this
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* BlockHingeAggregator computes the gradient and loss for Hinge loss function as used in
|
||||
* binary classification for blocks in sparse or dense matrix in an online fashion.
|
||||
*
|
||||
* Two BlockHingeAggregators can be merged together to have a summary of loss and gradient of
|
||||
* the corresponding joint dataset.
|
||||
*
|
||||
* NOTE: The feature values are expected to be standardized before computation.
|
||||
*
|
||||
* @param bcCoefficients The coefficients corresponding to the features.
|
||||
* @param fitIntercept Whether to fit an intercept term.
|
||||
*/
|
||||
private[ml] class BlockHingeAggregator(
|
||||
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
|
||||
extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] {
|
||||
|
||||
protected override val dim: Int = bcCoefficients.value.size
|
||||
private val numFeatures = if (fitIntercept) dim - 1 else dim
|
||||
|
||||
@transient private lazy val coefficientsArray = bcCoefficients.value match {
|
||||
case DenseVector(values) => values
|
||||
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
|
||||
s" but got type ${bcCoefficients.value.getClass}.")
|
||||
}
|
||||
|
||||
@transient private lazy val linear = {
|
||||
val linear = if (fitIntercept) coefficientsArray.take(numFeatures) else coefficientsArray
|
||||
Vectors.dense(linear)
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new training instance block to this BlockHingeAggregator, and update the loss and
|
||||
* gradient of the objective function.
|
||||
*
|
||||
* @param block The InstanceBlock to be added.
|
||||
* @return This BlockHingeAggregator object.
|
||||
*/
|
||||
def add(block: InstanceBlock): this.type = {
|
||||
require(block.matrix.isTransposed)
|
||||
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
|
||||
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
|
||||
require(block.weightIter.forall(_ >= 0),
|
||||
s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
|
||||
|
||||
if (block.weightIter.forall(_ == 0)) return this
|
||||
val size = block.size
|
||||
|
||||
// vec here represents dotProducts
|
||||
val vec = if (fitIntercept) {
|
||||
Vectors.dense(Array.fill(size)(coefficientsArray.last)).toDense
|
||||
} else {
|
||||
Vectors.zeros(size).toDense
|
||||
}
|
||||
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
|
||||
|
||||
// in-place convert dotProducts to gradient scales
|
||||
// then, vec represents gradient scales
|
||||
var localLossSum = 0.0
|
||||
var i = 0
|
||||
while (i < size) {
|
||||
val weight = block.getWeight(i)
|
||||
if (weight > 0) {
|
||||
// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
|
||||
// Therefore the gradient is -(2y - 1)*x
|
||||
val label = block.getLabel(i)
|
||||
val labelScaled = label + label - 1.0
|
||||
val loss = (1.0 - labelScaled * vec(i)) * weight
|
||||
if (loss > 0) {
|
||||
localLossSum += loss
|
||||
val gradScale = -labelScaled * weight
|
||||
vec.values(i) = gradScale
|
||||
} else { vec.values(i) = 0.0 }
|
||||
} else { vec.values(i) = 0.0 }
|
||||
i += 1
|
||||
}
|
||||
lossSum += localLossSum
|
||||
weightSum += block.weightIter.sum
|
||||
|
||||
// predictions are all correct, no gradient signal
|
||||
if (vec.values.forall(_ == 0)) return this
|
||||
|
||||
block.matrix match {
|
||||
case dm: DenseMatrix =>
|
||||
BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
|
||||
vec.values, 1, 1.0, gradientSumArray, 1)
|
||||
|
||||
case sm: SparseMatrix if fitIntercept =>
|
||||
val linearGradSumVec = Vectors.zeros(numFeatures).toDense
|
||||
BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
|
||||
BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
|
||||
gradientSumArray, 1)
|
||||
|
||||
case sm: SparseMatrix if !fitIntercept =>
|
||||
val gradSumVec = new DenseVector(gradientSumArray)
|
||||
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
|
||||
|
||||
case m =>
|
||||
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
|
||||
}
|
||||
|
||||
if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum
|
||||
|
||||
this
|
||||
}
|
||||
}
|
|
@ -0,0 +1,162 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.ml.optim.aggregator
|
||||
|
||||
import org.apache.spark.broadcast.Broadcast
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.ml.feature.InstanceBlock
|
||||
import org.apache.spark.ml.linalg._
|
||||
|
||||
|
||||
/**
|
||||
* HingeBlockAggregator computes the gradient and loss for Huber loss function
|
||||
* as used in linear regression for blocks in sparse or dense matrix in an online fashion.
|
||||
*
|
||||
* Two BlockHuberAggregators can be merged together to have a summary of loss and gradient
|
||||
* of the corresponding joint dataset.
|
||||
*
|
||||
* NOTE: The feature values are expected to already have be scaled (multiplied by bcInverseStd,
|
||||
* but NOT centered) before computation.
|
||||
*
|
||||
* @param bcCoefficients The coefficients corresponding to the features.
|
||||
* @param fitIntercept Whether to fit an intercept term. When true, will perform data centering
|
||||
* in a virtual way. Then we MUST adjust the intercept of both initial
|
||||
* coefficients and final solution in the caller.
|
||||
*/
|
||||
private[ml] class HingeBlockAggregator(
|
||||
bcInverseStd: Broadcast[Array[Double]],
|
||||
bcScaledMean: Broadcast[Array[Double]],
|
||||
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
|
||||
extends DifferentiableLossAggregator[InstanceBlock, HingeBlockAggregator]
|
||||
with Logging {
|
||||
|
||||
if (fitIntercept) {
|
||||
require(bcScaledMean != null && bcScaledMean.value.length == bcInverseStd.value.length,
|
||||
"scaled means is required when center the vectors")
|
||||
}
|
||||
|
||||
private val numFeatures = bcInverseStd.value.length
|
||||
protected override val dim: Int = bcCoefficients.value.size
|
||||
|
||||
@transient private lazy val coefficientsArray = bcCoefficients.value match {
|
||||
case DenseVector(values) => values
|
||||
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
|
||||
s"got type ${bcCoefficients.value.getClass}.)")
|
||||
}
|
||||
|
||||
@transient private lazy val linear = if (fitIntercept) {
|
||||
new DenseVector(coefficientsArray.take(numFeatures))
|
||||
} else {
|
||||
new DenseVector(coefficientsArray)
|
||||
}
|
||||
|
||||
// pre-computed margin of an empty vector.
|
||||
// with this variable as an offset, for a sparse vector, we only need to
|
||||
// deal with non-zero values in prediction.
|
||||
private val marginOffset = if (fitIntercept) {
|
||||
coefficientsArray.last -
|
||||
BLAS.javaBLAS.ddot(numFeatures, coefficientsArray, 1, bcScaledMean.value, 1)
|
||||
} else {
|
||||
Double.NaN
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new training instance block to this HingeBlockAggregator, and update the loss
|
||||
* and gradient of the objective function.
|
||||
*
|
||||
* @param block The instance block of data point to be added.
|
||||
* @return This HingeBlockAggregator object.
|
||||
*/
|
||||
def add(block: InstanceBlock): this.type = {
|
||||
require(block.matrix.isTransposed)
|
||||
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
|
||||
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
|
||||
require(block.weightIter.forall(_ >= 0),
|
||||
s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
|
||||
|
||||
if (block.weightIter.forall(_ == 0)) return this
|
||||
val size = block.size
|
||||
|
||||
// vec/arr here represents margins
|
||||
val vec = new DenseVector(Array.ofDim[Double](size))
|
||||
val arr = vec.values
|
||||
if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
|
||||
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
|
||||
|
||||
// in-place convert margins to multiplier
|
||||
// then, vec/arr represents multiplier
|
||||
var localLossSum = 0.0
|
||||
var localWeightSum = 0.0
|
||||
var multiplierSum = 0.0
|
||||
var i = 0
|
||||
while (i < size) {
|
||||
val weight = block.getWeight(i)
|
||||
localWeightSum += weight
|
||||
if (weight > 0) {
|
||||
// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
|
||||
// Therefore the gradient is -(2y - 1)*x
|
||||
val label = block.getLabel(i)
|
||||
val labelScaled = label + label - 1.0
|
||||
val loss = (1.0 - labelScaled * arr(i)) * weight
|
||||
if (loss > 0) {
|
||||
localLossSum += loss
|
||||
val multiplier = -labelScaled * weight
|
||||
arr(i) = multiplier
|
||||
multiplierSum += multiplier
|
||||
} else { arr(i) = 0.0 }
|
||||
} else { arr(i) = 0.0 }
|
||||
i += 1
|
||||
}
|
||||
lossSum += localLossSum
|
||||
weightSum += localWeightSum
|
||||
|
||||
// predictions are all correct, no gradient signal
|
||||
if (arr.forall(_ == 0)) return this
|
||||
|
||||
// update the linear part of gradientSumArray
|
||||
block.matrix match {
|
||||
case dm: DenseMatrix =>
|
||||
BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
|
||||
vec.values, 1, 1.0, gradientSumArray, 1)
|
||||
|
||||
case sm: SparseMatrix if fitIntercept =>
|
||||
val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
|
||||
BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
|
||||
BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
|
||||
gradientSumArray, 1)
|
||||
|
||||
case sm: SparseMatrix if !fitIntercept =>
|
||||
val gradSumVec = new DenseVector(gradientSumArray)
|
||||
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
|
||||
|
||||
case m =>
|
||||
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
|
||||
}
|
||||
|
||||
if (fitIntercept) {
|
||||
// above update of the linear part of gradientSumArray does NOT take the centering
|
||||
// into account, here we need to adjust this part.
|
||||
BLAS.javaBLAS.daxpy(numFeatures, -multiplierSum, bcScaledMean.value, 1,
|
||||
gradientSumArray, 1)
|
||||
|
||||
// update the intercept part of gradientSumArray
|
||||
gradientSumArray(numFeatures) += multiplierSum
|
||||
}
|
||||
|
||||
this
|
||||
}
|
||||
}
|
|
@ -203,7 +203,7 @@ private[ml] class MultinomialLogisticBlockAggregator(
|
|||
}
|
||||
|
||||
if (fitIntercept) {
|
||||
BLAS.getBLAS(numClasses).daxpy(numClasses, 1.0, multiplierSum, 0, 1,
|
||||
BLAS.javaBLAS.daxpy(numClasses, 1.0, multiplierSum, 0, 1,
|
||||
gradientSumArray, numClasses * numFeatures, 1)
|
||||
}
|
||||
|
||||
|
|
|
@ -23,9 +23,8 @@ import breeze.linalg.{DenseVector => BDV}
|
|||
import org.scalatest.Assertions._
|
||||
|
||||
import org.apache.spark.ml.classification.LinearSVCSuite._
|
||||
import org.apache.spark.ml.feature.{Instance, LabeledPoint}
|
||||
import org.apache.spark.ml.feature.LabeledPoint
|
||||
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
|
||||
import org.apache.spark.ml.optim.aggregator.HingeAggregator
|
||||
import org.apache.spark.ml.param.ParamsSuite
|
||||
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
|
||||
import org.apache.spark.ml.util.TestingUtils._
|
||||
|
@ -176,28 +175,13 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
|
|||
assert(model2.intercept !== 0.0)
|
||||
}
|
||||
|
||||
test("sparse coefficients in HingeAggregator") {
|
||||
val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
|
||||
val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
|
||||
val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients)
|
||||
val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") {
|
||||
intercept[IllegalArgumentException] {
|
||||
agg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
|
||||
}
|
||||
}
|
||||
assert(thrown.getMessage.contains("coefficients only supports dense"))
|
||||
|
||||
bcCoefficients.destroy()
|
||||
bcFeaturesStd.destroy()
|
||||
}
|
||||
|
||||
test("linearSVC with sample weights") {
|
||||
def modelEquals(m1: LinearSVCModel, m2: LinearSVCModel): Unit = {
|
||||
assert(m1.coefficients ~== m2.coefficients absTol 0.05)
|
||||
assert(m1.coefficients ~== m2.coefficients relTol 0.05)
|
||||
assert(m1.intercept ~== m2.intercept absTol 0.05)
|
||||
}
|
||||
|
||||
val estimator = new LinearSVC().setRegParam(0.01).setTol(0.01)
|
||||
val estimator = new LinearSVC().setRegParam(0.01).setTol(0.001)
|
||||
val dataset = smallBinaryDataset
|
||||
MLTestingUtils.testArbitrarilyScaledWeights[LinearSVCModel, LinearSVC](
|
||||
dataset.as[LabeledPoint], estimator, modelEquals)
|
||||
|
@ -237,7 +221,7 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
|
|||
val model1 = trainer1.fit(binaryDataset)
|
||||
|
||||
/*
|
||||
Use the following R code to load the data and train the model using glmnet package.
|
||||
Use the following R code to load the data and train the model using e1071 package.
|
||||
|
||||
library(e1071)
|
||||
data <- read.csv("path/target/tmp/LinearSVC/binaryDataset/part-00000", header=FALSE)
|
||||
|
@ -257,8 +241,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
|
|||
*/
|
||||
val coefficientsR = Vectors.dense(7.310338, 14.89741, 22.21005, 29.83508)
|
||||
val interceptR = 7.440177
|
||||
assert(model1.intercept ~== interceptR relTol 1E-2)
|
||||
assert(model1.coefficients ~== coefficientsR relTol 1E-2)
|
||||
assert(model1.intercept ~== interceptR relTol 1E-3)
|
||||
assert(model1.coefficients ~== coefficientsR relTol 5E-3)
|
||||
|
||||
/*
|
||||
Use the following python code to load the data and train the model using scikit-learn package.
|
||||
|
@ -280,8 +264,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
|
|||
|
||||
val coefficientsSK = Vectors.dense(7.24690165, 14.77029087, 21.99924004, 29.5575729)
|
||||
val interceptSK = 7.36947518
|
||||
assert(model1.intercept ~== interceptSK relTol 1E-3)
|
||||
assert(model1.coefficients ~== coefficientsSK relTol 4E-3)
|
||||
assert(model1.intercept ~== interceptSK relTol 1E-2)
|
||||
assert(model1.coefficients ~== coefficientsSK relTol 1E-2)
|
||||
}
|
||||
|
||||
test("summary and training summary") {
|
||||
|
@ -379,8 +363,8 @@ object LinearSVCSuite {
|
|||
}
|
||||
|
||||
def checkModels(model1: LinearSVCModel, model2: LinearSVCModel): Unit = {
|
||||
assert(model1.intercept == model2.intercept)
|
||||
assert(model1.coefficients.equals(model2.coefficients))
|
||||
assert(model1.intercept ~== model2.intercept relTol 1e-9)
|
||||
assert(model1.coefficients ~== model2.coefficients relTol 1e-9)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,189 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.ml.optim.aggregator
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
|
||||
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
|
||||
import org.apache.spark.ml.stat.Summarizer
|
||||
import org.apache.spark.ml.util.TestingUtils._
|
||||
import org.apache.spark.mllib.util.MLlibTestSparkContext
|
||||
|
||||
class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
|
||||
|
||||
@transient var instances: Array[Instance] = _
|
||||
@transient var instancesConstantFeature: Array[Instance] = _
|
||||
@transient var instancesConstantFeatureFiltered: Array[Instance] = _
|
||||
@transient var standardizedInstances: Array[Instance] = _
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
instances = Array(
|
||||
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
|
||||
Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
|
||||
Instance(0.0, 0.3, Vectors.dense(4.0, 0.5))
|
||||
)
|
||||
instancesConstantFeature = Array(
|
||||
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
|
||||
Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
|
||||
Instance(1.0, 0.3, Vectors.dense(1.0, 0.5))
|
||||
)
|
||||
instancesConstantFeatureFiltered = Array(
|
||||
Instance(0.0, 0.1, Vectors.dense(2.0)),
|
||||
Instance(1.0, 0.5, Vectors.dense(1.0)),
|
||||
Instance(2.0, 0.3, Vectors.dense(0.5))
|
||||
)
|
||||
standardizedInstances = standardize(instances)
|
||||
}
|
||||
|
||||
/** Get summary statistics for some data and create a new HingeAggregator. */
|
||||
private def getNewAggregator(
|
||||
instances: Array[Instance],
|
||||
coefficients: Vector,
|
||||
fitIntercept: Boolean): HingeAggregator = {
|
||||
val (featuresSummarizer, ySummarizer) =
|
||||
Summarizer.getClassificationSummarizers(sc.parallelize(instances))
|
||||
val featuresStd = featuresSummarizer.std.toArray
|
||||
val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
|
||||
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
|
||||
new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
|
||||
}
|
||||
|
||||
/** Get summary statistics for some data and create a new BlockHingeAggregator. */
|
||||
private def getNewBlockAggregator(
|
||||
coefficients: Vector,
|
||||
fitIntercept: Boolean): BlockHingeAggregator = {
|
||||
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
|
||||
new BlockHingeAggregator(fitIntercept)(bcCoefficients)
|
||||
}
|
||||
|
||||
test("aggregator add method input size") {
|
||||
val coefArray = Array(1.0, 2.0)
|
||||
val interceptArray = Array(2.0)
|
||||
val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
|
||||
fitIntercept = true)
|
||||
withClue("HingeAggregator features dimension must match coefficients dimension") {
|
||||
intercept[IllegalArgumentException] {
|
||||
agg.add(Instance(1.0, 1.0, Vectors.dense(2.0)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("negative weight") {
|
||||
val coefArray = Array(1.0, 2.0)
|
||||
val interceptArray = Array(2.0)
|
||||
val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
|
||||
fitIntercept = true)
|
||||
withClue("HingeAggregator does not support negative instance weights") {
|
||||
intercept[IllegalArgumentException] {
|
||||
agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("check sizes") {
|
||||
val rng = new scala.util.Random
|
||||
val numFeatures = instances.head.features.size
|
||||
val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble))
|
||||
val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble))
|
||||
val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true)
|
||||
val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept,
|
||||
fitIntercept = false)
|
||||
instances.foreach(aggIntercept.add)
|
||||
instances.foreach(aggNoIntercept.add)
|
||||
|
||||
assert(aggIntercept.gradient.size === numFeatures + 1)
|
||||
assert(aggNoIntercept.gradient.size === numFeatures)
|
||||
}
|
||||
|
||||
test("check correctness") {
|
||||
val coefArray = Array(1.0, 2.0)
|
||||
val intercept = 1.0
|
||||
val numFeatures = instances.head.features.size
|
||||
val (featuresSummarizer, _) = Summarizer.getClassificationSummarizers(sc.parallelize(instances))
|
||||
val featuresStd = featuresSummarizer.std.toArray
|
||||
val weightSum = instances.map(_.weight).sum
|
||||
|
||||
val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
|
||||
fitIntercept = true)
|
||||
instances.foreach(agg.add)
|
||||
|
||||
// compute the loss
|
||||
val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i)).toArray
|
||||
val lossSum = instances.map { case Instance(l, w, f) =>
|
||||
val margin = BLAS.dot(Vectors.dense(stdCoef), f) + intercept
|
||||
val labelScaled = 2 * l - 1.0
|
||||
if (1.0 > labelScaled * margin) {
|
||||
(1.0 - labelScaled * margin) * w
|
||||
} else {
|
||||
0.0
|
||||
}
|
||||
}.sum
|
||||
val loss = lossSum / weightSum
|
||||
|
||||
// compute the gradients
|
||||
val gradientCoef = new Array[Double](numFeatures)
|
||||
var gradientIntercept = 0.0
|
||||
instances.foreach { case Instance(l, w, f) =>
|
||||
val margin = BLAS.dot(f, Vectors.dense(coefArray)) + intercept
|
||||
if (1.0 > (2 * l - 1.0) * margin) {
|
||||
gradientCoef.indices.foreach { i =>
|
||||
gradientCoef(i) += f(i) * -(2 * l - 1.0) * w / featuresStd(i)
|
||||
}
|
||||
gradientIntercept += -(2 * l - 1.0) * w
|
||||
}
|
||||
}
|
||||
val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum))
|
||||
|
||||
assert(loss ~== agg.loss relTol 1e-9)
|
||||
assert(gradient ~== agg.gradient relTol 1e-9)
|
||||
|
||||
Seq(1, 2, 4).foreach { blockSize =>
|
||||
val blocks1 = standardizedInstances
|
||||
.grouped(blockSize)
|
||||
.map(seq => InstanceBlock.fromInstances(seq))
|
||||
.toArray
|
||||
val blocks2 = blocks1.map { block =>
|
||||
new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor)
|
||||
}
|
||||
|
||||
Seq(blocks1, blocks2).foreach { blocks =>
|
||||
val blockAgg = getNewBlockAggregator(Vectors.dense(coefArray ++ Array(intercept)),
|
||||
fitIntercept = true)
|
||||
blocks.foreach(blockAgg.add)
|
||||
assert(agg.loss ~== blockAgg.loss relTol 1e-9)
|
||||
assert(agg.gradient ~== blockAgg.gradient relTol 1e-9)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("check with zero standard deviation") {
|
||||
val binaryCoefArray = Array(1.0, 2.0)
|
||||
val intercept = 1.0
|
||||
val aggConstantFeatureBinary = getNewAggregator(instancesConstantFeature,
|
||||
Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true)
|
||||
instancesConstantFeature.foreach(aggConstantFeatureBinary.add)
|
||||
|
||||
val aggConstantFeatureBinaryFiltered = getNewAggregator(instancesConstantFeatureFiltered,
|
||||
Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true)
|
||||
instancesConstantFeatureFiltered.foreach(aggConstantFeatureBinaryFiltered.add)
|
||||
|
||||
// constant features should not affect gradient
|
||||
assert(aggConstantFeatureBinary.gradient(0) === 0.0)
|
||||
assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,258 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.ml.optim.aggregator
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
|
||||
import org.apache.spark.ml.linalg._
|
||||
import org.apache.spark.ml.stat.Summarizer
|
||||
import org.apache.spark.ml.util.TestingUtils._
|
||||
import org.apache.spark.mllib.util.MLlibTestSparkContext
|
||||
|
||||
class HingeBlockAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
|
||||
|
||||
@transient var instances: Array[Instance] = _
|
||||
@transient var instancesConstantFeature: Array[Instance] = _
|
||||
@transient var instancesConstantFeatureFiltered: Array[Instance] = _
|
||||
@transient var scaledInstances: Array[Instance] = _
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
instances = Array(
|
||||
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
|
||||
Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
|
||||
Instance(0.0, 0.3, Vectors.dense(4.0, 0.5))
|
||||
)
|
||||
instancesConstantFeature = Array(
|
||||
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
|
||||
Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
|
||||
Instance(1.0, 0.3, Vectors.dense(1.0, 0.5))
|
||||
)
|
||||
instancesConstantFeatureFiltered = Array(
|
||||
Instance(0.0, 0.1, Vectors.dense(2.0)),
|
||||
Instance(1.0, 0.5, Vectors.dense(1.0)),
|
||||
Instance(1.0, 0.3, Vectors.dense(0.5))
|
||||
)
|
||||
scaledInstances = standardize(instances)
|
||||
}
|
||||
|
||||
|
||||
/** Get summary statistics for some data and create a new HingeBlockAggregator. */
|
||||
private def getNewAggregator(
|
||||
instances: Array[Instance],
|
||||
coefficients: Vector,
|
||||
fitIntercept: Boolean): HingeBlockAggregator = {
|
||||
val (featuresSummarizer, _) =
|
||||
Summarizer.getClassificationSummarizers(sc.parallelize(instances))
|
||||
val featuresStd = featuresSummarizer.std.toArray
|
||||
val featuresMean = featuresSummarizer.mean.toArray
|
||||
val inverseStd = featuresStd.map(std => if (std != 0) 1.0 / std else 0.0)
|
||||
val scaledMean = inverseStd.zip(featuresMean).map(t => t._1 * t._2)
|
||||
val bcInverseStd = sc.broadcast(inverseStd)
|
||||
val bcScaledMean = sc.broadcast(scaledMean)
|
||||
val bcCoefficients = sc.broadcast(coefficients)
|
||||
new HingeBlockAggregator(bcInverseStd, bcScaledMean, fitIntercept)(bcCoefficients)
|
||||
}
|
||||
|
||||
test("sparse coefficients") {
|
||||
val bcInverseStd = sc.broadcast(Array(1.0))
|
||||
val bcScaledMean = sc.broadcast(Array(2.0))
|
||||
val bcCoefficients = sc.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
|
||||
val binaryAgg = new HingeBlockAggregator(bcInverseStd, bcScaledMean,
|
||||
fitIntercept = false)(bcCoefficients)
|
||||
val block = InstanceBlock.fromInstances(Seq(Instance(1.0, 1.0, Vectors.dense(1.0))))
|
||||
val thrownBinary = withClue("aggregator cannot handle sparse coefficients") {
|
||||
intercept[IllegalArgumentException] {
|
||||
binaryAgg.add(block)
|
||||
}
|
||||
}
|
||||
assert(thrownBinary.getMessage.contains("coefficients only supports dense"))
|
||||
}
|
||||
|
||||
test("aggregator add method input size") {
|
||||
val coefArray = Array(1.0, 2.0)
|
||||
val interceptValue = 4.0
|
||||
val agg = getNewAggregator(instances, Vectors.dense(coefArray :+ interceptValue),
|
||||
fitIntercept = true)
|
||||
val block = InstanceBlock.fromInstances(Seq(Instance(1.0, 1.0, Vectors.dense(2.0))))
|
||||
withClue("BinaryLogisticBlockAggregator features dimension must match coefficients dimension") {
|
||||
intercept[IllegalArgumentException] {
|
||||
agg.add(block)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("negative weight") {
|
||||
val coefArray = Array(1.0, 2.0)
|
||||
val interceptValue = 4.0
|
||||
val agg = getNewAggregator(instances, Vectors.dense(coefArray :+ interceptValue),
|
||||
fitIntercept = true)
|
||||
val block = InstanceBlock.fromInstances(Seq(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))))
|
||||
withClue("BinaryLogisticBlockAggregator does not support negative instance weights") {
|
||||
intercept[IllegalArgumentException] {
|
||||
agg.add(block)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("check sizes") {
|
||||
val rng = new scala.util.Random
|
||||
val numFeatures = instances.head.features.size
|
||||
val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble))
|
||||
val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble))
|
||||
val block = InstanceBlock.fromInstances(instances)
|
||||
|
||||
val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true)
|
||||
aggIntercept.add(block)
|
||||
assert(aggIntercept.gradient.size === numFeatures + 1)
|
||||
|
||||
val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, fitIntercept = false)
|
||||
aggNoIntercept.add(block)
|
||||
assert(aggNoIntercept.gradient.size === numFeatures)
|
||||
}
|
||||
|
||||
test("check correctness: fitIntercept = false") {
|
||||
val coefVec = Vectors.dense(1.0, 2.0)
|
||||
val numFeatures = instances.head.features.size
|
||||
val (featuresSummarizer, _) =
|
||||
Summarizer.getClassificationSummarizers(sc.parallelize(instances))
|
||||
val featuresStd = featuresSummarizer.std
|
||||
val stdCoefVec = Vectors.dense(Array.tabulate(coefVec.size)(i => coefVec(i) / featuresStd(i)))
|
||||
val weightSum = instances.map(_.weight).sum
|
||||
|
||||
// compute the loss and the gradients
|
||||
var lossSum = 0.0
|
||||
val gradientCoef = Array.ofDim[Double](numFeatures)
|
||||
instances.foreach { case Instance(l, w, f) =>
|
||||
val margin = BLAS.dot(stdCoefVec, f)
|
||||
val labelScaled = 2 * l - 1.0
|
||||
if (1.0 > labelScaled * margin) {
|
||||
lossSum += (1.0 - labelScaled * margin) * w
|
||||
gradientCoef.indices.foreach { i =>
|
||||
gradientCoef(i) += f(i) * -(2 * l - 1.0) * w / featuresStd(i)
|
||||
}
|
||||
}
|
||||
}
|
||||
val loss = lossSum / weightSum
|
||||
val gradient = Vectors.dense(gradientCoef.map(_ / weightSum))
|
||||
|
||||
Seq(1, 2, 4).foreach { blockSize =>
|
||||
val blocks1 = scaledInstances
|
||||
.grouped(blockSize)
|
||||
.map(seq => InstanceBlock.fromInstances(seq))
|
||||
.toArray
|
||||
val blocks2 = blocks1.map { block =>
|
||||
new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor)
|
||||
}
|
||||
|
||||
Seq(blocks1, blocks2).foreach { blocks =>
|
||||
val agg = getNewAggregator(instances, coefVec, fitIntercept = false)
|
||||
blocks.foreach(agg.add)
|
||||
assert(agg.loss ~== loss relTol 1e-9)
|
||||
assert(agg.gradient ~== gradient relTol 1e-9)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("check correctness: fitIntercept = true") {
|
||||
val coefVec = Vectors.dense(1.0, 2.0)
|
||||
val interceptValue = 1.0
|
||||
val numFeatures = instances.head.features.size
|
||||
val (featuresSummarizer, _) =
|
||||
Summarizer.getClassificationSummarizers(sc.parallelize(instances))
|
||||
val featuresStd = featuresSummarizer.std
|
||||
val featuresMean = featuresSummarizer.mean
|
||||
val stdCoefVec = Vectors.dense(Array.tabulate(coefVec.size)(i => coefVec(i) / featuresStd(i)))
|
||||
val weightSum = instances.map(_.weight).sum
|
||||
|
||||
// compute the loss and the gradients
|
||||
var lossSum = 0.0
|
||||
val gradientCoef = Array.ofDim[Double](numFeatures)
|
||||
var gradientIntercept = 0.0
|
||||
instances.foreach { case Instance(l, w, f) =>
|
||||
val centered = f.toDense.copy
|
||||
BLAS.axpy(-1.0, featuresMean, centered)
|
||||
val margin = BLAS.dot(stdCoefVec, centered) + interceptValue
|
||||
val labelScaled = 2 * l - 1.0
|
||||
if (1.0 > labelScaled * margin) {
|
||||
lossSum += (1.0 - labelScaled * margin) * w
|
||||
gradientCoef.indices.foreach { i =>
|
||||
gradientCoef(i) += (f(i) - featuresMean(i)) * -(2 * l - 1.0) * w / featuresStd(i)
|
||||
}
|
||||
gradientIntercept += -(2 * l - 1.0) * w
|
||||
}
|
||||
}
|
||||
val loss = lossSum / weightSum
|
||||
val gradient = Vectors.dense((gradientCoef :+ gradientIntercept).map(_ / weightSum))
|
||||
|
||||
Seq(1, 2, 4).foreach { blockSize =>
|
||||
val blocks1 = scaledInstances
|
||||
.grouped(blockSize)
|
||||
.map(seq => InstanceBlock.fromInstances(seq))
|
||||
.toArray
|
||||
val blocks2 = blocks1.map { block =>
|
||||
new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor)
|
||||
}
|
||||
|
||||
Seq(blocks1, blocks2).foreach { blocks =>
|
||||
val agg = getNewAggregator(instances, Vectors.dense(coefVec.toArray :+ interceptValue),
|
||||
fitIntercept = true)
|
||||
blocks.foreach(agg.add)
|
||||
assert(agg.loss ~== loss relTol 1e-9)
|
||||
assert(agg.gradient ~== gradient relTol 1e-9)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("check with zero standard deviation") {
|
||||
val coefArray = Array(1.0, 2.0)
|
||||
val coefArrayFiltered = Array(2.0)
|
||||
val interceptValue = 1.0
|
||||
|
||||
Seq(false, true).foreach { fitIntercept =>
|
||||
val coefVec = if (fitIntercept) {
|
||||
Vectors.dense(coefArray :+ interceptValue)
|
||||
} else {
|
||||
Vectors.dense(coefArray)
|
||||
}
|
||||
val aggConstantFeature = getNewAggregator(instancesConstantFeature,
|
||||
coefVec, fitIntercept = fitIntercept)
|
||||
aggConstantFeature
|
||||
.add(InstanceBlock.fromInstances(standardize(instancesConstantFeature)))
|
||||
val grad = aggConstantFeature.gradient
|
||||
|
||||
val coefVecFiltered = if (fitIntercept) {
|
||||
Vectors.dense(coefArrayFiltered :+ interceptValue)
|
||||
} else {
|
||||
Vectors.dense(coefArrayFiltered)
|
||||
}
|
||||
val aggConstantFeatureFiltered = getNewAggregator(instancesConstantFeatureFiltered,
|
||||
coefVecFiltered, fitIntercept = fitIntercept)
|
||||
aggConstantFeatureFiltered
|
||||
.add(InstanceBlock.fromInstances(standardize(instancesConstantFeatureFiltered)))
|
||||
val gradFiltered = aggConstantFeatureFiltered.gradient
|
||||
|
||||
// constant features should not affect gradient
|
||||
assert(aggConstantFeature.loss ~== aggConstantFeatureFiltered.loss relTol 1e-9)
|
||||
assert(grad(0) === 0)
|
||||
assert(grad(1) ~== gradFiltered(0) relTol 1e-9)
|
||||
if (fitIntercept) {
|
||||
assert(grad.toArray.last ~== gradFiltered.toArray.last relTol 1e-9)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -571,9 +571,9 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
|
|||
>>> model.getMaxBlockSizeInMB()
|
||||
0.0
|
||||
>>> model.coefficients
|
||||
DenseVector([0.0, -0.2792, -0.1833])
|
||||
DenseVector([0.0, -1.0319, -0.5159])
|
||||
>>> model.intercept
|
||||
1.0206118982229047
|
||||
2.579645978780695
|
||||
>>> model.numClasses
|
||||
2
|
||||
>>> model.numFeatures
|
||||
|
@ -582,12 +582,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
|
|||
>>> model.predict(test0.head().features)
|
||||
1.0
|
||||
>>> model.predictRaw(test0.head().features)
|
||||
DenseVector([-1.4831, 1.4831])
|
||||
DenseVector([-4.1274, 4.1274])
|
||||
>>> result = model.transform(test0).head()
|
||||
>>> result.newPrediction
|
||||
1.0
|
||||
>>> result.rawPrediction
|
||||
DenseVector([-1.4831, 1.4831])
|
||||
DenseVector([-4.1274, 4.1274])
|
||||
>>> svm_path = temp_path + "/svm"
|
||||
>>> svm.save(svm_path)
|
||||
>>> svm2 = LinearSVC.load(svm_path)
|
||||
|
|
|
@ -223,12 +223,12 @@ class TrainingSummaryTest(SparkSessionTestCase):
|
|||
self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
|
||||
self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
|
||||
print(s.weightedTruePositiveRate)
|
||||
self.assertAlmostEqual(s.weightedTruePositiveRate, 0.5, 2)
|
||||
self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.5, 2)
|
||||
self.assertAlmostEqual(s.weightedRecall, 0.5, 2)
|
||||
self.assertAlmostEqual(s.weightedPrecision, 0.25, 2)
|
||||
self.assertAlmostEqual(s.weightedFMeasure(), 0.3333333333333333, 2)
|
||||
self.assertAlmostEqual(s.weightedFMeasure(1.0), 0.3333333333333333, 2)
|
||||
self.assertAlmostEqual(s.weightedTruePositiveRate, 1.0, 2)
|
||||
self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.0, 2)
|
||||
self.assertAlmostEqual(s.weightedRecall, 1.0, 2)
|
||||
self.assertAlmostEqual(s.weightedPrecision, 1.0, 2)
|
||||
self.assertAlmostEqual(s.weightedFMeasure(), 1.0, 2)
|
||||
self.assertAlmostEqual(s.weightedFMeasure(1.0), 1.0, 2)
|
||||
# test evaluation (with training dataset) produces a summary with same values
|
||||
# one check is enough to verify a summary is returned, Scala version runs full test
|
||||
sameSummary = model.evaluate(df)
|
||||
|
|
Loading…
Reference in a new issue