[SPARK-32907][ML] adaptively blockify instances - revert blockify gmm

### What changes were proposed in this pull request?
revert blockify gmm

### Why are the changes needed?
WeichenXu123  and I thought we should use memory size instead of number of rows to blockify instance; then if a buffer's size is large and determined by number of rows, we should discard it.
In GMM, we found that the pre-allocated memory maybe too large and should be discarded:
```
transient private lazy val auxiliaryPDFMat = DenseMatrix.zeros(blockSize, numFeatures)
```
We had some offline discuss and thought it is better to revert blockify GMM.

### Does this PR introduce _any_ user-facing change?
blockSize added in master branch will be removed

### How was this patch tested?
existing testsuites

Closes #29782 from zhengruifeng/unblockify_gmm.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
This commit is contained in:
zhengruifeng 2020-09-23 15:54:56 +08:00
parent 21b7479797
commit 432afac07e
5 changed files with 20 additions and 294 deletions

View file

@ -55,7 +55,7 @@ class MultivariateGaussian @Since("2.0.0") (
*/
@transient private lazy val tuple = {
val (rootSigmaInv, u) = calculateCovarianceConstants
val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv).toDense
val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv)
val rootSigmaInvMulMu = rootSigmaInvMat.multiply(mean)
(rootSigmaInvMat, u, rootSigmaInvMulMu)
}
@ -81,36 +81,6 @@ class MultivariateGaussian @Since("2.0.0") (
u - 0.5 * BLAS.dot(v, v)
}
private[ml] def pdf(X: Matrix): DenseVector = {
val mat = DenseMatrix.zeros(X.numRows, X.numCols)
pdf(X, mat)
}
private[ml] def pdf(X: Matrix, mat: DenseMatrix): DenseVector = {
require(!mat.isTransposed)
BLAS.gemm(1.0, X, rootSigmaInvMat.transpose, 0.0, mat)
val m = mat.numRows
val n = mat.numCols
val pdfVec = mat.multiply(rootSigmaInvMulMu)
val blas = BLAS.getBLAS(n)
val squared1 = blas.ddot(n, rootSigmaInvMulMu.values, 1, rootSigmaInvMulMu.values, 1)
val localU = u
var i = 0
while (i < m) {
val squared2 = blas.ddot(n, mat.values, i, m, mat.values, i, m)
val dot = pdfVec(i)
val squaredSum = squared1 + squared2 - dot - dot
pdfVec.values(i) = math.exp(localU - 0.5 * squaredSum)
i += 1
}
pdfVec
}
/**
* Calculate distribution dependent components used for the density function:
* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu))

View file

@ -27,7 +27,6 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
test("univariate") {
val x1 = Vectors.dense(0.0)
val x2 = Vectors.dense(1.5)
val mat = Matrices.fromVectors(Seq(x1, x2))
val mu = Vectors.dense(0.0)
val sigma1 = Matrices.dense(1, 1, Array(1.0))
@ -36,7 +35,6 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
assert(dist1.logpdf(x2) ~== -2.0439385332046727 absTol 1E-5)
assert(dist1.pdf(x1) ~== 0.39894 absTol 1E-5)
assert(dist1.pdf(x2) ~== 0.12952 absTol 1E-5)
assert(dist1.pdf(mat) ~== Vectors.dense(0.39894, 0.12952) absTol 1E-5)
val sigma2 = Matrices.dense(1, 1, Array(4.0))
val dist2 = new MultivariateGaussian(mu, sigma2)
@ -44,13 +42,11 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
assert(dist2.logpdf(x2) ~== -1.893335713764618 absTol 1E-5)
assert(dist2.pdf(x1) ~== 0.19947 absTol 1E-5)
assert(dist2.pdf(x2) ~== 0.15057 absTol 1E-5)
assert(dist2.pdf(mat) ~== Vectors.dense(0.19947, 0.15057) absTol 1E-5)
}
test("multivariate") {
val x1 = Vectors.dense(0.0, 0.0)
val x2 = Vectors.dense(1.0, 1.0)
val mat = Matrices.fromVectors(Seq(x1, x2))
val mu = Vectors.dense(0.0, 0.0)
val sigma1 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
@ -59,7 +55,6 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
assert(dist1.logpdf(x2) ~== -2.8378770664093453 absTol 1E-5)
assert(dist1.pdf(x1) ~== 0.15915 absTol 1E-5)
assert(dist1.pdf(x2) ~== 0.05855 absTol 1E-5)
assert(dist1.pdf(mat) ~== Vectors.dense(0.15915, 0.05855) absTol 1E-5)
val sigma2 = Matrices.dense(2, 2, Array(4.0, -1.0, -1.0, 2.0))
val dist2 = new MultivariateGaussian(mu, sigma2)
@ -67,25 +62,21 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
assert(dist2.logpdf(x2) ~== -3.3822607123655732 absTol 1E-5)
assert(dist2.pdf(x1) ~== 0.060155 absTol 1E-5)
assert(dist2.pdf(x2) ~== 0.033971 absTol 1E-5)
assert(dist2.pdf(mat) ~== Vectors.dense(0.060155, 0.033971) absTol 1E-5)
}
test("multivariate degenerate") {
val x1 = Vectors.dense(0.0, 0.0)
val x2 = Vectors.dense(1.0, 1.0)
val mat = Matrices.fromVectors(Seq(x1, x2))
val mu = Vectors.dense(0.0, 0.0)
val sigma = Matrices.dense(2, 2, Array(1.0, 1.0, 1.0, 1.0))
val dist = new MultivariateGaussian(mu, sigma)
assert(dist.pdf(x1) ~== 0.11254 absTol 1E-5)
assert(dist.pdf(x2) ~== 0.068259 absTol 1E-5)
assert(dist.pdf(mat) ~== Vectors.dense(0.11254, 0.068259) absTol 1E-5)
}
test("SPARK-11302") {
val x = Vectors.dense(629, 640, 1.7188, 618.19)
val mat = Matrices.fromVectors(Seq(x))
val mu = Vectors.dense(
1055.3910505836575, 1070.489299610895, 1.39020554474708, 1040.5907503867697)
val sigma = Matrices.dense(4, 4, Array(
@ -96,6 +87,5 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
val dist = new MultivariateGaussian(mu, sigma)
// Agrees with R's dmvnorm: 7.154782e-05
assert(dist.pdf(x) ~== 7.154782224045512E-5 absTol 1E-9)
assert(dist.pdf(mat) ~== Vectors.dense(7.154782224045512E-5) absTol 1E-5)
}
}

View file

@ -44,7 +44,7 @@ import org.apache.spark.storage.StorageLevel
*/
private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter with HasFeaturesCol
with HasSeed with HasPredictionCol with HasWeightCol with HasProbabilityCol with HasTol
with HasAggregationDepth with HasBlockSize {
with HasAggregationDepth {
/**
* Number of independent Gaussians in the mixture model. Must be greater than 1. Default: 2.
@ -59,7 +59,7 @@ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter w
@Since("2.0.0")
def getK: Int = $(k)
setDefault(k -> 2, maxIter -> 100, tol -> 0.01, blockSize -> 1)
setDefault(k -> 2, maxIter -> 100, tol -> 0.01)
/**
* Validates and transforms the input schema.
@ -170,8 +170,7 @@ class GaussianMixtureModel private[ml] (
@Since("3.0.0")
def predictProbability(features: Vector): Vector = {
val probs = GaussianMixtureModel
.computeProbabilities(features, gaussians, weights)
val probs = GaussianMixtureModel.computeProbabilities(features, gaussians, weights)
Vectors.dense(probs)
}
@ -263,10 +262,8 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] {
require(mus.length == sigmas.length, "Length of Mu and Sigma array must match")
require(mus.length == weights.length, "Length of weight and Gaussian array must match")
val gaussians = mus.zip(sigmas).map {
case (mu, sigma) =>
new MultivariateGaussian(mu.asML, sigma.asML)
}
val gaussians = mus.zip(sigmas)
.map { case (mu, sigma) => new MultivariateGaussian(mu.asML, sigma.asML) }
val model = new GaussianMixtureModel(metadata.uid, weights, gaussians)
metadata.getAndSetParams(model)
@ -372,24 +369,6 @@ class GaussianMixture @Since("2.0.0") (
@Since("3.0.0")
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
/**
* Set block size for stacking input data in matrices.
* If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
* If blockSize &gt; 1, then vectors will be stacked to blocks, and high-level BLAS routines
* will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
* Recommended size is between 10 and 1000. An appropriate choice of the block size depends
* on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
* f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
* Note that existing BLAS implementations are mainly optimized for dense matrices, if the
* input dataset is sparse, stacking may bring no performance gain, the worse is possible
* performance regression.
* Default is 1.
*
* @group expertSetParam
*/
@Since("3.1.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
/**
* Number of samples per cluster to use when initializing Gaussians.
*/
@ -410,7 +389,7 @@ class GaussianMixture @Since("2.0.0") (
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter,
seed, tol, aggregationDepth, blockSize)
seed, tol, aggregationDepth)
instr.logNumFeatures(numFeatures)
val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
@ -423,25 +402,12 @@ class GaussianMixture @Since("2.0.0") (
.as[(Vector, Double)].rdd
.setName("training instances")
if ($(blockSize) == 1 && dataset.storageLevel == StorageLevel.NONE) {
instances.persist(StorageLevel.MEMORY_AND_DISK)
}
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) { instances.persist(StorageLevel.MEMORY_AND_DISK) }
// TODO: SPARK-15785 Support users supplied initial GMM.
val (weights, gaussians) = initRandom(instances, $(k), numFeatures)
val (logLikelihood, iteration) = if ($(blockSize) == 1) {
trainOnRows(instances, weights, gaussians, numFeatures, instr)
} else {
val sparsity = 1 - instances.map { case (v, _) => v.numNonzeros.toDouble / v.size }.mean()
instr.logNamedValue("sparsity", sparsity.toString)
if (sparsity > 0.5) {
logWarning(s"sparsity of input dataset is $sparsity, " +
s"which may hurt performance in high-level BLAS.")
}
trainOnBlocks(instances, weights, gaussians, numFeatures, instr)
}
if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
val (logLikelihood, iteration) = trainImpl(instances, weights, gaussians, numFeatures, instr)
if (handlePersistence) { instances.unpersist() }
val gaussianDists = gaussians.map { case (mean, covVec) =>
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
@ -457,7 +423,7 @@ class GaussianMixture @Since("2.0.0") (
model.setSummary(Some(summary))
}
private def trainOnRows(
private def trainImpl(
instances: RDD[(Vector, Double)],
weights: Array[Double],
gaussians: Array[(DenseVector, DenseVector)],
@ -514,70 +480,6 @@ class GaussianMixture @Since("2.0.0") (
(logLikelihood, iteration)
}
private def trainOnBlocks(
instances: RDD[(Vector, Double)],
weights: Array[Double],
gaussians: Array[(DenseVector, DenseVector)],
numFeatures: Int,
instr: Instrumentation): (Double, Int) = {
val blocks = instances.mapPartitions { iter =>
iter.grouped($(blockSize))
.map { seq => (Matrices.fromVectors(seq.map(_._1)), seq.map(_._2).toArray) }
}.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")
val sc = instances.sparkContext
var logLikelihood = Double.MinValue
var logLikelihoodPrev = 0.0
var iteration = 0
while (iteration < $(maxIter) && math.abs(logLikelihood - logLikelihoodPrev) > $(tol)) {
val weightSumAccum = if (iteration == 0) sc.doubleAccumulator else null
val logLikelihoodAccum = sc.doubleAccumulator
val bcWeights = sc.broadcast(weights)
val bcGaussians = sc.broadcast(gaussians)
// aggregate the cluster contribution for all sample points,
// and then compute the new distributions
blocks.mapPartitions { iter =>
if (iter.nonEmpty) {
val agg = new BlockExpectationAggregator(numFeatures,
$(blockSize), bcWeights, bcGaussians)
while (iter.hasNext) { agg.add(iter.next) }
// sum of weights in this partition
val ws = agg.weights.sum
if (iteration == 0) weightSumAccum.add(ws)
logLikelihoodAccum.add(agg.logLikelihood)
agg.meanIter.zip(agg.covIter).zipWithIndex
.map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) }
} else Iterator.empty
}.reduceByKey(GaussianMixture.mergeWeightsMeans).mapValues { case (mean, cov, w, ws) =>
// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
}.collect().foreach { case (i, (weight, gaussian)) =>
weights(i) = weight
gaussians(i) = gaussian
}
bcWeights.destroy()
bcGaussians.destroy()
if (iteration == 0) {
instr.logNumExamples(weightSumAccum.count)
instr.logSumOfWeights(weightSumAccum.value)
}
logLikelihoodPrev = logLikelihood // current becomes previous
logLikelihood = logLikelihoodAccum.value // this is the freshly computed log-likelihood
instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood)
iteration += 1
}
blocks.unpersist()
(logLikelihood, iteration)
}
@Since("2.0.0")
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
@ -792,121 +694,6 @@ private class ExpectationAggregator(
}
/**
* BlockExpectationAggregator computes the partial expectation results.
*
* @param numFeatures The number of features.
* @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
* @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution
* in the mixture. Note only upper triangular part of the covariance
* matrix of each distribution is stored as dense vector (column major)
* in order to reduce shuffled data size.
*/
private class BlockExpectationAggregator(
numFeatures: Int,
blockSize: Int,
bcWeights: Broadcast[Array[Double]],
bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
private val k = bcWeights.value.length
private var totalCnt = 0L
private var newLogLikelihood = 0.0
private val covSize = numFeatures * (numFeatures + 1) / 2
private lazy val newWeights = Array.ofDim[Double](k)
@transient private lazy val newMeansMat = DenseMatrix.zeros(numFeatures, k)
@transient private lazy val newCovsMat = DenseMatrix.zeros(covSize, k)
@transient private lazy val auxiliaryProbMat = DenseMatrix.zeros(blockSize, k)
@transient private lazy val auxiliaryPDFMat = DenseMatrix.zeros(blockSize, numFeatures)
@transient private lazy val auxiliaryCovVec = Vectors.zeros(covSize).toDense
@transient private lazy val gaussians = {
bcGaussians.value.map { case (mean, covVec) =>
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
new MultivariateGaussian(mean, cov)
}
}
def count: Long = totalCnt
def logLikelihood: Double = newLogLikelihood
def weights: Array[Double] = newWeights
def meanIter: Iterator[DenseVector] = newMeansMat.colIter.map(_.toDense)
def covIter: Iterator[DenseVector] = newCovsMat.colIter.map(_.toDense)
/**
* Add a new training instance block to this BlockExpectationAggregator, update the weights,
* means and covariances for each distributions, and update the log likelihood.
*
* @param block The instance block of data point to be added.
* @return This BlockExpectationAggregator object.
*/
def add(block: (Matrix, Array[Double])): this.type = {
val (matrix: Matrix, weights: Array[Double]) = block
require(matrix.isTransposed)
val size = matrix.numRows
require(weights.length == size)
val blas1 = BLAS.getBLAS(size)
val blas2 = BLAS.getBLAS(k)
val probMat = if (blockSize == size) auxiliaryProbMat else DenseMatrix.zeros(size, k)
require(!probMat.isTransposed)
java.util.Arrays.fill(probMat.values, EPSILON)
val pdfMat = if (blockSize == size) auxiliaryPDFMat else DenseMatrix.zeros(size, numFeatures)
var j = 0
while (j < k) {
val pdfVec = gaussians(j).pdf(matrix, pdfMat)
blas1.daxpy(size, bcWeights.value(j), pdfVec.values, 0, 1, probMat.values, j * size, 1)
j += 1
}
var i = 0
while (i < size) {
val weight = weights(i)
val probSum = blas2.dasum(k, probMat.values, i, size)
blas2.dscal(k, weight / probSum, probMat.values, i, size)
blas2.daxpy(k, 1.0, probMat.values, i, size, newWeights, 0, 1)
newLogLikelihood += math.log(probSum) * weight
i += 1
}
BLAS.gemm(1.0, matrix.transpose, probMat, 1.0, newMeansMat)
// compute the cov vector for each row vector
val covVec = auxiliaryCovVec
val covVecIter = matrix match {
case dm: DenseMatrix =>
Iterator.tabulate(size) { i =>
java.util.Arrays.fill(covVec.values, 0.0)
// when input block is dense, directly use nativeBLAS to avoid array copy
BLAS.nativeBLAS.dspr("U", numFeatures, 1.0, dm.values, i * numFeatures, 1,
covVec.values, 0)
covVec
}
case sm: SparseMatrix =>
sm.rowIter.map { vec =>
java.util.Arrays.fill(covVec.values, 0.0)
BLAS.spr(1.0, vec, covVec)
covVec
}
}
covVecIter.zipWithIndex.foreach { case (covVec, i) =>
BLAS.nativeBLAS.dger(covSize, k, 1.0, covVec.values, 0, 1,
probMat.values, i, size, newCovsMat.values, 0, covSize)
}
totalCnt += size
this
}
}
/**
* Summary of GaussianMixture.
*

View file

@ -285,17 +285,6 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
testClusteringModelSingleProbabilisticPrediction(model, model.predictProbability, dataset,
model.getFeaturesCol, model.getProbabilityCol)
}
test("GMM on blocks") {
Seq(dataset, sparseDataset, denseDataset, rDataset).foreach { dataset =>
val gmm = new GaussianMixture().setK(k).setMaxIter(20).setBlockSize(1).setSeed(seed)
val model = gmm.fit(dataset)
Seq(2, 4, 8, 16, 32).foreach { blockSize =>
val model2 = gmm.setBlockSize(blockSize).fit(dataset)
modelEquals(model, model2)
}
}
}
}
object GaussianMixtureSuite extends SparkFunSuite {

View file

@ -20,8 +20,8 @@ import warnings
from pyspark import since, keyword_only
from pyspark.ml.param.shared import HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol, \
HasAggregationDepth, HasWeightCol, HasTol, HasProbabilityCol, HasBlockSize, \
HasDistanceMeasure, HasCheckpointInterval, Param, Params, TypeConverters
HasAggregationDepth, HasWeightCol, HasTol, HasProbabilityCol, HasDistanceMeasure, \
HasCheckpointInterval, Param, Params, TypeConverters
from pyspark.ml.util import JavaMLWritable, JavaMLReadable, GeneralJavaMLWritable, \
HasTrainingSummary, SparkContext
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaWrapper
@ -101,8 +101,7 @@ class ClusteringSummary(JavaWrapper):
@inherit_doc
class _GaussianMixtureParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol,
HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol,
HasBlockSize):
HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol):
"""
Params for :py:class:`GaussianMixture` and :py:class:`GaussianMixtureModel`.
@ -114,7 +113,7 @@ class _GaussianMixtureParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionC
def __init__(self, *args):
super(_GaussianMixtureParams, self).__init__(*args)
self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2, blockSize=1)
self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2)
@since("2.0.0")
def getK(self):
@ -251,8 +250,6 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
>>> gm.getMaxIter()
30
>>> model = gm.fit(df)
>>> model.getBlockSize()
1
>>> model.getAggregationDepth()
2
>>> model.getFeaturesCol()
@ -339,11 +336,11 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
@keyword_only
def __init__(self, *, featuresCol="features", predictionCol="prediction", k=2,
probabilityCol="probability", tol=0.01, maxIter=100, seed=None,
aggregationDepth=2, weightCol=None, blockSize=1):
aggregationDepth=2, weightCol=None):
"""
__init__(self, \\*, featuresCol="features", predictionCol="prediction", k=2, \
probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
aggregationDepth=2, weightCol=None, blockSize=1)
aggregationDepth=2, weightCol=None)
"""
super(GaussianMixture, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.GaussianMixture",
@ -358,11 +355,11 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
@since("2.0.0")
def setParams(self, *, featuresCol="features", predictionCol="prediction", k=2,
probabilityCol="probability", tol=0.01, maxIter=100, seed=None,
aggregationDepth=2, weightCol=None, blockSize=1):
aggregationDepth=2, weightCol=None):
"""
setParams(self, \\*, featuresCol="features", predictionCol="prediction", k=2, \
probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
aggregationDepth=2, weightCol=None, blockSize=1)
aggregationDepth=2, weightCol=None)
Sets params for GaussianMixture.
"""
@ -432,13 +429,6 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
"""
return self._set(aggregationDepth=value)
@since("3.1.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)
class GaussianMixtureSummary(ClusteringSummary):
"""