[WIP] Test (#30327)

* resend

* address comments

* directly gen new Iter

* directly gen new Iter

* update blockify strategy

* address comments

* try to fix 2.13

* try to fix scala 2.13

* use 1.0 as the default value for gemv

* update

Co-authored-by: zhengruifeng <ruifengz@foxmail.com>
WeichenXu 2020-11-12 10:20:33 +08:00 committed by GitHub
parent 9d58a2f0f0
commit 61ee5d8a4e
11 changed files with 225 additions and 87 deletions

View file

@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
  with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
-  with HasAggregationDepth with HasThreshold with HasBlockSize {
+  with HasAggregationDepth with HasThreshold with HasBlockSizeInMB {
  /**
   * Param for threshold in binary classification prediction.
@@ -57,7 +57,7 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR
    "threshold in binary classification prediction applied to rawPrediction")
  setDefault(regParam -> 0.0, maxIter -> 100, fitIntercept -> true, tol -> 1E-6,
-    standardization -> true, threshold -> 0.0, aggregationDepth -> 2, blockSize -> 1)
+    standardization -> true, threshold -> 0.0, aggregationDepth -> 2, blockSizeInMB -> 0.0)
}
/**
@@ -153,22 +153,13 @@ class LinearSVC @Since("2.2.0") (
  def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
  /**
-   * Set block size for stacking input data in matrices.
-   * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
-   * If blockSize &gt; 1, then vectors will be stacked to blocks, and high-level BLAS routines
-   * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
-   * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
-   * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
-   * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
-   * Note that existing BLAS implementations are mainly optimized for dense matrices, if the
-   * input dataset is sparse, stacking may bring no performance gain, the worse is possible
-   * performance regression.
-   * Default is 1.
+   * Sets the value of param [[blockSizeInMB]].
+   * Default is 0.0.
   *
   * @group expertSetParam
   */
  @Since("3.1.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
+  def setBlockSizeInMB(value: Double): this.type = set(blockSizeInMB, value)
  @Since("2.2.0")
  override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
@@ -177,19 +168,19 @@ class LinearSVC @Since("2.2.0") (
    instr.logPipelineStage(this)
    instr.logDataset(dataset)
    instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
-      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
+      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth,
+      blockSizeInMB)
+    if (dataset.storageLevel != StorageLevel.NONE) {
+      instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " +
+        s"then cached during training. Be careful of double caching!")
+    }
    val instances = extractInstances(dataset)
      .setName("training instances")
-    if (dataset.storageLevel == StorageLevel.NONE && $(blockSize) == 1) {
-      instances.persist(StorageLevel.MEMORY_AND_DISK)
-    }
-    var requestedMetrics = Seq("mean", "std", "count")
-    if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros"
    val (summarizer, labelSummarizer) = Summarizer
-      .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics)
+      .getClassificationSummarizers(instances, $(aggregationDepth), Seq("mean", "std", "count"))
    val histogram = labelSummarizer.histogram
    val numInvalid = labelSummarizer.countInvalid
@@ -199,14 +190,12 @@ class LinearSVC @Since("2.2.0") (
    instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
    instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
    instr.logSumOfWeights(summarizer.weightSum)
-    if ($(blockSize) > 1) {
-      val scale = 1.0 / summarizer.count / numFeatures
-      val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
-      instr.logNamedValue("sparsity", sparsity.toString)
-      if (sparsity > 0.5) {
-        instr.logWarning(s"sparsity of input dataset is $sparsity, " +
-          s"which may hurt performance in high-level BLAS.")
-      }
-    }
+    var actualBlockSizeInMB = $(blockSizeInMB)
+    if (actualBlockSizeInMB == 0) {
+      actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB
+      require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0")
+      instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString)
+    }
    val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
@@ -245,12 +234,8 @@ class LinearSVC @Since("2.2.0") (
       Note that the intercept in scaled space and original space is the same;
       as a result, no scaling is needed.
     */
-    val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
-      trainOnRows(instances, featuresStd, regularization, optimizer)
-    } else {
-      trainOnBlocks(instances, featuresStd, regularization, optimizer)
-    }
-    if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
+    val (rawCoefficients, objectiveHistory) =
+      trainImpl(instances, actualBlockSizeInMB, featuresStd, regularization, optimizer)
    if (rawCoefficients == null) {
      val msg = s"${optimizer.getClass.getName} failed."
@@ -284,35 +269,9 @@ class LinearSVC @Since("2.2.0") (
    model.setSummary(Some(summary))
  }
-  private def trainOnRows(
-      instances: RDD[Instance],
-      featuresStd: Array[Double],
-      regularization: Option[L2Regularization],
-      optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
-    val numFeatures = featuresStd.length
-    val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
-    val bcFeaturesStd = instances.context.broadcast(featuresStd)
-    val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
-    val costFun = new RDDLossFunction(instances, getAggregatorFunc,
-      regularization, $(aggregationDepth))
-    val states = optimizer.iterations(new CachedDiffFunction(costFun),
-      Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
-    val arrayBuilder = mutable.ArrayBuilder.make[Double]
-    var state: optimizer.State = null
-    while (states.hasNext) {
-      state = states.next()
-      arrayBuilder += state.adjustedValue
-    }
-    bcFeaturesStd.destroy()
-    (if (state != null) state.x.toArray else null, arrayBuilder.result)
-  }
-  private def trainOnBlocks(
+  private def trainImpl(
      instances: RDD[Instance],
+      actualBlockSizeInMB: Double,
      featuresStd: Array[Double],
      regularization: Option[L2Regularization],
      optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
@@ -326,9 +285,11 @@ class LinearSVC @Since("2.2.0") (
      val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
      iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
    }
-    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+    val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong
+    val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage)
      .persist(StorageLevel.MEMORY_AND_DISK)
-      .setName(s"training blocks (blockSize=${$(blockSize)})")
+      .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")
    val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
    val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
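
Taken together, the LinearSVC hunks above replace the row-count blockSize param with a memory budget: blockSizeInMB defaults to 0.0, meaning "infer", and the trainer then falls back to InstanceBlock.DefaultBlockSizeInMB (1.0 MB). A minimal usage sketch of the new setter; the `spark` session handle and the tiny made-up dataset are assumptions for illustration, not part of this commit:

import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.ml.linalg.Vectors

// Tiny made-up dataset; `spark` is the usual shell/session handle.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (1.0, Vectors.dense(0.2, 1.3, 0.5)),
  (0.0, Vectors.dense(1.8, 0.9, -0.7))
)).toDF("label", "features")

val svc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)
  .setBlockSizeInMB(0.25)  // cap each stacked block at roughly 0.25 MB

val model = svc.fit(training)
// getBlockSizeInMB returns what was set (0.25 here); leaving it at the default 0.0
// makes the trainer fall back to InstanceBlock.DefaultBlockSizeInMB (1.0 MB),
// as the hunks above show.
println(model.getBlockSizeInMB)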

View file

@@ -17,6 +17,8 @@
package org.apache.spark.ml.feature
+import scala.collection.mutable
import org.apache.spark.ml.linalg._
import org.apache.spark.rdd.RDD
@@ -100,6 +102,32 @@ private[spark] case class InstanceBlock(
private[spark] object InstanceBlock {
+  /**
+   * Suggested value for BlockSizeInMB in Level-2 routine cases.
+   * According to performance tests of BLAS routine (see SPARK-31714) and
+   * LinearSVC (see SPARK-32907), 1.0 MB should be an acceptable value for
+   * linear models using Level-2 routine (GEMV) to perform prediction and
+   * gradient computation.
+   */
+  val DefaultBlockSizeInMB = 1.0
+  private def getBlockMemUsage(
+      numCols: Long,
+      numRows: Long,
+      nnz: Long,
+      allUnitWeight: Boolean): Long = {
+    val doubleBytes = java.lang.Double.BYTES
+    val arrayHeader = 12L
+    val denseSize = Matrices.getDenseSize(numCols, numRows)
+    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+    val matrixSize = math.min(denseSize, sparseSize)
+    if (allUnitWeight) {
+      matrixSize + doubleBytes * numRows + arrayHeader * 2
+    } else {
+      matrixSize + doubleBytes * numRows * 2 + arrayHeader * 2
+    }
+  }
  def fromInstances(instances: Seq[Instance]): InstanceBlock = {
    val labels = instances.map(_.label).toArray
    val weights = if (instances.exists(_.weight != 1)) {
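
As a rough, self-contained illustration of the sizing logic added above (not the private helper itself), the dense case boils down to counting doubles; the block shape below is made up and array object headers are ignored:

// Hypothetical dense block: 1000 rows x 100 features, every weight == 1.0.
val numRows = 1000L
val numCols = 100L
val doubleBytes = 8L

val matrixBytes = numRows * numCols * doubleBytes  // 800,000 bytes of values
val labelsBytes = numRows * doubleBytes            // 8,000 bytes for the labels array
// The weights array is skipped when all weights are 1.0 (the allUnitWeight branch),
// so this block stays around 0.77 MB -- under the 1.0 MB default budget.
val approxBlockBytes = matrixBytes + labelsBytes
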
@@ -114,6 +142,50 @@ private[spark] object InstanceBlock {
  def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
    instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
  }
+  def blokifyWithMaxMemUsage(
+      instanceIterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    new Iterator[InstanceBlock]() {
+      private var numCols = -1L
+      override def hasNext: Boolean = instanceIterator.hasNext
+      override def next(): InstanceBlock = {
+        val buff = mutable.ArrayBuilder.make[Instance]
+        var buffCnt = 0L
+        var buffNnz = 0L
+        var buffUnitWeight = true
+        var blockMemUsage = 0L
+        while (instanceIterator.hasNext && blockMemUsage < maxMemUsage) {
+          val instance: Instance = instanceIterator.next()
+          if (numCols < 0L) numCols = instance.features.size
+          require(numCols == instance.features.size)
+          val nnz = instance.features.numNonzeros
+          buff += instance
+          buffCnt += 1L
+          buffNnz += nnz
+          buffUnitWeight &&= (instance.weight == 1)
+          blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight)
+        }
+        // the block mem usage may slightly exceed threshold, not a big issue.
+        // and this ensure even if one row exceed block limit, each block has one row
+        InstanceBlock.fromInstances(buff.result())
+      }
+    }
+  }
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
}
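
A small sketch of driving the new iterator-based overload directly; note that Instance and InstanceBlock are package-private to Spark ML, so code like this only compiles inside the ml packages (as in InstanceSuite below), and the feature values are made up:

import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg.Vectors

// Ten tiny two-feature rows, all with unit weight.
val instances = Iterator.tabulate(10) { i =>
  Instance((i % 2).toDouble, 1.0, Vectors.dense(i.toDouble, i * 2.0))
}

// With a 1 MB budget, rows this small all end up in a single block.
val blocks = InstanceBlock.blokifyWithMaxMemUsage(instances, 1L << 20).toArray
assert(blocks.length == 1 && blocks.head.size == 10)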

View file

@@ -108,7 +108,12 @@ private[shared] object SharedParamsCodeGen {
      ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
        "stacked within partitions. If block size is more than remaining data in a partition " +
        "then it is adjusted to the size of this data.",
-        isValid = "ParamValidators.gt(0)", isExpertParam = true)
+        isValid = "ParamValidators.gt(0)", isExpertParam = true),
+      ParamDesc[Double]("blockSizeInMB", "Maximum memory in MB for stacking input data " +
+        "in blocks. Data is stacked within partitions. If more than remaining data size in a " +
+        "partition then it is adjusted to the data size. If 0, try to infer an appropriate value " +
+        "based on the statistics of dataset. Must be >= 0.",
+        Some("0.0"), isValid = "ParamValidators.gtEq(0.0)", isExpertParam = true)
    )
    val code = genSharedParams(params)

View file

@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
  /** @group expertGetParam */
  final def getBlockSize: Int = $(blockSize)
}
+/**
+ * Trait for shared param blockSizeInMB (default: 0.0). This trait may be changed or
+ * removed between minor versions.
+ */
+trait HasBlockSizeInMB extends Params {
+  /**
+   * Param for Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be &gt;= 0..
+   * @group expertParam
+   */
+  final val blockSizeInMB: DoubleParam = new DoubleParam(this, "blockSizeInMB", "Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", ParamValidators.gtEq(0.0))
+  setDefault(blockSizeInMB, 0.0)
+  /** @group expertGetParam */
+  final def getBlockSizeInMB: Double = $(blockSizeInMB)
+}
// scalastyle:on
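
Any estimator inside Spark's ml module can now opt into the same behaviour by mixing in the generated trait; a sketch with a made-up params trait name, following the pattern LinearSVC uses above:

import org.apache.spark.ml.param.Params
import org.apache.spark.ml.param.shared.HasBlockSizeInMB

// Hypothetical params trait: HasBlockSizeInMB contributes the param,
// its 0.0 default, and getBlockSizeInMB.
trait MyBlockifiedParams extends Params with HasBlockSizeInMB {
  // 0.0 means "infer"; here we simply fall back to 1.0 MB, as LinearSVC does.
  protected def resolvedBlockSizeInMB: Double =
    if ($(blockSizeInMB) > 0) $(blockSizeInMB) else 1.0
}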

View file

@@ -214,8 +214,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
      .setFitIntercept(fitIntercept)
      .setMaxIter(5)
    val model = lsvc.fit(dataset)
-    Seq(4, 16, 64).foreach { blockSize =>
-      val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
+    Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s =>
+      val model2 = lsvc.setBlockSizeInMB(s).fit(dataset)
      assert(model.intercept ~== model2.intercept relTol 1e-9)
      assert(model.coefficients ~== model2.coefficients relTol 1e-9)
    }

View file

@@ -74,4 +74,58 @@ class InstanceSuite extends SparkFunSuite{
    }
  }
+  test("InstanceBlock: blokify with max memory usage") {
+    val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
+    val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
+    val instances = Seq(instance1, instance2)
+    val blocks = InstanceBlock
+      .blokifyWithMaxMemUsage(Iterator.apply(instance1, instance2), 128).toArray
+    require(blocks.length == 1)
+    val block = blocks.head
+    assert(block.size === 2)
+    assert(block.numFeatures === 2)
+    block.instanceIterator.zipWithIndex.foreach {
+      case (instance, i) =>
+        assert(instance.label === instances(i).label)
+        assert(instance.weight === instances(i).weight)
+        assert(instance.features.toArray === instances(i).features.toArray)
+    }
+    Seq(0, 1).foreach { i =>
+      val nzIter = block.getNonZeroIter(i)
+      val vec = Vectors.sparse(2, nzIter.toSeq)
+      assert(vec.toArray === instances(i).features.toArray)
+    }
+    // instances larger than maxMemUsage
+    val denseInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(1000)(1.0)))
+    InstanceBlock.blokifyWithMaxMemUsage(Iterator.single(denseInstance), 64).size
+    InstanceBlock.blokifyWithMaxMemUsage(Iterator.fill(10)(denseInstance), 64).size
+    // different numFeatures
+    intercept[IllegalArgumentException] {
+      InstanceBlock.blokifyWithMaxMemUsage(Iterator.apply(instance1, denseInstance), 64).size
+    }
+    // nnz = 10
+    val sparseInstance = Instance(-2.0, 3.0,
+      Vectors.sparse(1000, Array.range(0, 1000, 100), Array.fill(10)(0.1)))
+    // normally, memory usage of a block does not exceed maxMemUsage too much
+    val maxMemUsage = 1 << 18
+    val mixedIter = Iterator.fill(100)(denseInstance) ++
+      Iterator.fill(1000)(sparseInstance) ++
+      Iterator.fill(10)(denseInstance) ++
+      Iterator.fill(10)(sparseInstance) ++
+      Iterator.fill(100)(denseInstance) ++
+      Iterator.fill(100)(sparseInstance)
+    InstanceBlock.blokifyWithMaxMemUsage(mixedIter, maxMemUsage)
+      .foreach { block =>
+        val doubleBytes = java.lang.Double.BYTES
+        val arrayHeader = 12L
+        val blockMemUsage = block.matrix.getSizeInBytes +
+          (block.labels.length + block.weights.length) * doubleBytes + arrayHeader * 2
+        require(blockMemUsage < maxMemUsage * 1.05)
+      }
+  }
}

View file

@@ -26,8 +26,8 @@ from pyspark import keyword_only, since, SparkContext
from pyspark.ml import Estimator, Predictor, PredictionModel, Model
from pyspark.ml.param.shared import HasRawPredictionCol, HasProbabilityCol, HasThresholds, \
    HasRegParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, \
-    HasAggregationDepth, HasThreshold, HasBlockSize, Param, Params, TypeConverters, \
-    HasElasticNetParam, HasSeed, HasStepSize, HasSolver, HasParallelism
+    HasAggregationDepth, HasThreshold, HasBlockSize, HasBlockSizeInMB, Param, Params, \
+    TypeConverters, HasElasticNetParam, HasSeed, HasStepSize, HasSolver, HasParallelism
from pyspark.ml.tree import _DecisionTreeModel, _DecisionTreeParams, \
    _TreeEnsembleModel, _RandomForestParams, _GBTParams, \
    _HasVarianceImpurity, _TreeClassifierParams
@@ -504,7 +504,7 @@ class _BinaryClassificationSummary(_ClassificationSummary):
class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol,
                       HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold,
-                       HasBlockSize):
+                       HasBlockSizeInMB):
    """
    Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`.
@@ -521,7 +521,7 @@ class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitInterce
        super(_LinearSVCParams, self).__init__(*args)
        self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True,
                         standardization=True, threshold=0.0, aggregationDepth=2,
-                         blockSize=1)
+                         blockSizeInMB=0.0)
@inherit_doc
@@ -565,8 +565,8 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
    LinearSVCModel...
    >>> model.getThreshold()
    0.5
-    >>> model.getBlockSize()
-    1
+    >>> model.getBlockSizeInMB()
+    0.0
    >>> model.coefficients
    DenseVector([0.0, -0.2792, -0.1833])
    >>> model.intercept
@@ -605,12 +605,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
    def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="prediction",
                 maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                 fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                 aggregationDepth=2, blockSize=1):
+                 aggregationDepth=2, blockSizeInMB=0.0):
        """
        __init__(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \
                 maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                 fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                 aggregationDepth=2, blockSize=1):
+                 aggregationDepth=2, blockSizeInMB=0.0):
        """
        super(LinearSVC, self).__init__()
        self._java_obj = self._new_java_obj(
@@ -623,12 +623,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
    def setParams(self, *, featuresCol="features", labelCol="label", predictionCol="prediction",
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                  aggregationDepth=2, blockSize=1):
+                  aggregationDepth=2, blockSizeInMB=0.0):
        """
        setParams(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                  aggregationDepth=2, blockSize=1):
+                  aggregationDepth=2, blockSizeInMB=0.0):
        Sets params for Linear SVM Classifier.
        """
        kwargs = self._input_kwargs
@@ -694,11 +694,11 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
        return self._set(aggregationDepth=value)
    @since("3.1.0")
-    def setBlockSize(self, value):
+    def setBlockSizeInMB(self, value):
        """
-        Sets the value of :py:attr:`blockSize`.
+        Sets the value of :py:attr:`blockSizeInMB`.
        """
-        return self._set(blockSize=value)
+        return self._set(blockSizeInMB=value)
class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable,

View file

@@ -26,6 +26,7 @@ from pyspark.ml.base import _PredictorParams
from pyspark.ml.param.shared import (
    HasAggregationDepth,
    HasBlockSize,
+    HasBlockSizeInMB,
    HasElasticNetParam,
    HasFitIntercept,
    HasMaxIter,
@@ -172,7 +173,7 @@ class _LinearSVCParams(
    HasWeightCol,
    HasAggregationDepth,
    HasThreshold,
-    HasBlockSize,
+    HasBlockSizeInMB,
):
    threshold: Param[float]
    def __init__(self, *args: Any) -> None: ...
@@ -198,7 +199,7 @@ class LinearSVC(
        threshold: float = ...,
        weightCol: Optional[str] = ...,
        aggregationDepth: int = ...,
-        blockSize: int = ...
+        blockSizeInMB: float = ...
    ) -> None: ...
    def setParams(
        self,
@@ -215,7 +216,7 @@ class LinearSVC(
        threshold: float = ...,
        weightCol: Optional[str] = ...,
        aggregationDepth: int = ...,
-        blockSize: int = ...
+        blockSizeInMB: float = ...
    ) -> LinearSVC: ...
    def setMaxIter(self, value: int) -> LinearSVC: ...
    def setRegParam(self, value: float) -> LinearSVC: ...
@@ -225,7 +226,7 @@ class LinearSVC(
    def setThreshold(self, value: float) -> LinearSVC: ...
    def setWeightCol(self, value: str) -> LinearSVC: ...
    def setAggregationDepth(self, value: int) -> LinearSVC: ...
-    def setBlockSize(self, value: int) -> LinearSVC: ...
+    def setBlockSizeInMB(self, value: float) -> LinearSVC: ...
class LinearSVCModel(
    _JavaClassificationModel[Vector],

View file

@@ -165,7 +165,11 @@ if __name__ == "__main__":
         None, "TypeConverters.toString"),
        ("blockSize", "block size for stacking input data in matrices. Data is stacked within "
         "partitions. If block size is more than remaining data in a partition then it is "
-         "adjusted to the size of this data.", None, "TypeConverters.toInt")]
+         "adjusted to the size of this data.", None, "TypeConverters.toInt"),
+        ("blockSizeInMB", "maximum memory in MB for stacking input data in blocks. Data is " +
+         "stacked within partitions. If more than remaining data size in a partition then it " +
+         "is adjusted to the data size. If 0, try to infer an appropriate value based on the " +
+         "statistics of dataset. Must be >= 0.", "0.0", "TypeConverters.toFloat")]
    code = []
    for name, doc, defaultValueStr, typeConverter in shared:

View file

@@ -597,3 +597,21 @@ class HasBlockSize(Params):
        Gets the value of blockSize or its default value.
        """
        return self.getOrDefault(self.blockSize)
+class HasBlockSizeInMB(Params):
+    """
+    Mixin for param blockSizeInMB: maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.
+    """
+    blockSizeInMB = Param(Params._dummy(), "blockSizeInMB", "maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", typeConverter=TypeConverters.toFloat)
+    def __init__(self):
+        super(HasBlockSizeInMB, self).__init__()
+        self._setDefault(blockSizeInMB=0.0)
+    def getBlockSizeInMB(self):
+        """
+        Gets the value of blockSizeInMB or its default value.
+        """
+        return self.getOrDefault(self.blockSizeInMB)

View file

@@ -185,3 +185,8 @@ class HasBlockSize(Params):
    blockSize: Param[int]
    def __init__(self) -> None: ...
    def getBlockSize(self) -> int: ...
+class HasBlockSizeInMB(Params):
+    blockSizeInMB: Param[float]
+    def __init__(self) -> None: ...
+    def getBlockSizeInMB(self) -> float: ...