[SPARK-17090][ML] Make tree aggregation level in linear/logistic regression configurable

## What changes were proposed in this pull request?

Linear/logistic regression use treeAggregate with default depth (always = 2) for collecting coefficient gradient updates to the driver. For high dimensional problems, this can cause OOM error on the driver. This patch makes it configurable to avoid this problem if users' input data has many features. It adds a HasTreeDepth API in `sharedParams.scala`, and extends it to both Linear regression and logistic regression in .ml

Author: hqzizania <hqzizania@gmail.com>

Closes #14717 from hqzizania/SPARK-17090.
This commit is contained in:
hqzizania 2016-08-20 18:52:44 -07:00 committed by DB Tsai
parent 9f37d4eac2
commit 61ef74f227
5 changed files with 74 additions and 17 deletions

View file

@ -48,7 +48,7 @@ import org.apache.spark.storage.StorageLevel
*/
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
with HasStandardization with HasWeightCol with HasThreshold {
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {
/**
* Set threshold in binary classification, in range [0, 1].
@ -256,6 +256,17 @@ class LogisticRegression @Since("1.2.0") (
@Since("1.5.0")
override def getThresholds: Array[Double] = super.getThresholds
/**
* Suggested depth for treeAggregate (>= 2).
* If the dimensions of features or the number of partitions are large,
* this param could be adjusted to a larger size.
* Default is 2.
* @group expertSetParam
*/
@Since("2.1.0")
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)
private var optInitialModel: Option[LogisticRegressionModel] = None
/** @group setParam */
@ -294,7 +305,8 @@ class LogisticRegression @Since("1.2.0") (
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.treeAggregate(
new MultivariateOnlineSummarizer, new MultiClassSummarizer)(seqOp, combOp)
new MultivariateOnlineSummarizer, new MultiClassSummarizer
)(seqOp, combOp, $(aggregationDepth))
}
val histogram = labelSummarizer.histogram
@ -358,7 +370,7 @@ class LogisticRegression @Since("1.2.0") (
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
$(standardization), bcFeaturesStd, regParamL2, multinomial = false)
$(standardization), bcFeaturesStd, regParamL2, multinomial = false, $(aggregationDepth))
val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@ -1331,8 +1343,8 @@ private class LogisticCostFun(
standardization: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
regParamL2: Double,
multinomial: Boolean) extends DiffFunction[BDV[Double]] {
multinomial: Boolean,
aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
@ -1347,7 +1359,7 @@ private class LogisticCostFun(
instances.treeAggregate(
new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept,
multinomial)
)(seqOp, combOp)
)(seqOp, combOp, aggregationDepth)
}
val totalGradientArray = logisticAggregator.gradient.toArray

View file

@ -44,7 +44,8 @@ import org.apache.spark.storage.StorageLevel
*/
private[classification] trait MultinomialLogisticRegressionParams
extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter
with HasFitIntercept with HasTol with HasStandardization with HasWeightCol {
with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
with HasAggregationDepth {
/**
* Set thresholds in multiclass (or binary) classification to adjust the probability of
@ -163,6 +164,17 @@ class MultinomialLogisticRegression @Since("2.1.0") (
@Since("2.1.0")
override def setThresholds(value: Array[Double]): this.type = super.setThresholds(value)
/**
* Suggested depth for treeAggregate (>= 2).
* If the dimensions of features or the number of partitions are large,
* this param could be adjusted to a larger size.
* Default is 2.
* @group expertSetParam
*/
@Since("2.1.0")
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)
override protected[spark] def train(dataset: Dataset[_]): MultinomialLogisticRegressionModel = {
val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
val instances: RDD[Instance] =
@ -245,7 +257,7 @@ class MultinomialLogisticRegression @Since("2.1.0") (
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
$(standardization), bcFeaturesStd, regParamL2, multinomial = true)
$(standardization), bcFeaturesStd, regParamL2, multinomial = true, $(aggregationDepth))
val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))

