Revert "[SPARK-30642][SPARK-30659][SPARK-30660][SPARK-30662]"

### What changes were proposed in this pull request?
Revert
#27360
#27396
#27374
#27389

### Why are the changes needed?
BLAS needs more performance tests, especially on sparse datasets.
A performance test of LogisticRegression (https://github.com/apache/spark/pull/27374) on a sparse dataset shows that blockifying vectors into matrices and using BLAS causes a performance regression.
LinearSVC and LinearRegression were updated in the same way as LogisticRegression, so we also need to revert them to make sure there is no regression.
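For context, the sketch below is plain Scala with no Spark dependency; `SparseVec`, `dotSparse`, and `gemvDense` are illustrative names, not Spark APIs. It only shows the intuition behind the regression on very sparse data: the per-instance path touches non-zero entries only, while stacking rows into a dense matrix and doing a dense gemv-style multiply touches every cell.

```scala
// Illustrative only, not Spark code: why a dense blockified multiply can do far
// more work than per-instance sparse dot products on very sparse rows.
final case class SparseVec(size: Int, indices: Array[Int], values: Array[Double])

// Per-instance path (what this revert goes back to): cost is O(nnz) per row.
def dotSparse(x: SparseVec, coef: Array[Double]): Double = {
  var sum = 0.0
  var k = 0
  while (k < x.indices.length) {
    sum += coef(x.indices(k)) * x.values(k)
    k += 1
  }
  sum
}

// Blockified path (what this revert removes): rows stacked into a dense row-major
// block and multiplied in one shot; cost is O(numRows * numCols) even when most
// entries are zero.
def gemvDense(block: Array[Array[Double]], coef: Array[Double]): Array[Double] =
  block.map { row =>
    var sum = 0.0
    var j = 0
    while (j < row.length) { sum += row(j) * coef(j); j += 1 }
    sum
  }
```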

### Does this PR introduce any user-facing change?
Removes the newly added param `blockSize`.
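For illustration, assuming user code that targeted the pre-revert 3.0.0-SNAPSHOT API (the snippet below is hypothetical user code, not part of this patch), the only change needed after this revert is dropping the `setBlockSize` call:

```scala
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.1)
  // .setBlockSize(1024)   // no longer compiles after this revert; drop this call
```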

### How was this patch tested?
Reverted test suites.

Closes #27487 from zhengruifeng/revert_blockify_ii.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
zhengruifeng 2020-02-08 08:46:16 +08:00
parent a7451f44d2
commit 12e1bbaddb
27 changed files with 260 additions and 1071 deletions

View file

@ -502,7 +502,6 @@ private[serializer] object KryoSerializer {
"org.apache.spark.ml.attribute.NumericAttribute",
"org.apache.spark.ml.feature.Instance",
"org.apache.spark.ml.feature.InstanceBlock",
"org.apache.spark.ml.feature.LabeledPoint",
"org.apache.spark.ml.feature.OffsetInstance",
"org.apache.spark.ml.linalg.DenseMatrix",

View file

@ -682,6 +682,7 @@ private[spark] object BLAS extends Serializable {
val xTemp = xValues(k) * alpha
while (i < indEnd) {
val rowIndex = Arows(i)
yValues(Arows(i)) += Avals(i) * xTemp
i += 1
}

View file

@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.HingeAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
with HasAggregationDepth with HasThreshold with HasBlockSize {
with HasAggregationDepth with HasThreshold {
/**
* Param for threshold in binary classification prediction.
@ -155,26 +155,19 @@ class LinearSVC @Since("2.2.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)
/**
* Set block size for stacking input data in matrices.
* Default is 1024.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
val instances = extractInstances(dataset)
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
val sc = dataset.sparkSession.sparkContext
val instances = extractInstances(dataset)
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
val (summarizer, labelSummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
@ -215,33 +208,20 @@ class LinearSVC @Since("2.2.0") (
throw new SparkException(msg)
}
val featuresStd = summarizer.std.compressed
val bcFeaturesStd = sc.broadcast(featuresStd)
val featuresStd = summarizer.std.toArray
val getFeaturesStd = (j: Int) => featuresStd(j)
val regParamL2 = $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val regularization = if (regParamL2 != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
Some(new L2Regularization(regParamL2, shouldApply,
if ($(standardization)) None else Some(featuresStd.apply)))
if ($(standardization)) None else Some(getFeaturesStd)))
} else {
None
}
val standardized = instances.map {
case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")
val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))
def regParamL1Fun = (index: Int) => 0D
@ -258,7 +238,6 @@ class LinearSVC @Since("2.2.0") (
scaledObjectiveHistory += state.adjustedValue
}
blocks.unpersist()
bcFeaturesStd.destroy()
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
@ -289,6 +268,8 @@ class LinearSVC @Since("2.2.0") (
(Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
}
if (handlePersistence) instances.unpersist()
copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
}
}

View file

@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.LogisticAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@ -50,8 +50,7 @@ import org.apache.spark.util.VersionUtils
*/
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth
with HasBlockSize {
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {
import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames
@ -431,15 +430,6 @@ class LogisticRegression @Since("1.2.0") (
@Since("2.2.0")
def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value)
/**
* Set block size for stacking input data in matrices.
* Default is 1024.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
private def assertBoundConstrainedOptimizationParamsValid(
numCoefficientSets: Int,
numFeatures: Int): Unit = {
@ -492,17 +482,24 @@ class LogisticRegression @Since("1.2.0") (
this
}
override protected[spark] def train(
dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr =>
override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
train(dataset, handlePersistence)
}
protected[spark] def train(
dataset: Dataset[_],
handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
val instances = extractInstances(dataset)
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol,
fitIntercept)
val sc = dataset.sparkSession.sparkContext
val instances = extractInstances(dataset)
val (summarizer, labelSummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
@ -585,9 +582,8 @@ class LogisticRegression @Since("1.2.0") (
s"dangerous ground, so the algorithm may not converge.")
}
val featuresMean = summarizer.mean.compressed
val featuresStd = summarizer.std.compressed
val bcFeaturesStd = sc.broadcast(featuresStd)
val featuresMean = summarizer.mean.toArray
val featuresStd = summarizer.std.toArray
if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@ -599,7 +595,8 @@ class LogisticRegression @Since("1.2.0") (
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept),
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
multinomial = isMultinomial)(_)
val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) {
featuresStd(j / numCoefficientSets)
@ -615,21 +612,7 @@ class LogisticRegression @Since("1.2.0") (
None
}
val standardized = instances.map {
case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")
val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))
val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets
@ -823,7 +806,6 @@ class LogisticRegression @Since("1.2.0") (
state = states.next()
arrayBuilder += state.adjustedValue
}
blocks.unpersist()
bcFeaturesStd.destroy()
if (state == null) {
@ -893,6 +875,8 @@ class LogisticRegression @Since("1.2.0") (
}
}
if (handlePersistence) instances.unpersist()
val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
numClasses, isMultinomial))

View file

@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
/** Params for Multilayer Perceptron. */
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
import MultilayerPerceptronClassifier._
@ -54,6 +54,26 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
@Since("1.5.0")
final def getLayers: Array[Int] = $(layers)
/**
* Block size for stacking input data in matrices to speed up the computation.
* Data is stacked within partitions. If block size is more than remaining data in
* a partition then it is adjusted to the size of this data.
* Recommended size is between 10 and 1000.
* Default: 128
*
* @group expertParam
*/
@Since("1.5.0")
final val blockSize: IntParam = new IntParam(this, "blockSize",
"Block size for stacking input data in matrices. Data is stacked within partitions." +
" If block size is more than remaining data in a partition then " +
"it is adjusted to the size of this data. Recommended size is between 10 and 1000",
ParamValidators.gt(0))
/** @group expertGetParam */
@Since("1.5.0")
final def getBlockSize: Int = $(blockSize)
/**
* The solver algorithm for optimization.
* Supported options: "gd" (minibatch gradient descent) or "l-bfgs".

View file

@ -17,10 +17,7 @@
package org.apache.spark.ml.feature
import scala.collection.mutable
import org.apache.spark.ml.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.linalg.Vector
/**
* Class that represents an instance of weighted data point with label and features.
@ -31,131 +28,6 @@ import org.apache.spark.rdd.RDD
*/
private[spark] case class Instance(label: Double, weight: Double, features: Vector)
/**
* Class that represents an block of instance.
* If all weights are 1, then an empty array is stored.
*/
private[spark] case class InstanceBlock(
labels: Array[Double],
weights: Array[Double],
matrix: Matrix) {
require(labels.length == matrix.numRows)
require(matrix.isTransposed)
if (weights.nonEmpty) {
require(labels.length == weights.length)
}
def size: Int = labels.length
def numFeatures: Int = matrix.numCols
def instanceIterator: Iterator[Instance] = {
if (weights.nonEmpty) {
labels.iterator.zip(weights.iterator).zip(matrix.rowIter)
.map { case ((label, weight), vec) => Instance(label, weight, vec) }
} else {
labels.iterator.zip(matrix.rowIter)
.map { case (label, vec) => Instance(label, 1.0, vec) }
}
}
def getLabel(i: Int): Double = labels(i)
def labelIter: Iterator[Double] = labels.iterator
@transient lazy val getWeight: Int => Double = {
if (weights.nonEmpty) {
(i: Int) => weights(i)
} else {
(i: Int) => 1.0
}
}
def weightIter: Iterator[Double] = {
if (weights.nonEmpty) {
weights.iterator
} else {
Iterator.fill(size)(1.0)
}
}
// directly get the non-zero iterator of i-th row vector without array copy or slice
@transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = {
matrix match {
case dm: DenseMatrix =>
(i: Int) =>
val start = numFeatures * i
Iterator.tabulate(numFeatures)(j =>
(j, dm.values(start + j))
).filter(_._2 != 0)
case sm: SparseMatrix =>
(i: Int) =>
val start = sm.colPtrs(i)
val end = sm.colPtrs(i + 1)
Iterator.tabulate(end - start)(j =>
(sm.rowIndices(start + j), sm.values(start + j))
).filter(_._2 != 0)
}
}
}
private[spark] object InstanceBlock {
def fromInstances(instances: Seq[Instance]): InstanceBlock = {
val labels = instances.map(_.label).toArray
val weights = if (instances.exists(_.weight != 1)) {
instances.map(_.weight).toArray
} else {
Array.emptyDoubleArray
}
val numRows = instances.length
val numCols = instances.head.features.size
val denseSize = Matrices.getDenseSize(numCols, numRows)
val nnz = instances.iterator.map(_.features.numNonzeros).sum
val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
val matrix = if (denseSize < sparseSize) {
val values = Array.ofDim[Double](numRows * numCols)
var offset = 0
var j = 0
while (j < numRows) {
instances(j).features.foreachNonZero { (i, v) =>
values(offset + i) = v
}
offset += numCols
j += 1
}
new DenseMatrix(numRows, numCols, values, true)
} else {
val colIndices = mutable.ArrayBuilder.make[Int]
val values = mutable.ArrayBuilder.make[Double]
val rowPtrs = mutable.ArrayBuilder.make[Int]
var rowPtr = 0
rowPtrs += 0
var j = 0
while (j < numRows) {
var nnz = 0
instances(j).features.foreachNonZero { (i, v) =>
colIndices += i
values += v
nnz += 1
}
rowPtr += nnz
rowPtrs += rowPtr
j += 1
}
new SparseMatrix(numRows, numCols, rowPtrs.result(),
colIndices.result(), values.result(), true)
}
InstanceBlock(labels, weights, matrix)
}
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}
}
/**
* Case class that represents an instance of data point with
* label, weight, offset and features.

View file

@ -18,7 +18,7 @@
package org.apache.spark.ml.optim.aggregator
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
/**
@ -32,28 +32,21 @@ import org.apache.spark.ml.linalg._
*
* @param bcCoefficients The coefficients corresponding to the features.
* @param fitIntercept Whether to fit an intercept term.
* @param bcFeaturesStd The standard deviation values of the features.
*/
private[ml] class HingeAggregator(
numFeatures: Int,
bcFeaturesStd: Broadcast[Array[Double]],
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] {
extends DifferentiableLossAggregator[Instance, HingeAggregator] {
private val numFeatures: Int = bcFeaturesStd.value.length
private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
protected override val dim: Int = numFeaturesPlusIntercept
@transient private lazy val coefficientsArray = bcCoefficients.value match {
case DenseVector(values) => values
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
s" but got type ${bcCoefficients.value.getClass}.")
}
@transient private lazy val linear = {
if (fitIntercept) {
new DenseVector(coefficientsArray.take(numFeatures))
} else {
new DenseVector(coefficientsArray)
}
}
protected override val dim: Int = numFeaturesPlusIntercept
/**
* Add a new training instance to this HingeAggregator, and update the loss and gradient
@ -69,13 +62,16 @@ private[ml] class HingeAggregator(
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
if (weight == 0.0) return this
val localFeaturesStd = bcFeaturesStd.value
val localCoefficients = coefficientsArray
val localGradientSumArray = gradientSumArray
val dotProduct = {
var sum = 0.0
features.foreachNonZero { (index, value) =>
sum += localCoefficients(index) * value
if (localFeaturesStd(index) != 0.0) {
sum += localCoefficients(index) * value / localFeaturesStd(index)
}
}
if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
sum
@ -92,7 +88,9 @@ private[ml] class HingeAggregator(
if (1.0 > labelScaled * dotProduct) {
val gradientScale = -labelScaled * weight
features.foreachNonZero { (index, value) =>
localGradientSumArray(index) += value * gradientScale
if (localFeaturesStd(index) != 0.0) {
localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
}
}
if (fitIntercept) {
localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
@ -104,78 +102,4 @@ private[ml] class HingeAggregator(
this
}
}
/**
* Add a new training instance block to this HingeAggregator, and update the loss and gradient
* of the objective function.
*
* @param block The InstanceBlock to be added.
* @return This HingeAggregator object.
*/
def add(block: InstanceBlock): this.type = {
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
require(block.weightIter.forall(_ >= 0),
s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
if (block.weightIter.forall(_ == 0)) return this
val size = block.size
val localGradientSumArray = gradientSumArray
// vec here represents dotProducts
val vec = if (fitIntercept && coefficientsArray.last != 0) {
val intercept = coefficientsArray.last
new DenseVector(Array.fill(size)(intercept))
} else {
new DenseVector(Array.ofDim[Double](size))
}
if (fitIntercept) {
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
} else {
BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
}
// in-place convert dotProducts to gradient scales
// then, vec represents gradient scales
var i = 0
while (i < size) {
val weight = block.getWeight(i)
if (weight > 0) {
weightSum += weight
// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
// Therefore the gradient is -(2y - 1)*x
val label = block.getLabel(i)
val labelScaled = 2 * label - 1.0
val loss = (1.0 - labelScaled * vec(i)) * weight
if (loss > 0) {
lossSum += loss
val gradScale = -labelScaled * weight
vec.values(i) = gradScale
} else {
vec.values(i) = 0.0
}
} else {
vec.values(i) = 0.0
}
i += 1
}
// predictions are all correct, no gradient signal
if (vec.values.forall(_ == 0)) return this
if (fitIntercept) {
// localGradientSumArray is of size numFeatures+1, so can not
// be directly used as the output of BLAS.gemv
val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
localGradientSumArray(numFeatures) += vec.values.sum
} else {
val gradSumVec = new DenseVector(localGradientSumArray)
BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
}
this
}
}

View file

@ -17,8 +17,8 @@
package org.apache.spark.ml.optim.aggregator
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.Vector
/**
* HuberAggregator computes the gradient and loss for a huber loss function,
@ -62,17 +62,19 @@ import org.apache.spark.ml.linalg._
*
* @param fitIntercept Whether to fit an intercept term.
* @param epsilon The shape parameter to control the amount of robustness.
* @param bcFeaturesStd The broadcast standard deviation values of the features.
* @param bcParameters including three parts: the regression coefficients corresponding
* to the features, the intercept (if fitIntercept is ture)
* and the scale parameter (sigma).
*/
private[ml] class HuberAggregator(
numFeatures: Int,
fitIntercept: Boolean,
epsilon: Double)(bcParameters: Broadcast[Vector])
extends DifferentiableLossAggregator[InstanceBlock, HuberAggregator] {
epsilon: Double,
bcFeaturesStd: Broadcast[Array[Double]])(bcParameters: Broadcast[Vector])
extends DifferentiableLossAggregator[Instance, HuberAggregator] {
protected override val dim: Int = bcParameters.value.size
private val numFeatures: Int = if (fitIntercept) dim - 2 else dim - 1
private val sigma: Double = bcParameters.value(dim - 1)
private val intercept: Double = if (fitIntercept) {
bcParameters.value(dim - 2)
@ -80,8 +82,7 @@ private[ml] class HuberAggregator(
0.0
}
// make transient so we do not serialize between aggregation stages
@transient private lazy val linear =
new DenseVector(bcParameters.value.toArray.take(numFeatures))
@transient private lazy val coefficients = bcParameters.value.toArray.slice(0, numFeatures)
/**
* Add a new training instance to this HuberAggregator, and update the loss and gradient
@ -97,13 +98,16 @@ private[ml] class HuberAggregator(
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
if (weight == 0.0) return this
val localCoefficients = linear.values
val localFeaturesStd = bcFeaturesStd.value
val localCoefficients = coefficients
val localGradientSumArray = gradientSumArray
val margin = {
var sum = 0.0
features.foreachNonZero { (index, value) =>
sum += localCoefficients(index) * value
if (localFeaturesStd(index) != 0.0) {
sum += localCoefficients(index) * (value / localFeaturesStd(index))
}
}
if (fitIntercept) sum += intercept
sum
@ -115,7 +119,10 @@ private[ml] class HuberAggregator(
val linearLossDivSigma = linearLoss / sigma
features.foreachNonZero { (index, value) =>
localGradientSumArray(index) -= weight * linearLossDivSigma * value
if (localFeaturesStd(index) != 0.0) {
localGradientSumArray(index) +=
-1.0 * weight * linearLossDivSigma * (value / localFeaturesStd(index))
}
}
if (fitIntercept) {
localGradientSumArray(dim - 2) += -1.0 * weight * linearLossDivSigma
@ -127,7 +134,10 @@ private[ml] class HuberAggregator(
(sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
features.foreachNonZero { (index, value) =>
localGradientSumArray(index) += weight * sign * epsilon * value
if (localFeaturesStd(index) != 0.0) {
localGradientSumArray(index) +=
weight * sign * epsilon * (value / localFeaturesStd(index))
}
}
if (fitIntercept) {
localGradientSumArray(dim - 2) += weight * sign * epsilon
@ -139,75 +149,4 @@ private[ml] class HuberAggregator(
this
}
}
/**
* Add a new training instance block to this HuberAggregator, and update the loss and gradient
* of the objective function.
*
* @param block The instance block of data point to be added.
* @return This HuberAggregator object.
*/
def add(block: InstanceBlock): HuberAggregator = {
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
require(block.weightIter.forall(_ >= 0),
s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
if (block.weightIter.forall(_ == 0)) return this
val size = block.size
val localGradientSumArray = gradientSumArray
// vec here represents margins or dotProducts
val vec = if (fitIntercept && intercept != 0) {
new DenseVector(Array.fill(size)(intercept))
} else {
new DenseVector(Array.ofDim[Double](size))
}
if (fitIntercept) {
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
} else {
BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
}
// in-place convert margins to multipliers
// then, vec represents multipliers
var i = 0
while (i < size) {
val weight = block.getWeight(i)
if (weight > 0) {
weightSum += weight
val label = block.getLabel(i)
val margin = vec(i)
val linearLoss = label - margin
if (math.abs(linearLoss) <= sigma * epsilon) {
lossSum += 0.5 * weight * (sigma + math.pow(linearLoss, 2.0) / sigma)
val linearLossDivSigma = linearLoss / sigma
val multiplier = -1.0 * weight * linearLossDivSigma
vec.values(i) = multiplier
localGradientSumArray(dim - 1) += 0.5 * weight * (1.0 - math.pow(linearLossDivSigma, 2.0))
} else {
lossSum += 0.5 * weight *
(sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
val sign = if (linearLoss >= 0) -1.0 else 1.0
val multiplier = weight * sign * epsilon
vec.values(i) = multiplier
localGradientSumArray(dim - 1) += 0.5 * weight * (1.0 - epsilon * epsilon)
}
} else {
vec.values(i) = 0.0
}
i += 1
}
val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
if (fitIntercept) {
localGradientSumArray(dim - 2) += vec.values.sum
}
this
}
}

View file

@ -17,8 +17,8 @@
package org.apache.spark.ml.optim.aggregator
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
/**
* LeastSquaresAggregator computes the gradient and loss for a Least-squared loss function,
@ -157,25 +157,26 @@ private[ml] class LeastSquaresAggregator(
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
bcFeaturesStd: Broadcast[Vector],
bcFeaturesMean: Broadcast[Vector])(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[InstanceBlock, LeastSquaresAggregator] {
bcFeaturesStd: Broadcast[Array[Double]],
bcFeaturesMean: Broadcast[Array[Double]])(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[Instance, LeastSquaresAggregator] {
require(labelStd > 0.0, s"${this.getClass.getName} requires the label standard " +
s"deviation to be positive.")
private val numFeatures = bcFeaturesStd.value.size
private val numFeatures = bcFeaturesStd.value.length
protected override val dim: Int = numFeatures
// make transient so we do not serialize between aggregation stages
@transient private lazy val featuresStd = bcFeaturesStd.value
@transient private lazy val effectiveCoefAndOffset = {
val coefficientsArray = bcCoefficients.value.toArray.clone()
val featuresMean = bcFeaturesMean.value
val featuresStd = bcFeaturesStd.value
var sum = 0.0
var i = 0
val len = coefficientsArray.length
while (i < len) {
if (featuresStd(i) != 0.0) {
sum += coefficientsArray(i) / featuresStd(i) * featuresMean(i)
coefficientsArray(i) /= featuresStd(i)
sum += coefficientsArray(i) * featuresMean(i)
} else {
coefficientsArray(i) = 0.0
}
@ -185,7 +186,7 @@ private[ml] class LeastSquaresAggregator(
(Vectors.dense(coefficientsArray), offset)
}
// do not use tuple assignment above because it will circumvent the @transient tag
@transient private lazy val effectiveCoefficientsVec = effectiveCoefAndOffset._1
@transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
@transient private lazy val offset = effectiveCoefAndOffset._2
/**
@ -203,20 +204,16 @@ private[ml] class LeastSquaresAggregator(
if (weight == 0.0) return this
val localEffectiveCoefficientsVec = effectiveCoefficientsVec
val diff = {
var dot = 0.0
features.foreachNonZero { (index, value) =>
dot += localEffectiveCoefficientsVec(index) * value
}
dot - label / labelStd + offset
}
val diff = BLAS.dot(features, effectiveCoefficientsVector) - label / labelStd + offset
if (diff != 0) {
val localGradientSumArray = gradientSumArray
val localFeaturesStd = featuresStd
features.foreachNonZero { (index, value) =>
localGradientSumArray(index) += weight * diff * value
val fStd = localFeaturesStd(index)
if (fStd != 0.0) {
localGradientSumArray(index) += weight * diff * value / fStd
}
}
lossSum += weight * diff * diff / 2.0
}
@ -224,43 +221,4 @@ private[ml] class LeastSquaresAggregator(
this
}
}
/**
* Add a new training instance block to this LeastSquaresAggregator, and update the loss
* and gradient of the objective function.
*
* @param block The instance block of data point to be added.
* @return This LeastSquaresAggregator object.
*/
def add(block: InstanceBlock): LeastSquaresAggregator = {
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
require(block.weightIter.forall(_ >= 0),
s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
if (block.weightIter.forall(_ == 0)) return this
val size = block.size
// vec here represents diffs
val vec = new DenseVector(Array.tabulate(size)(i => offset - block.getLabel(i) / labelStd))
BLAS.gemv(1.0, block.matrix, effectiveCoefficientsVec, 1.0, vec)
// in-place convert diffs to multipliers
// then, vec represents multipliers
var i = 0
while (i < size) {
val weight = block.getWeight(i)
val diff = vec(i)
lossSum += weight * diff * diff / 2
weightSum += weight
val multiplier = weight * diff
vec.values(i) = multiplier
i += 1
}
val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
this
}
}

View file

@ -18,8 +18,8 @@ package org.apache.spark.ml.optim.aggregator
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.util.MLUtils
/**
@ -171,6 +171,7 @@ import org.apache.spark.mllib.util.MLUtils
*
*
* @param bcCoefficients The broadcast coefficients corresponding to the features.
* @param bcFeaturesStd The broadcast standard deviation values of the features.
* @param numClasses the number of possible outcomes for k classes classification problem in
* Multinomial Logistic Regression.
* @param fitIntercept Whether to fit an intercept term.
@ -182,12 +183,13 @@ import org.apache.spark.mllib.util.MLUtils
* since this form is optimal for the matrix operations used for prediction.
*/
private[ml] class LogisticAggregator(
numFeatures: Int,
bcFeaturesStd: Broadcast[Array[Double]],
numClasses: Int,
fitIntercept: Boolean,
multinomial: Boolean)(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[InstanceBlock, LogisticAggregator] with Logging {
extends DifferentiableLossAggregator[Instance, LogisticAggregator] with Logging {
private val numFeatures = bcFeaturesStd.value.length
private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
private val coefficientSize = bcCoefficients.value.size
protected override val dim: Int = coefficientSize
@ -207,31 +209,6 @@ private[ml] class LogisticAggregator(
s"got type ${bcCoefficients.value.getClass}.)")
}
@transient private lazy val binaryLinear = {
if (!multinomial) {
if (fitIntercept) {
new DenseVector(coefficientsArray.take(numFeatures))
} else {
new DenseVector(coefficientsArray)
}
} else {
null
}
}
@transient private lazy val multinomialLinear = {
if (multinomial) {
if (fitIntercept) {
new DenseMatrix(numClasses, numFeatures, coefficientsArray.take(numClasses * numFeatures))
} else {
new DenseMatrix(numClasses, numFeatures, coefficientsArray)
}
} else {
null
}
}
if (multinomial && numClasses <= 2) {
logInfo(s"Multinomial logistic regression for binary classification yields separate " +
s"coefficients for positive and negative classes. When no regularization is applied, the" +
@ -242,12 +219,15 @@ private[ml] class LogisticAggregator(
/** Update gradient and loss using binary loss function. */
private def binaryUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = {
val localFeaturesStd = bcFeaturesStd.value
val localCoefficients = coefficientsArray
val localGradientArray = gradientSumArray
val margin = - {
var sum = 0.0
features.foreachNonZero { (index, value) =>
sum += localCoefficients(index) * value
if (localFeaturesStd(index) != 0.0) {
sum += localCoefficients(index) * value / localFeaturesStd(index)
}
}
if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
sum
@ -256,7 +236,9 @@ private[ml] class LogisticAggregator(
val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
features.foreachNonZero { (index, value) =>
localGradientArray(index) += multiplier * value
if (localFeaturesStd(index) != 0.0) {
localGradientArray(index) += multiplier * value / localFeaturesStd(index)
}
}
if (fitIntercept) {
@ -271,61 +253,6 @@ private[ml] class LogisticAggregator(
}
}
/** Update gradient and loss using binary loss function. */
private def binaryUpdateInPlace(block: InstanceBlock): Unit = {
val size = block.size
val localGradientSumArray = gradientSumArray
// vec here represents margins or negative dotProducts
val vec = if (fitIntercept && coefficientsArray.last != 0) {
val intercept = coefficientsArray.last
new DenseVector(Array.fill(size)(intercept))
} else {
new DenseVector(Array.ofDim[Double](size))
}
if (fitIntercept) {
BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec)
} else {
BLAS.gemv(-1.0, block.matrix, binaryLinear, 0.0, vec)
}
// in-place convert margins to multiplier
// then, vec represents multiplier
var i = 0
while (i < size) {
val weight = block.getWeight(i)
if (weight > 0) {
weightSum += weight
val label = block.getLabel(i)
val margin = vec(i)
if (label > 0) {
// The following is equivalent to log(1 + exp(margin)) but more numerically stable.
lossSum += weight * MLUtils.log1pExp(margin)
} else {
lossSum += weight * (MLUtils.log1pExp(margin) - margin)
}
val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
vec.values(i) = multiplier
} else {
vec.values(i) = 0.0
}
i += 1
}
if (fitIntercept) {
// localGradientSumArray is of size numFeatures+1, so can not
// be directly used as the output of BLAS.gemv
val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
localGradientSumArray(numFeatures) += vec.values.sum
} else {
val gradSumVec = new DenseVector(localGradientSumArray)
BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
}
}
/** Update gradient and loss using multinomial (softmax) loss function. */
private def multinomialUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = {
// TODO: use level 2 BLAS operations
@ -333,6 +260,7 @@ private[ml] class LogisticAggregator(
Note: this can still be used when numClasses = 2 for binary
logistic regression without pivoting.
*/
val localFeaturesStd = bcFeaturesStd.value
val localCoefficients = coefficientsArray
val localGradientArray = gradientSumArray
@ -342,12 +270,15 @@ private[ml] class LogisticAggregator(
val margins = new Array[Double](numClasses)
features.foreachNonZero { (index, value) =>
if (localFeaturesStd(index) != 0.0) {
val stdValue = value / localFeaturesStd(index)
var j = 0
while (j < numClasses) {
margins(j) += localCoefficients(index * numClasses + j) * value
margins(j) += localCoefficients(index * numClasses + j) * stdValue
j += 1
}
}
}
var i = 0
while (i < numClasses) {
if (fitIntercept) {
@ -383,12 +314,15 @@ private[ml] class LogisticAggregator(
multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0)
}
features.foreachNonZero { (index, value) =>
if (localFeaturesStd(index) != 0.0) {
val stdValue = value / localFeaturesStd(index)
var j = 0
while (j < numClasses) {
localGradientArray(index * numClasses + j) += weight * multipliers(j) * value
localGradientArray(index * numClasses + j) += weight * multipliers(j) * stdValue
j += 1
}
}
}
if (fitIntercept) {
var i = 0
while (i < numClasses) {
@ -405,112 +339,6 @@ private[ml] class LogisticAggregator(
lossSum += weight * loss
}
/** Update gradient and loss using multinomial (softmax) loss function. */
private def multinomialUpdateInPlace(block: InstanceBlock): Unit = {
val size = block.size
val localGradientSumArray = gradientSumArray
// mat here represents margins, shape: S X C
val mat = new DenseMatrix(size, numClasses, Array.ofDim[Double](size * numClasses))
if (fitIntercept) {
val intercept = coefficientsArray.takeRight(numClasses)
var i = 0
while (i < size) {
var j = 0
while (j < numClasses) {
mat.update(i, j, intercept(j))
j += 1
}
i += 1
}
BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 1.0, mat)
} else {
BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 0.0, mat)
}
// in-place convert margins to multipliers
// then, mat represents multipliers
var i = 0
val tmp = Array.ofDim[Double](numClasses)
while (i < size) {
val weight = block.getWeight(i)
if (weight > 0) {
weightSum += weight
val label = block.getLabel(i)
var maxMargin = Double.NegativeInfinity
var j = 0
while (j < numClasses) {
tmp(j) = mat(i, j)
maxMargin = math.max(maxMargin, tmp(j))
j += 1
}
// marginOfLabel is margins(label) in the formula
val marginOfLabel = tmp(label.toInt)
var sum = 0.0
j = 0
while (j < numClasses) {
if (maxMargin > 0) tmp(j) -= maxMargin
val exp = math.exp(tmp(j))
sum += exp
tmp(j) = exp
j += 1
}
j = 0
while (j < numClasses) {
val multiplier = weight * (tmp(j) / sum - (if (label == j) 1.0 else 0.0))
mat.update(i, j, multiplier)
j += 1
}
if (maxMargin > 0) {
lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin)
} else {
lossSum += weight * (math.log(sum) - marginOfLabel)
}
} else {
var j = 0
while (j < numClasses) {
mat.update(i, j, 0.0)
j += 1
}
}
i += 1
}
// block.matrix: S X F, unknown type
// mat (multipliers): S X C, dense
// gradSumMat(gradientSumArray): C X FPI (numFeaturesPlusIntercept), dense
block.matrix match {
case dm: DenseMatrix if !fitIntercept =>
// If fitIntercept==false, gradientSumArray += mat.T X matrix
// GEMM requires block.matrix is dense
val gradSumMat = new DenseMatrix(numClasses, numFeatures, localGradientSumArray)
BLAS.gemm(1.0, mat.transpose, dm, 1.0, gradSumMat)
case _ =>
// Otherwise, use linearGradSumMat (F X C) as a temp matrix:
// linearGradSumMat = matrix.T X mat
val linearGradSumMat = new DenseMatrix(numFeatures, numClasses,
Array.ofDim[Double](numFeatures * numClasses))
BLAS.gemm(1.0, block.matrix.transpose, mat, 0.0, linearGradSumMat)
linearGradSumMat.foreachActive { (i, j, v) =>
if (v != 0) localGradientSumArray(i * numClasses + j) += v
}
if (fitIntercept) {
val start = numClasses * numFeatures
mat.foreachActive { (i, j, v) =>
if (v != 0) localGradientSumArray(start + j) += v
}
}
}
}
/**
* Add a new training instance to this LogisticAggregator, and update the loss and gradient
* of the objective function.
@ -535,28 +363,4 @@ private[ml] class LogisticAggregator(
this
}
}
/**
* Add a new training instance block to this LogisticAggregator, and update the loss and gradient
* of the objective function.
*
* @param block The instance block of data point to be added.
* @return This LogisticAggregator object.
*/
def add(block: InstanceBlock): this.type = {
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
require(block.weightIter.forall(_ >= 0),
s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
if (block.weightIter.forall(_ == 0)) return this
if (multinomial) {
multinomialUpdateInPlace(block)
} else {
binaryUpdateInPlace(block)
}
this
}
}

View file

@ -104,11 +104,7 @@ private[shared] object SharedParamsCodeGen {
isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"),
ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " +
"each row is for training or for validation. False indicates training; true indicates " +
"validation"),
ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
"stacked within partitions. If block size is more than remaining data in a partition " +
"then it is adjusted to the size of this data", Some("1024"),
isValid = "ParamValidators.gt(0)", isExpertParam = true)
"validation.")
)
val code = genSharedParams(params)

View file

@ -570,31 +570,12 @@ trait HasDistanceMeasure extends Params {
trait HasValidationIndicatorCol extends Params {
/**
* Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.
* Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation..
* @group param
*/
final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation")
final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.")
/** @group getParam */
final def getValidationIndicatorCol: String = $(validationIndicatorCol)
}
/**
* Trait for shared param blockSize (default: 1024). This trait may be changed or
* removed between minor versions.
*/
@DeveloperApi
trait HasBlockSize extends Params {
/**
* Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
* @group expertParam
*/
final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data", ParamValidators.gt(0))
setDefault(blockSize, 1024)
/** @group expertGetParam */
final def getBlockSize: Int = $(blockSize)
}
// scalastyle:on

View file

@ -54,8 +54,7 @@ import org.apache.spark.util.random.XORShiftRandom
/**
* Common params for ALS and ALSModel.
*/
private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
with HasBlockSize {
private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
/**
* Param for the column name for user ids. Ids must be integers. Other
* numeric types are supported for this column, but will be cast to integers as long as they
@ -126,8 +125,6 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
/** @group expertGetParam */
def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
setDefault(blockSize -> 4096)
}
/**
@ -291,15 +288,6 @@ class ALSModel private[ml] (
@Since("2.2.0")
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
/**
* Set block size for stacking input data in matrices.
* Default is 4096.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
if (featuresA != null && featuresB != null) {
var dotProduct = 0.0f
@ -363,7 +351,7 @@ class ALSModel private[ml] (
*/
@Since("2.2.0")
def recommendForAllUsers(numItems: Int): DataFrame = {
recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
}
/**
@ -378,7 +366,7 @@ class ALSModel private[ml] (
@Since("2.3.0")
def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
}
/**
@ -389,7 +377,7 @@ class ALSModel private[ml] (
*/
@Since("2.2.0")
def recommendForAllItems(numUsers: Int): DataFrame = {
recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
}
/**
@ -404,7 +392,7 @@ class ALSModel private[ml] (
@Since("2.3.0")
def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
}
/**
@ -453,12 +441,11 @@ class ALSModel private[ml] (
dstFactors: DataFrame,
srcOutputColumn: String,
dstOutputColumn: String,
num: Int,
blockSize: Int): DataFrame = {
num: Int): DataFrame = {
import srcFactors.sparkSession.implicits._
val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
.as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
.flatMap { case (srcIter, dstIter) =>
@ -496,10 +483,11 @@ class ALSModel private[ml] (
/**
* Blockifies factors to improve the efficiency of cross join
* TODO: SPARK-20443 - expose blockSize as a param?
*/
private def blockify(
factors: Dataset[(Int, Array[Float])],
blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
import factors.sparkSession.implicits._
factors.mapPartitions(_.grouped(blockSize))
}
@ -666,15 +654,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
@Since("2.2.0")
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
/**
* Set block size for stacking input data in matrices.
* Default is 4096.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
/**
* Sets both numUserBlocks and numItemBlocks to the specific value.
*
@ -704,7 +683,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
instr.logDataset(dataset)
instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
seed, intermediateStorageLevel, finalStorageLevel, blockSize)
seed, intermediateStorageLevel, finalStorageLevel)
val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@ -715,8 +694,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
checkpointInterval = $(checkpointInterval), seed = $(seed))
val userDF = userFactors.toDF("id", "features")
val itemDF = itemFactors.toDF("id", "features")
val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
.setParent(this)
val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
copyValues(model)
}

View file

@ -28,7 +28,7 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{PipelineStage, PredictorParams}
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.linalg.BLAS._
import org.apache.spark.ml.optim.WeightedLeastSquares
@ -55,7 +55,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
private[regression] trait LinearRegressionParams extends PredictorParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver
with HasAggregationDepth with HasLoss with HasBlockSize {
with HasAggregationDepth with HasLoss {
import LinearRegression._
@ -316,15 +316,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
def setEpsilon(value: Double): this.type = set(epsilon, value)
setDefault(epsilon -> 1.35)
/**
* Set block size for stacking input data in matrices.
* Default is 1024.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
override protected def train(dataset: Dataset[_]): LinearRegressionModel = instrumented { instr =>
// Extract the number of features before deciding optimization solver.
val numFeatures = MetadataUtils.getNumFeatures(dataset, $(featuresCol))
@ -363,6 +354,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
return lrModel.setSummary(Some(trainingSummary))
}
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
val (featuresSummarizer, ySummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std"),
Summarizer.createSummarizerBuffer("mean", "std", "count")))(
@ -398,6 +392,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
s"will be zeros and the intercept will be the mean of the label; as a result, " +
s"training is not needed.")
}
if (handlePersistence) instances.unpersist()
val coefficients = Vectors.sparse(numFeatures, Seq.empty)
val intercept = yMean
@ -426,8 +421,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
// if y is constant (rawYStd is zero), then y cannot be scaled. In this case
// setting yStd=abs(yMean) ensures that y is not scaled anymore in l-bfgs algorithm.
val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean)
val featuresMean = featuresSummarizer.mean.compressed
val featuresStd = featuresSummarizer.std.compressed
val featuresMean = featuresSummarizer.mean.toArray
val featuresStd = featuresSummarizer.std.toArray
val bcFeaturesMean = instances.context.broadcast(featuresMean)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
@ -447,36 +442,23 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
val getFeaturesStd = (j: Int) => if (j >= 0 && j < numFeatures) featuresStd(j) else 0.0
val regularization = if (effectiveL2RegParam != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
Some(new L2Regularization(effectiveL2RegParam, shouldApply,
if ($(standardization)) None else Some(featuresStd.apply)))
if ($(standardization)) None else Some(getFeaturesStd)))
} else {
None
}
val standardized = instances.map {
case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")
val costFun = $(loss) match {
case SquaredError =>
val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept),
bcFeaturesStd, bcFeaturesMean)(_)
new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth))
case Huber =>
val getAggregatorFunc = new HuberAggregator(numFeatures, $(fitIntercept), $(epsilon))(_)
new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
val getAggregatorFunc = new HuberAggregator($(fitIntercept), $(epsilon), bcFeaturesStd)(_)
new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth))
}
val optimizer = $(loss) match {
@ -542,7 +524,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
throw new SparkException(msg)
}
blocks.unpersist()
bcFeaturesMean.destroy()
bcFeaturesStd.destroy()
@ -576,7 +557,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
after the coefficients are converged. See the following discussion for detail.
http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
*/
yMean - dot(Vectors.dense(rawCoefficients), featuresMean)
yMean - dot(Vectors.dense(rawCoefficients), Vectors.dense(featuresMean))
case Huber => parameters(numFeatures)
}
} else {
@ -591,6 +572,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
(Vectors.dense(rawCoefficients).compressed, interceptValue, scaleValue, arrayBuilder.result())
}
if (handlePersistence) instances.unpersist()
val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept, scale))
// Handle possible missing or invalid prediction columns
val (summaryModel, predictionColName) = model.findSummaryModelAndPredictionCol()

View file

@ -339,8 +339,10 @@ class LogisticRegressionWithLBFGS
// Convert our input into a DataFrame
val spark = SparkSession.builder().sparkContext(input.context).getOrCreate()
val df = spark.createDataFrame(input.map(_.asML))
// Determine if we should cache the DF
val handlePersistence = input.getStorageLevel == StorageLevel.NONE
// Train our model
val mlLogisticRegressionModel = lr.train(df)
val mlLogisticRegressionModel = lr.train(df, handlePersistence)
// convert the model
val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray)
createModel(weights, mlLogisticRegressionModel.intercept)

View file

@ -179,7 +179,7 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
test("sparse coefficients in HingeAggregator") {
val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
val agg = new HingeAggregator(1, true)(bcCoefficients)
val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients)
val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") {
intercept[IllegalArgumentException] {
agg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))

View file

@ -542,7 +542,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
test("sparse coefficients in LogisticAggregator") {
val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
val binaryAgg = new LogisticAggregator(1, 2,
val binaryAgg = new LogisticAggregator(bcFeaturesStd, 2,
fitIntercept = true, multinomial = false)(bcCoefficientsBinary)
val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") {
intercept[IllegalArgumentException] {
@ -552,7 +552,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
assert(thrownBinary.getMessage.contains("coefficients only supports dense"))
val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0)))
val multinomialAgg = new LogisticAggregator(1, 3,
val multinomialAgg = new LogisticAggregator(bcFeaturesStd, 3,
fitIntercept = true, multinomial = true)(bcCoefficientsMulti)
val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") {
intercept[IllegalArgumentException] {

View file

@ -42,36 +42,5 @@ class InstanceSuite extends SparkFunSuite{
val o2 = ser.deserialize[OffsetInstance](ser.serialize(o))
assert(o === o2)
}
val block1 = InstanceBlock.fromInstances(Seq(instance1))
val block2 = InstanceBlock.fromInstances(Seq(instance1, instance2))
Seq(block1, block2).foreach { o =>
val o2 = ser.deserialize[InstanceBlock](ser.serialize(o))
assert(o.labels === o2.labels)
assert(o.weights === o2.weights)
assert(o.matrix === o2.matrix)
}
}
test("InstanceBlock: check correctness") {
val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
val instances = Seq(instance1, instance2)
val block = InstanceBlock.fromInstances(instances)
assert(block.size === 2)
assert(block.numFeatures === 2)
block.instanceIterator.zipWithIndex.foreach {
case (instance, i) =>
assert(instance.label === instances(i).label)
assert(instance.weight === instances(i).weight)
assert(instance.features.toArray === instances(i).features.toArray)
}
Seq(0, 1).foreach { i =>
val nzIter = block.getNonZeroIter(i)
val vec = Vectors.sparse(2, nzIter.toSeq)
assert(vec.toArray === instances(i).features.toArray)
}
}
}
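The removed test above exercised InstanceBlock.fromInstances, the blockification that this revert takes back out of the linear models. For background, a minimal standalone Scala sketch of the idea, stacking (label, weight, features) rows into one flat row-major matrix; the names here (Row, Block, blockify) are hypothetical, not Spark's InstanceBlock API:
// Blockification sketch: stack instances so a gradient pass can use one
// matrix-vector multiply per block instead of one dot product per row.
final case class Row(label: Double, weight: Double, features: Array[Double])
final case class Block(labels: Array[Double], weights: Array[Double],
                       matrix: Array[Double], numFeatures: Int) {
  def size: Int = labels.length
}
def blockify(rows: Seq[Row]): Block = {
  require(rows.nonEmpty, "need at least one row")
  val numFeatures = rows.head.features.length
  Block(
    labels = rows.map(_.label).toArray,
    weights = rows.map(_.weight).toArray,
    matrix = rows.flatMap(_.features).toArray,  // row-major, length = size * numFeatures
    numFeatures = numFeatures)
}
val block = blockify(Seq(
  Row(19.0, 2.0, Array(1.0, 7.0)),
  Row(17.0, 1.0, Array(0.0, 5.0))))
assert(block.size == 2 && block.numFeatures == 2)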

View file

@ -17,7 +17,7 @@
package org.apache.spark.ml.optim.aggregator
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
@ -32,21 +32,21 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
override def beforeAll(): Unit = {
super.beforeAll()
instances = standardize(Array(
instances = Array(
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
Instance(0.0, 0.3, Vectors.dense(4.0, 0.5))
))
instancesConstantFeature = standardize(Array(
)
instancesConstantFeature = Array(
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
Instance(1.0, 0.3, Vectors.dense(1.0, 0.5))
))
instancesConstantFeatureFiltered = standardize(Array(
)
instancesConstantFeatureFiltered = Array(
Instance(0.0, 0.1, Vectors.dense(2.0)),
Instance(1.0, 0.5, Vectors.dense(1.0)),
Instance(2.0, 0.3, Vectors.dense(0.5))
))
)
}
/** Get summary statistics for some data and create a new HingeAggregator. */
@ -54,23 +54,12 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
instances: Array[Instance],
coefficients: Vector,
fitIntercept: Boolean): HingeAggregator = {
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
new HingeAggregator(instances.head.features.size, fitIntercept)(bcCoefficients)
}
private def standardize(instances: Array[Instance]): Array[Instance] = {
val (featuresSummarizer, _) =
val (featuresSummarizer, ySummarizer) =
DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt)
val numFeatures = stdArray.length
instances.map { case Instance(label, weight, features) =>
val standardized = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = stdArray(i)
if (std != 0) standardized(i) = v / std
}
Instance(label, weight, Vectors.dense(standardized).compressed)
}
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
}
test("aggregator add method input size") {
@ -171,21 +160,4 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
}
test("add instance block") {
val coefArray = Array(1.0, 2.0)
val intercept = 1.0
val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
fitIntercept = true)
instances.foreach(agg.add)
val agg2 = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
fitIntercept = true)
val block = InstanceBlock.fromInstances(instances)
agg2.add(block)
assert(agg.loss ~== agg2.loss relTol 1e-8)
assert(agg.gradient ~== agg2.gradient relTol 1e-8)
}
}
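The constructor change above, from (numFeatures, fitIntercept) with pre-standardized instances back to (bcFeaturesStd, fitIntercept), is the core of the revert: the aggregator again divides by the per-feature std inside its add loop rather than receiving instances that were standardized once up front. A small standalone Scala sketch of the two shapes, with hypothetical helper names and plain arrays:
// Standardization inside the gradient loop (pre-3.0.0 shape, restored by this revert).
def marginStdInside(features: Array[Double], coef: Array[Double], std: Array[Double]): Double = {
  var sum = 0.0
  var i = 0
  while (i < features.length) {
    if (std(i) != 0.0) sum += coef(i) * (features(i) / std(i))  // divide on every add
    i += 1
  }
  sum
}
// Standardization done once, outside the aggregator (the reverted 3.0.0 shape).
def standardizeOnce(features: Array[Double], std: Array[Double]): Array[Double] =
  features.zip(std).map { case (v, s) => if (s != 0.0) v / s else 0.0 }
def marginPreStandardized(features: Array[Double], coef: Array[Double]): Double =
  features.zip(coef).map { case (x, w) => x * w }.sum
// Both shapes produce the same margin for the same data.
val features = Array(1.0, 2.0)
val coef = Array(0.5, 0.25)
val std = Array(2.0, 0.0)
assert(marginStdInside(features, coef, std) ==
  marginPreStandardized(standardizeOnce(features, std), coef))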

View file

@ -17,7 +17,7 @@
package org.apache.spark.ml.optim.aggregator
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
@ -32,21 +32,21 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
override def beforeAll(): Unit = {
super.beforeAll()
instances = standardize(Array(
instances = Array(
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
))
instancesConstantFeature = standardize(Array(
)
instancesConstantFeature = Array(
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
))
instancesConstantFeatureFiltered = standardize(Array(
)
instancesConstantFeatureFiltered = Array(
Instance(0.0, 0.1, Vectors.dense(2.0)),
Instance(1.0, 0.5, Vectors.dense(1.0)),
Instance(2.0, 0.3, Vectors.dense(0.5))
))
)
}
/** Get summary statistics for some data and create a new HuberAggregator. */
@ -56,28 +56,10 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
fitIntercept: Boolean,
epsilon: Double): HuberAggregator = {
val (featuresSummarizer, _) = getRegressionSummarizers(instances)
val numFeatures = featuresSummarizer.variance.size
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
val bcParameters = spark.sparkContext.broadcast(parameters)
new HuberAggregator(numFeatures, fitIntercept, epsilon)(bcParameters)
}
private def standardize(
instances: Array[Instance],
std: Array[Double] = null): Array[Instance] = {
val stdArray = if (std == null) {
getRegressionSummarizers(instances)._1.variance.toArray.map(math.sqrt)
} else {
std
}
val numFeatures = stdArray.length
instances.map { case Instance(label, weight, features) =>
val standardized = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = stdArray(i)
if (std != 0) standardized(i) = v / std
}
Instance(label, weight, Vectors.dense(standardized).compressed)
}
new HuberAggregator(fitIntercept, epsilon, bcFeaturesStd)(bcParameters)
}
test("aggregator add method should check input size") {
@ -173,15 +155,9 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
val parametersFiltered = Vectors.dense(2.0, 3.0, 4.0)
val aggConstantFeature = getNewAggregator(instancesConstantFeature, parameters,
fitIntercept = true, epsilon = 1.35)
// std of instancesConstantFeature
val stdConstantFeature = getRegressionSummarizers(instancesConstantFeature)
._1.variance.toArray.map(math.sqrt)
// Since 3.0.0, we start to standardize input outside of gradient computation,
// so here we use std of instancesConstantFeature to standardize instances
standardize(instances, stdConstantFeature).foreach(aggConstantFeature.add)
val aggConstantFeatureFiltered = getNewAggregator(instancesConstantFeatureFiltered,
parametersFiltered, fitIntercept = true, epsilon = 1.35)
instances.foreach(aggConstantFeature.add)
instancesConstantFeatureFiltered.foreach(aggConstantFeatureFiltered.add)
// constant features should not affect gradient
def validateGradient(grad: Vector, gradFiltered: Vector): Unit = {
@ -191,19 +167,4 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
validateGradient(aggConstantFeature.gradient, aggConstantFeatureFiltered.gradient)
}
test("add instance block") {
val paramWithIntercept = Vectors.dense(1.0, 2.0, 3.0, 4.0)
val agg1 = getNewAggregator(instances, paramWithIntercept,
fitIntercept = true, epsilon = 1.35)
instances.foreach(agg1.add)
val agg2 = getNewAggregator(instances, paramWithIntercept,
fitIntercept = true, epsilon = 1.35)
val block = InstanceBlock.fromInstances(instances)
agg2.add(block)
assert(agg1.loss ~== agg2.loss relTol 1e-8)
assert(agg1.gradient ~== agg2.gradient relTol 1e-8)
}
}

View file

@ -17,7 +17,7 @@
package org.apache.spark.ml.optim.aggregator
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
@ -32,21 +32,21 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
override def beforeAll(): Unit = {
super.beforeAll()
instances = standardize(Array(
instances = Array(
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
))
instancesConstantFeature = standardize(Array(
)
instancesConstantFeature = Array(
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
))
instancesConstantLabel = standardize(Array(
)
instancesConstantLabel = Array(
Instance(1.0, 0.1, Vectors.dense(1.0, 2.0)),
Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
Instance(1.0, 0.3, Vectors.dense(4.0, 0.5))
))
)
}
/** Get summary statistics for some data and create a new LeastSquaresAggregator. */
@ -57,34 +57,15 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
val (featuresSummarizer, ySummarizer) = getRegressionSummarizers(instances)
val yStd = math.sqrt(ySummarizer.variance(0))
val yMean = ySummarizer.mean(0)
val featuresStd = Vectors.dense(featuresSummarizer.variance.toArray.map(math.sqrt))
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
val featuresMean = featuresSummarizer.mean.asML
val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.compressed)
val bcCoefficients = spark.sparkContext.broadcast(coefficients.compressed)
val featuresMean = featuresSummarizer.mean
val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.toArray)
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd,
bcFeaturesMean)(bcCoefficients)
}
private def standardize(
instances: Array[Instance],
std: Array[Double] = null): Array[Instance] = {
val stdArray = if (std == null) {
getRegressionSummarizers(instances)._1.variance.toArray.map(math.sqrt)
} else {
std
}
val numFeatures = stdArray.length
instances.map { case Instance(label, weight, features) =>
val standardized = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = stdArray(i)
if (std != 0) standardized(i) = v / std
}
Instance(label, weight, Vectors.dense(standardized).compressed)
}
}
test("aggregator add method input size") {
val coefficients = Vectors.dense(1.0, 2.0)
val agg = getNewAggregator(instances, coefficients, fitIntercept = true)
@ -164,15 +145,9 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
test("check with zero standard deviation") {
val coefficients = Vectors.dense(1.0, 2.0)
// aggConstantFeature contains std of instancesConstantFeature, and the std of dim=0 is 0
val aggConstantFeature = getNewAggregator(instancesConstantFeature, coefficients,
fitIntercept = true)
// std of instancesConstantFeature
val stdConstantFeature = getRegressionSummarizers(instancesConstantFeature)
._1.variance.toArray.map(math.sqrt)
// Since 3.0.0, we start to standardize input outside of gradient computation,
// so here we use std of instancesConstantFeature to standardize instances
standardize(instances, stdConstantFeature).foreach(aggConstantFeature.add)
instances.foreach(aggConstantFeature.add)
// constant features should not affect gradient
assert(aggConstantFeature.gradient(0) === 0.0)
@ -182,17 +157,4 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
}
}
}
test("add instance block") {
val coefficients = Vectors.dense(1.0, 2.0)
val agg1 = getNewAggregator(instances, coefficients, fitIntercept = true)
instances.foreach(agg1.add)
val agg2 = getNewAggregator(instances, coefficients, fitIntercept = true)
val block = InstanceBlock.fromInstances(instances)
agg2.add(block)
assert(agg1.loss ~== agg2.loss relTol 1e-8)
assert(agg1.gradient ~== agg2.gradient relTol 1e-8)
}
}
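The "check with zero standard deviation" test above relies on the convention that a constant, zero-variance feature standardizes to 0 and therefore cannot contribute to the dot product or to the accumulated gradient. A tiny standalone Scala sketch of that convention (hypothetical helper, mirroring the standardize helpers removed elsewhere in this revert):
// A zero-variance feature standardizes to 0, so its gradient component stays 0.
def standardizeValue(v: Double, std: Double): Double =
  if (std == 0.0) 0.0 else v / std
assert(standardizeValue(5.0, 0.0) == 0.0)  // constant feature contributes nothing
assert(standardizeValue(3.0, 1.5) == 2.0)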

View file

@ -17,7 +17,7 @@
package org.apache.spark.ml.optim.aggregator
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
@ -32,21 +32,21 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
override def beforeAll(): Unit = {
super.beforeAll()
instances = standardize(Array(
instances = Array(
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
))
instancesConstantFeature = standardize(Array(
)
instancesConstantFeature = Array(
Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
))
instancesConstantFeatureFiltered = standardize(Array(
)
instancesConstantFeatureFiltered = Array(
Instance(0.0, 0.1, Vectors.dense(2.0)),
Instance(1.0, 0.5, Vectors.dense(1.0)),
Instance(2.0, 0.3, Vectors.dense(0.5))
))
)
}
/** Get summary statistics for some data and create a new LogisticAggregator. */
@ -55,27 +55,13 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
coefficients: Vector,
fitIntercept: Boolean,
isMultinomial: Boolean): LogisticAggregator = {
val (_, ySummarizer) =
val (featuresSummarizer, ySummarizer) =
DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
val numClasses = ySummarizer.histogram.length
val numFeatures = instances.head.features.size
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
new LogisticAggregator(numFeatures, numClasses, fitIntercept, isMultinomial)(bcCoefficients)
}
private def standardize(instances: Array[Instance]): Array[Instance] = {
val (featuresSummarizer, _) =
DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt)
val numFeatures = stdArray.length
instances.map { case Instance(label, weight, features) =>
val standardized = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = stdArray(i)
if (std != 0) standardized(i) = v / std
}
Instance(label, weight, Vectors.dense(standardized).compressed)
}
new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients)
}
test("aggregator add method input size") {
@ -291,24 +277,4 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
validateGradient(aggConstantFeatureBinary.gradient,
aggConstantFeatureBinaryFiltered.gradient, 1)
}
test("add instance block") {
val binaryInstances = instances.map { instance =>
if (instance.label <= 1.0) instance else Instance(0.0, instance.weight, instance.features)
}
val coefArray = Array(1.0, 2.0)
val intercept = 1.0
val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)),
fitIntercept = true, isMultinomial = false)
binaryInstances.foreach(agg.add)
val agg2 = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)),
fitIntercept = true, isMultinomial = false)
val block = InstanceBlock.fromInstances(binaryInstances)
agg2.add(block)
assert(agg.loss ~== agg2.loss relTol 1e-8)
assert(agg.gradient ~== agg2.gradient relTol 1e-8)
}
}

View file

@ -165,8 +165,7 @@ class JavaProbabilisticClassificationModel(JavaClassificationModel,
class _LinearSVCParams(_JavaClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol,
HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold,
HasBlockSize):
HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold):
"""
Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`.
@ -215,8 +214,6 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
LinearSVCModel...
>>> model.getThreshold()
0.5
>>> model.getBlockSize()
1024
>>> model.coefficients
DenseVector([0.0, -0.2792, -0.1833])
>>> model.intercept
@ -255,19 +252,18 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
aggregationDepth=2, blockSize=1024):
aggregationDepth=2):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
aggregationDepth=2, blockSize=1024):
aggregationDepth=2):
"""
super(LinearSVC, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.LinearSVC", self.uid)
self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True,
standardization=True, threshold=0.0, aggregationDepth=2,
blockSize=1024)
standardization=True, threshold=0.0, aggregationDepth=2)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@ -276,12 +272,12 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
aggregationDepth=2, blockSize=1024):
aggregationDepth=2):
"""
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
aggregationDepth=2, blockSize=1024):
aggregationDepth=2):
Sets params for Linear SVM Classifier.
"""
kwargs = self._input_kwargs
@ -346,13 +342,6 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
"""
return self._set(aggregationDepth=value)
@since("3.0.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)
class LinearSVCModel(JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable):
"""
@ -388,7 +377,7 @@ class LinearSVCModel(JavaClassificationModel, _LinearSVCParams, JavaMLWritable,
class _LogisticRegressionParams(_JavaProbabilisticClassifierParams, HasRegParam,
HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol,
HasStandardization, HasWeightCol, HasAggregationDepth,
HasThreshold, HasBlockSize):
HasThreshold):
"""
Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`.
@ -570,8 +559,6 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
10
>>> blor.clear(blor.maxIter)
>>> blorModel = blor.fit(bdf)
>>> blorModel.getBlockSize()
1024
>>> blorModel.setFeaturesCol("features")
LogisticRegressionModel...
>>> blorModel.setProbabilityCol("newProbability")
@ -640,7 +627,7 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
rawPredictionCol="rawPrediction", standardization=True, weightCol=None,
aggregationDepth=2, family="auto",
lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None,
lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024):
lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
@ -649,14 +636,13 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \
aggregationDepth=2, family="auto", \
lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \
lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024):
lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None):
If the threshold and thresholds Params are both set, they must be equivalent.
"""
super(LogisticRegression, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.LogisticRegression", self.uid)
self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto",
blockSize=1024)
self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto")
kwargs = self._input_kwargs
self.setParams(**kwargs)
self._checkThresholdConsistency()
@ -669,7 +655,7 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
rawPredictionCol="rawPrediction", standardization=True, weightCol=None,
aggregationDepth=2, family="auto",
lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None,
lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024):
lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None):
"""
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
@ -677,7 +663,7 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \
aggregationDepth=2, family="auto", \
lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \
lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024):
lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None):
Sets params for logistic regression.
If the threshold and thresholds Params are both set, they must be equivalent.
"""
@ -772,13 +758,6 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
"""
return self._set(aggregationDepth=value)
@since("3.0.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)
class LogisticRegressionModel(JavaProbabilisticClassificationModel, _LogisticRegressionParams,
JavaMLWritable, JavaMLReadable, HasTrainingSummary):
@ -2174,7 +2153,7 @@ class NaiveBayesModel(JavaProbabilisticClassificationModel, _NaiveBayesParams, J
class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
HasTol, HasStepSize, HasSolver, HasBlockSize):
HasTol, HasStepSize, HasSolver):
"""
Params for :py:class:`MultilayerPerceptronClassifier`.
@ -2185,6 +2164,11 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
"E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
"neurons and output layer of 10 neurons.",
typeConverter=TypeConverters.toListInt)
blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
"matrices. Data is stacked within partitions. If block size is more than " +
"remaining data in a partition then it is adjusted to the size of this " +
"data. Recommended size is between 10 and 1000, default is 128.",
typeConverter=TypeConverters.toInt)
solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
"options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@ -2197,6 +2181,13 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
"""
return self.getOrDefault(self.layers)
@since("1.6.0")
def getBlockSize(self):
"""
Gets the value of blockSize or its default value.
"""
return self.getOrDefault(self.blockSize)
@since("2.0.0")
def getInitialWeights(self):
"""
@ -2220,17 +2211,11 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer
... (1.0, Vectors.dense([0.0, 1.0])),
... (1.0, Vectors.dense([1.0, 0.0])),
... (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
>>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
>>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
>>> mlp.setMaxIter(100)
MultilayerPerceptronClassifier...
>>> mlp.getMaxIter()
100
>>> mlp.getBlockSize()
128
>>> mlp.setBlockSize(1)
MultilayerPerceptronClassifier...
>>> mlp.getBlockSize()
1
>>> model = mlp.fit(df)
>>> model.setFeaturesCol("features")
MultilayerPerceptronClassificationModel...

View file

@ -164,10 +164,7 @@ if __name__ == "__main__":
"'euclidean'", "TypeConverters.toString"),
("validationIndicatorCol", "name of the column that indicates whether each row is for " +
"training or for validation. False indicates training; true indicates validation.",
None, "TypeConverters.toString"),
("blockSize", "block size for stacking input data in matrices. Data is stacked within "
"partitions. If block size is more than remaining data in a partition then it is "
"adjusted to the size of this data.", "1024", "TypeConverters.toInt")]
None, "TypeConverters.toString")]
code = []
for name, doc, defaultValueStr, typeConverter in shared:

View file

@ -580,21 +580,3 @@ class HasValidationIndicatorCol(Params):
Gets the value of validationIndicatorCol or its default value.
"""
return self.getOrDefault(self.validationIndicatorCol)
class HasBlockSize(Params):
"""
Mixin for param blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
"""
blockSize = Param(Params._dummy(), "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", typeConverter=TypeConverters.toInt)
def __init__(self):
super(HasBlockSize, self).__init__()
self._setDefault(blockSize=1024)
def getBlockSize(self):
"""
Gets the value of blockSize or its default value.
"""
return self.getOrDefault(self.blockSize)

View file

@ -28,7 +28,7 @@ __all__ = ['ALS', 'ALSModel']
@inherit_doc
class _ALSModelParams(HasPredictionCol, HasBlockSize):
class _ALSModelParams(HasPredictionCol):
"""
Params for :py:class:`ALS` and :py:class:`ALSModel`.
@ -223,8 +223,6 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
0.1
>>> als.clear(als.regParam)
>>> model = als.fit(df)
>>> model.getBlockSize()
4096
>>> model.getUserCol()
'user'
>>> model.setUserCol("user")
@ -284,13 +282,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
"""
__init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=false, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", lockSize=4096)
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
"""
super(ALS, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
@ -298,8 +296,7 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
blockSize=4096)
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
kwargs = self._input_kwargs
self.setParams(**kwargs)
@ -309,13 +306,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
"""
setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=False, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
Sets params for ALS.
"""
kwargs = self._input_kwargs
@ -446,13 +443,6 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
"""
return self._set(seed=value)
@since("3.0.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)
class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
"""
@ -489,13 +479,6 @@ class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
"""
return self._set(predictionCol=value)
@since("3.0.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)
@property
@since("1.4.0")
def rank(self):

View file

@ -62,7 +62,7 @@ class JavaRegressionModel(JavaPredictionModel, _JavaPredictorParams):
class _LinearRegressionParams(_JavaPredictorParams, HasRegParam, HasElasticNetParam, HasMaxIter,
HasTol, HasFitIntercept, HasStandardization, HasWeightCol, HasSolver,
HasAggregationDepth, HasLoss, HasBlockSize):
HasAggregationDepth, HasLoss):
"""
Params for :py:class:`LinearRegression` and :py:class:`LinearRegressionModel`.
@ -124,8 +124,6 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
>>> lr.setRegParam(0.0)
LinearRegression...
>>> model = lr.fit(df)
>>> model.getBlockSize()
1024
>>> model.setFeaturesCol("features")
LinearRegressionModel...
>>> model.setPredictionCol("newPrediction")
@ -171,18 +169,17 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
loss="squaredError", epsilon=1.35, blockSize=1024):
loss="squaredError", epsilon=1.35):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
loss="squaredError", epsilon=1.35, blockSize=1024)
loss="squaredError", epsilon=1.35)
"""
super(LinearRegression, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.regression.LinearRegression", self.uid)
self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35,
blockSize=1024)
self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@ -191,12 +188,12 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
loss="squaredError", epsilon=1.35, blockSize=1024):
loss="squaredError", epsilon=1.35):
"""
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
loss="squaredError", epsilon=1.35, blockSize=1024)
loss="squaredError", epsilon=1.35)
Sets params for linear regression.
"""
kwargs = self._input_kwargs
@ -272,13 +269,6 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
"""
return self._set(lossType=value)
@since("3.0.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)
class LinearRegressionModel(JavaRegressionModel, _LinearRegressionParams, GeneralJavaMLWritable,
JavaMLReadable, HasTrainingSummary):