View file

@ -78,7 +78,9 @@ private[shared] object SharedParamsCodeGen {
ParamDesc[String]("weightCol", "weight column name. If this is not set or empty, we treat " +
"all instance weights as 1.0"),
ParamDesc[String]("solver", "the solver algorithm for optimization. If this is not set or " +
"empty, default value is 'auto'", Some("\"auto\"")))
"empty, default value is 'auto'", Some("\"auto\"")),
ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"),
isValid = "ParamValidators.gtEq(2)"))
val code = genSharedParams(params)
val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"

View file

@ -334,10 +334,10 @@ private[ml] trait HasElasticNetParam extends Params {
private[ml] trait HasTol extends Params {
/**
* Param for the convergence tolerance for iterative algorithms.
* Param for the convergence tolerance for iterative algorithms (>= 0).
* @group param
*/
final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms")
final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms (>= 0)", ParamValidators.gtEq(0))
/** @group getParam */
final def getTol: Double = $(tol)
@ -349,10 +349,10 @@ private[ml] trait HasTol extends Params {
private[ml] trait HasStepSize extends Params {
/**
* Param for Step size to be used for each iteration of optimization.
* Param for Step size to be used for each iteration of optimization (> 0).
* @group param
*/
final val stepSize: DoubleParam = new DoubleParam(this, "stepSize", "Step size to be used for each iteration of optimization")
final val stepSize: DoubleParam = new DoubleParam(this, "stepSize", "Step size to be used for each iteration of optimization (> 0)", ParamValidators.gt(0))
/** @group getParam */
final def getStepSize: Double = $(stepSize)
@ -389,4 +389,21 @@ private[ml] trait HasSolver extends Params {
/** @group getParam */
final def getSolver: String = $(solver)
}
/**
* Trait for shared param aggregationDepth (default: 2).
*/
private[ml] trait HasAggregationDepth extends Params {
/**
* Param for suggested depth for treeAggregate (>= 2).
* @group param
*/
final val aggregationDepth: IntParam = new IntParam(this, "aggregationDepth", "suggested depth for treeAggregate (>= 2)", ParamValidators.gtEq(2))
setDefault(aggregationDepth, 2)
/** @group getParam */
final def getAggregationDepth: Int = $(aggregationDepth)
}
// scalastyle:on

View file

@ -53,6 +53,7 @@ import org.apache.spark.storage.StorageLevel
private[regression] trait LinearRegressionParams extends PredictorParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver
with HasAggregationDepth
/**
* Linear regression.
@ -172,6 +173,17 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
def setSolver(value: String): this.type = set(solver, value)
setDefault(solver -> "auto")
/**
* Suggested depth for treeAggregate (>= 2).
* If the dimensions of features or the number of partitions are large,
* this param could be adjusted to a larger size.
* Default is 2.
* @group expertSetParam
*/
@Since("2.1.0")
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)
override protected def train(dataset: Dataset[_]): LinearRegressionModel = {
// Extract the number of features before deciding optimization solver.
val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
@ -230,7 +242,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.treeAggregate(
new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer)(seqOp, combOp)
new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer
)(seqOp, combOp, $(aggregationDepth))
}
val yMean = ySummarizer.mean(0)
@ -296,7 +309,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
$(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam)
$(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam, $(aggregationDepth))
val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@ -1016,7 +1029,8 @@ private class LeastSquaresCostFun(
standardization: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
bcFeaturesMean: Broadcast[Array[Double]],
effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
effectiveL2regParam: Double,
aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
@ -1029,7 +1043,7 @@ private class LeastSquaresCostFun(
instances.treeAggregate(
new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
bcFeaturesMean))(seqOp, combOp)
bcFeaturesMean))(seqOp, combOp, aggregationDepth)
}
val totalGradientArray = leastSquaresAggregator.gradient.toArray