[SPARK-30699][ML][PYSPARK] GMM blockify input vectors
### What changes were proposed in this pull request? 1, add new param blockSize; 2, if blockSize==1, keep original behavior, code path trainOnRows; 3, if blockSize>1, standardize and stack input vectors to blocks (like ALS/MLP), code path trainOnBlocks ### Why are the changes needed? performance gain on dense dataset HIGGS: 1, save about 45% RAM; 2, 3X faster with openBLAS ### Does this PR introduce any user-facing change? add a new expert param `blockSize` ### How was this patch tested? added testsuites Closes #27473 from zhengruifeng/blockify_gmm. Authored-by: zhengruifeng <ruifengz@foxmail.com> Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
This commit is contained in:
parent
a89006aba0
commit
e7fa778dc7
|
@ -271,7 +271,7 @@ private[spark] object BLAS extends Serializable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's ?SPR.
|
||||
* Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's ?SPR.
|
||||
*
|
||||
* @param U the upper triangular part of the matrix packed in an array (column major)
|
||||
*/
|
||||
|
|
|
@ -55,7 +55,7 @@ class MultivariateGaussian @Since("2.0.0") (
|
|||
*/
|
||||
@transient private lazy val tuple = {
|
||||
val (rootSigmaInv, u) = calculateCovarianceConstants
|
||||
val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv)
|
||||
val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv).toDense
|
||||
val rootSigmaInvMulMu = rootSigmaInvMat.multiply(mean)
|
||||
(rootSigmaInvMat, u, rootSigmaInvMulMu)
|
||||
}
|
||||
|
@ -81,6 +81,36 @@ class MultivariateGaussian @Since("2.0.0") (
|
|||
u - 0.5 * BLAS.dot(v, v)
|
||||
}
|
||||
|
||||
private[ml] def pdf(X: Matrix): DenseVector = {
|
||||
val mat = DenseMatrix.zeros(X.numRows, X.numCols)
|
||||
pdf(X, mat)
|
||||
}
|
||||
|
||||
private[ml] def pdf(X: Matrix, mat: DenseMatrix): DenseVector = {
|
||||
require(!mat.isTransposed)
|
||||
|
||||
BLAS.gemm(1.0, X, rootSigmaInvMat.transpose, 0.0, mat)
|
||||
val m = mat.numRows
|
||||
val n = mat.numCols
|
||||
|
||||
val pdfVec = mat.multiply(rootSigmaInvMulMu)
|
||||
|
||||
val blas = BLAS.getBLAS(n)
|
||||
val squared1 = blas.ddot(n, rootSigmaInvMulMu.values, 1, rootSigmaInvMulMu.values, 1)
|
||||
|
||||
val localU = u
|
||||
var i = 0
|
||||
while (i < m) {
|
||||
val squared2 = blas.ddot(n, mat.values, i, m, mat.values, i, m)
|
||||
val dot = pdfVec(i)
|
||||
val squaredSum = squared1 + squared2 - dot - dot
|
||||
pdfVec.values(i) = math.exp(localU - 0.5 * squaredSum)
|
||||
i += 1
|
||||
}
|
||||
|
||||
pdfVec
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate distribution dependent components used for the density function:
|
||||
* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu))
|
||||
|
|
|
@ -27,6 +27,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
|
|||
test("univariate") {
|
||||
val x1 = Vectors.dense(0.0)
|
||||
val x2 = Vectors.dense(1.5)
|
||||
val mat = Matrices.fromVectors(Seq(x1, x2))
|
||||
|
||||
val mu = Vectors.dense(0.0)
|
||||
val sigma1 = Matrices.dense(1, 1, Array(1.0))
|
||||
|
@ -35,6 +36,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
|
|||
assert(dist1.logpdf(x2) ~== -2.0439385332046727 absTol 1E-5)
|
||||
assert(dist1.pdf(x1) ~== 0.39894 absTol 1E-5)
|
||||
assert(dist1.pdf(x2) ~== 0.12952 absTol 1E-5)
|
||||
assert(dist1.pdf(mat) ~== Vectors.dense(0.39894, 0.12952) absTol 1E-5)
|
||||
|
||||
val sigma2 = Matrices.dense(1, 1, Array(4.0))
|
||||
val dist2 = new MultivariateGaussian(mu, sigma2)
|
||||
|
@ -42,11 +44,13 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
|
|||
assert(dist2.logpdf(x2) ~== -1.893335713764618 absTol 1E-5)
|
||||
assert(dist2.pdf(x1) ~== 0.19947 absTol 1E-5)
|
||||
assert(dist2.pdf(x2) ~== 0.15057 absTol 1E-5)
|
||||
assert(dist2.pdf(mat) ~== Vectors.dense(0.19947, 0.15057) absTol 1E-5)
|
||||
}
|
||||
|
||||
test("multivariate") {
|
||||
val x1 = Vectors.dense(0.0, 0.0)
|
||||
val x2 = Vectors.dense(1.0, 1.0)
|
||||
val mat = Matrices.fromVectors(Seq(x1, x2))
|
||||
|
||||
val mu = Vectors.dense(0.0, 0.0)
|
||||
val sigma1 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
|
||||
|
@ -55,6 +59,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
|
|||
assert(dist1.logpdf(x2) ~== -2.8378770664093453 absTol 1E-5)
|
||||
assert(dist1.pdf(x1) ~== 0.15915 absTol 1E-5)
|
||||
assert(dist1.pdf(x2) ~== 0.05855 absTol 1E-5)
|
||||
assert(dist1.pdf(mat) ~== Vectors.dense(0.15915, 0.05855) absTol 1E-5)
|
||||
|
||||
val sigma2 = Matrices.dense(2, 2, Array(4.0, -1.0, -1.0, 2.0))
|
||||
val dist2 = new MultivariateGaussian(mu, sigma2)
|
||||
|
@ -62,21 +67,25 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
|
|||
assert(dist2.logpdf(x2) ~== -3.3822607123655732 absTol 1E-5)
|
||||
assert(dist2.pdf(x1) ~== 0.060155 absTol 1E-5)
|
||||
assert(dist2.pdf(x2) ~== 0.033971 absTol 1E-5)
|
||||
assert(dist2.pdf(mat) ~== Vectors.dense(0.060155, 0.033971) absTol 1E-5)
|
||||
}
|
||||
|
||||
test("multivariate degenerate") {
|
||||
val x1 = Vectors.dense(0.0, 0.0)
|
||||
val x2 = Vectors.dense(1.0, 1.0)
|
||||
val mat = Matrices.fromVectors(Seq(x1, x2))
|
||||
|
||||
val mu = Vectors.dense(0.0, 0.0)
|
||||
val sigma = Matrices.dense(2, 2, Array(1.0, 1.0, 1.0, 1.0))
|
||||
val dist = new MultivariateGaussian(mu, sigma)
|
||||
assert(dist.pdf(x1) ~== 0.11254 absTol 1E-5)
|
||||
assert(dist.pdf(x2) ~== 0.068259 absTol 1E-5)
|
||||
assert(dist.pdf(mat) ~== Vectors.dense(0.11254, 0.068259) absTol 1E-5)
|
||||
}
|
||||
|
||||
test("SPARK-11302") {
|
||||
val x = Vectors.dense(629, 640, 1.7188, 618.19)
|
||||
val mat = Matrices.fromVectors(Seq(x))
|
||||
val mu = Vectors.dense(
|
||||
1055.3910505836575, 1070.489299610895, 1.39020554474708, 1040.5907503867697)
|
||||
val sigma = Matrices.dense(4, 4, Array(
|
||||
|
@ -87,5 +96,6 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
|
|||
val dist = new MultivariateGaussian(mu, sigma)
|
||||
// Agrees with R's dmvnorm: 7.154782e-05
|
||||
assert(dist.pdf(x) ~== 7.154782224045512E-5 absTol 1E-9)
|
||||
assert(dist.pdf(mat) ~== Vectors.dense(7.154782224045512E-5) absTol 1E-5)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
|
|||
*/
|
||||
private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter with HasFeaturesCol
|
||||
with HasSeed with HasPredictionCol with HasWeightCol with HasProbabilityCol with HasTol
|
||||
with HasAggregationDepth {
|
||||
with HasAggregationDepth with HasBlockSize {
|
||||
|
||||
/**
|
||||
* Number of independent Gaussians in the mixture model. Must be greater than 1. Default: 2.
|
||||
|
@ -279,8 +279,7 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] {
|
|||
* @param weights Weights for each Gaussian
|
||||
* @return Probability (partial assignment) for each of the k clusters
|
||||
*/
|
||||
private[clustering]
|
||||
def computeProbabilities(
|
||||
private[clustering] def computeProbabilities(
|
||||
features: Vector,
|
||||
dists: Array[MultivariateGaussian],
|
||||
weights: Array[Double]): Array[Double] = {
|
||||
|
@ -375,6 +374,25 @@ class GaussianMixture @Since("2.0.0") (
|
|||
@Since("3.0.0")
|
||||
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
|
||||
|
||||
/**
|
||||
* Set block size for stacking input data in matrices.
|
||||
* If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
|
||||
* If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines
|
||||
* will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
|
||||
* Recommended size is between 10 and 1000. An appropriate choice of the block size depends
|
||||
* on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
|
||||
* f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
|
||||
* Note that existing BLAS implementations are mainly optimized for dense matrices, if the
|
||||
* input dataset is sparse, stacking may bring no performance gain, the worse is possible
|
||||
* performance regression.
|
||||
* Default is 1.
|
||||
*
|
||||
* @group expertSetParam
|
||||
*/
|
||||
@Since("3.1.0")
|
||||
def setBlockSize(value: Int): this.type = set(blockSize, value)
|
||||
setDefault(blockSize -> 1)
|
||||
|
||||
/**
|
||||
* Number of samples per cluster to use when initializing Gaussians.
|
||||
*/
|
||||
|
@ -392,7 +410,11 @@ class GaussianMixture @Since("2.0.0") (
|
|||
s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" +
|
||||
s" matrix is quadratic in the number of features.")
|
||||
|
||||
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
|
||||
instr.logPipelineStage(this)
|
||||
instr.logDataset(dataset)
|
||||
instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter,
|
||||
seed, tol, aggregationDepth, blockSize)
|
||||
instr.logNumFeatures(numFeatures)
|
||||
|
||||
val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
|
||||
col($(weightCol)).cast(DoubleType)
|
||||
|
@ -401,25 +423,50 @@ class GaussianMixture @Since("2.0.0") (
|
|||
}
|
||||
|
||||
val instances = dataset.select(DatasetUtils.columnToVector(dataset, $(featuresCol)), w)
|
||||
.as[(Vector, Double)]
|
||||
.rdd
|
||||
.as[(Vector, Double)].rdd
|
||||
.setName("training instances")
|
||||
|
||||
if (handlePersistence) {
|
||||
if ($(blockSize) == 1 && dataset.storageLevel == StorageLevel.NONE) {
|
||||
instances.persist(StorageLevel.MEMORY_AND_DISK)
|
||||
}
|
||||
|
||||
val sc = spark.sparkContext
|
||||
val numClusters = $(k)
|
||||
|
||||
instr.logPipelineStage(this)
|
||||
instr.logDataset(dataset)
|
||||
instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter,
|
||||
seed, tol, aggregationDepth)
|
||||
instr.logNumFeatures(numFeatures)
|
||||
|
||||
// TODO: SPARK-15785 Support users supplied initial GMM.
|
||||
val (weights, gaussians) = initRandom(instances, numClusters, numFeatures)
|
||||
val (weights, gaussians) = initRandom(instances, $(k), numFeatures)
|
||||
|
||||
val (logLikelihood, iteration) = if ($(blockSize) == 1) {
|
||||
trainOnRows(instances, weights, gaussians, numFeatures, instr)
|
||||
} else {
|
||||
val sparsity = 1 - instances.map { case (v, _) => v.numNonzeros.toDouble / v.size }.mean()
|
||||
instr.logNamedValue("sparsity", sparsity.toString)
|
||||
if (sparsity > 0.5) {
|
||||
logWarning(s"sparsity of input dataset is $sparsity, " +
|
||||
s"which may hurt performance in high-level BLAS.")
|
||||
}
|
||||
trainOnBlocks(instances, weights, gaussians, numFeatures, instr)
|
||||
}
|
||||
if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
|
||||
|
||||
val gaussianDists = gaussians.map { case (mean, covVec) =>
|
||||
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
|
||||
new MultivariateGaussian(mean, cov)
|
||||
}
|
||||
|
||||
val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists))
|
||||
.setParent(this)
|
||||
val summary = new GaussianMixtureSummary(model.transform(dataset),
|
||||
$(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration)
|
||||
instr.logNamedValue("logLikelihood", logLikelihood)
|
||||
instr.logNamedValue("clusterSizes", summary.clusterSizes)
|
||||
model.setSummary(Some(summary))
|
||||
}
|
||||
|
||||
private def trainOnRows(
|
||||
instances: RDD[(Vector, Double)],
|
||||
weights: Array[Double],
|
||||
gaussians: Array[(DenseVector, DenseVector)],
|
||||
numFeatures: Int,
|
||||
instr: Instrumentation): (Double, Int) = {
|
||||
val sc = instances.sparkContext
|
||||
var logLikelihood = Double.MinValue
|
||||
var logLikelihoodPrev = 0.0
|
||||
|
||||
|
@ -440,7 +487,7 @@ class GaussianMixture @Since("2.0.0") (
|
|||
val ws = agg.weights.sum
|
||||
if (iteration == 0) weightSumAccum.add(ws)
|
||||
logLikelihoodAccum.add(agg.logLikelihood)
|
||||
Iterator.tabulate(numClusters) { i =>
|
||||
Iterator.tabulate(bcWeights.value.length) { i =>
|
||||
(i, (agg.means(i), agg.covs(i), agg.weights(i), ws))
|
||||
}
|
||||
} else Iterator.empty
|
||||
|
@ -471,21 +518,77 @@ class GaussianMixture @Since("2.0.0") (
|
|||
instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood)
|
||||
iteration += 1
|
||||
}
|
||||
if (handlePersistence) {
|
||||
instances.unpersist()
|
||||
}
|
||||
|
||||
val gaussianDists = gaussians.map { case (mean, covVec) =>
|
||||
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
|
||||
new MultivariateGaussian(mean, cov)
|
||||
}
|
||||
(logLikelihood, iteration)
|
||||
}
|
||||
|
||||
val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
|
||||
val summary = new GaussianMixtureSummary(model.transform(dataset),
|
||||
$(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration)
|
||||
instr.logNamedValue("logLikelihood", logLikelihood)
|
||||
instr.logNamedValue("clusterSizes", summary.clusterSizes)
|
||||
model.setSummary(Some(summary))
|
||||
private def trainOnBlocks(
|
||||
instances: RDD[(Vector, Double)],
|
||||
weights: Array[Double],
|
||||
gaussians: Array[(DenseVector, DenseVector)],
|
||||
numFeatures: Int,
|
||||
instr: Instrumentation): (Double, Int) = {
|
||||
val blocks = instances.mapPartitions { iter =>
|
||||
iter.grouped($(blockSize))
|
||||
.map { seq => (Matrices.fromVectors(seq.map(_._1)), seq.map(_._2).toArray) }
|
||||
}.persist(StorageLevel.MEMORY_AND_DISK)
|
||||
.setName(s"training dataset (blockSize=${$(blockSize)})")
|
||||
|
||||
val sc = instances.sparkContext
|
||||
var logLikelihood = Double.MinValue
|
||||
var logLikelihoodPrev = 0.0
|
||||
|
||||
var iteration = 0
|
||||
while (iteration < $(maxIter) && math.abs(logLikelihood - logLikelihoodPrev) > $(tol)) {
|
||||
val weightSumAccum = if (iteration == 0) sc.doubleAccumulator else null
|
||||
val logLikelihoodAccum = sc.doubleAccumulator
|
||||
val bcWeights = sc.broadcast(weights)
|
||||
val bcGaussians = sc.broadcast(gaussians)
|
||||
|
||||
// aggregate the cluster contribution for all sample points,
|
||||
// and then compute the new distributions
|
||||
blocks.mapPartitions { iter =>
|
||||
if (iter.nonEmpty) {
|
||||
val agg = new BlockExpectationAggregator(numFeatures,
|
||||
$(blockSize), bcWeights, bcGaussians)
|
||||
while (iter.hasNext) { agg.add(iter.next) }
|
||||
// sum of weights in this partition
|
||||
val ws = agg.weights.sum
|
||||
if (iteration == 0) weightSumAccum.add(ws)
|
||||
logLikelihoodAccum.add(agg.logLikelihood)
|
||||
agg.meanIter.zip(agg.covIter).zipWithIndex
|
||||
.map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) }
|
||||
} else Iterator.empty
|
||||
}.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
|
||||
// update the weights, means and covariances for i-th distributions
|
||||
BLAS.axpy(1.0, mean2, mean1)
|
||||
BLAS.axpy(1.0, cov2, cov1)
|
||||
(mean1, cov1, w1 + w2, ws1 + ws2)
|
||||
}.mapValues { case (mean, cov, w, ws) =>
|
||||
// Create new distributions based on the partial assignments
|
||||
// (often referred to as the "M" step in literature)
|
||||
GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
|
||||
}.collect().foreach { case (i, (weight, gaussian)) =>
|
||||
weights(i) = weight
|
||||
gaussians(i) = gaussian
|
||||
}
|
||||
|
||||
bcWeights.destroy()
|
||||
bcGaussians.destroy()
|
||||
|
||||
if (iteration == 0) {
|
||||
instr.logNumExamples(weightSumAccum.count)
|
||||
instr.logSumOfWeights(weightSumAccum.value)
|
||||
}
|
||||
|
||||
logLikelihoodPrev = logLikelihood // current becomes previous
|
||||
logLikelihood = logLikelihoodAccum.value // this is the freshly computed log-likelihood
|
||||
instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood)
|
||||
iteration += 1
|
||||
}
|
||||
blocks.unpersist()
|
||||
|
||||
(logLikelihood, iteration)
|
||||
}
|
||||
|
||||
@Since("2.0.0")
|
||||
|
@ -626,16 +729,15 @@ private class ExpectationAggregator(
|
|||
bcWeights: Broadcast[Array[Double]],
|
||||
bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
|
||||
|
||||
private val k: Int = bcWeights.value.length
|
||||
private var totalCnt: Long = 0L
|
||||
private var newLogLikelihood: Double = 0.0
|
||||
private lazy val newWeights: Array[Double] = Array.ofDim[Double](k)
|
||||
private lazy val newMeans: Array[DenseVector] = Array.fill(k)(
|
||||
new DenseVector(Array.ofDim[Double](numFeatures)))
|
||||
private lazy val newCovs: Array[DenseVector] = Array.fill(k)(
|
||||
new DenseVector(Array.ofDim[Double](numFeatures * (numFeatures + 1) / 2)))
|
||||
private val k = bcWeights.value.length
|
||||
private var totalCnt = 0L
|
||||
private var newLogLikelihood = 0.0
|
||||
private val covSize = numFeatures * (numFeatures + 1) / 2
|
||||
private lazy val newWeights = Array.ofDim[Double](k)
|
||||
@transient private lazy val newMeans = Array.fill(k)(Vectors.zeros(numFeatures).toDense)
|
||||
@transient private lazy val newCovs = Array.fill(k)(Vectors.zeros(covSize).toDense)
|
||||
|
||||
@transient private lazy val oldGaussians = {
|
||||
@transient private lazy val gaussians = {
|
||||
bcGaussians.value.map { case (mean, covVec) =>
|
||||
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
|
||||
new MultivariateGaussian(mean, cov)
|
||||
|
@ -656,19 +758,19 @@ private class ExpectationAggregator(
|
|||
* Add a new training instance to this ExpectationAggregator, update the weights,
|
||||
* means and covariances for each distributions, and update the log likelihood.
|
||||
*
|
||||
* @param weightedVector The instance of data point to be added.
|
||||
* @param instance The instance of data point to be added.
|
||||
* @return This ExpectationAggregator object.
|
||||
*/
|
||||
def add(weightedVector: (Vector, Double)): this.type = {
|
||||
val (instance: Vector, weight: Double) = weightedVector
|
||||
def add(instance: (Vector, Double)): this.type = {
|
||||
val (vector: Vector, weight: Double) = instance
|
||||
val localWeights = bcWeights.value
|
||||
val localOldGaussians = oldGaussians
|
||||
val localGaussians = gaussians
|
||||
|
||||
val prob = new Array[Double](k)
|
||||
var probSum = 0.0
|
||||
var i = 0
|
||||
while (i < k) {
|
||||
val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance)
|
||||
val p = EPSILON + localWeights(i) * localGaussians(i).pdf(vector)
|
||||
prob(i) = p
|
||||
probSum += p
|
||||
i += 1
|
||||
|
@ -682,42 +784,128 @@ private class ExpectationAggregator(
|
|||
while (i < k) {
|
||||
val w = prob(i) / probSum * weight
|
||||
localNewWeights(i) += w
|
||||
BLAS.axpy(w, instance, localNewMeans(i))
|
||||
BLAS.spr(w, instance, localNewCovs(i))
|
||||
BLAS.axpy(w, vector, localNewMeans(i))
|
||||
BLAS.spr(w, vector, localNewCovs(i))
|
||||
i += 1
|
||||
}
|
||||
|
||||
totalCnt += 1
|
||||
this
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* BlockExpectationAggregator computes the partial expectation results.
|
||||
*
|
||||
* @param numFeatures The number of features.
|
||||
* @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
|
||||
* @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution
|
||||
* in the mixture. Note only upper triangular part of the covariance
|
||||
* matrix of each distribution is stored as dense vector (column major)
|
||||
* in order to reduce shuffled data size.
|
||||
*/
|
||||
private class BlockExpectationAggregator(
|
||||
numFeatures: Int,
|
||||
blockSize: Int,
|
||||
bcWeights: Broadcast[Array[Double]],
|
||||
bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
|
||||
|
||||
private val k = bcWeights.value.length
|
||||
private var totalCnt = 0L
|
||||
private var newLogLikelihood = 0.0
|
||||
private val covSize = numFeatures * (numFeatures + 1) / 2
|
||||
private lazy val newWeights = Array.ofDim[Double](k)
|
||||
@transient private lazy val newMeansMat = DenseMatrix.zeros(numFeatures, k)
|
||||
@transient private lazy val newCovsMat = DenseMatrix.zeros(covSize, k)
|
||||
@transient private lazy val auxiliaryProbMat = DenseMatrix.zeros(blockSize, k)
|
||||
@transient private lazy val auxiliaryPDFMat = DenseMatrix.zeros(blockSize, numFeatures)
|
||||
@transient private lazy val auxiliaryCovVec = Vectors.zeros(covSize).toDense
|
||||
|
||||
@transient private lazy val gaussians = {
|
||||
bcGaussians.value.map { case (mean, covVec) =>
|
||||
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
|
||||
new MultivariateGaussian(mean, cov)
|
||||
}
|
||||
}
|
||||
|
||||
def count: Long = totalCnt
|
||||
|
||||
def logLikelihood: Double = newLogLikelihood
|
||||
|
||||
def weights: Array[Double] = newWeights
|
||||
|
||||
def meanIter: Iterator[DenseVector] = newMeansMat.colIter.map(_.toDense)
|
||||
|
||||
def covIter: Iterator[DenseVector] = newCovsMat.colIter.map(_.toDense)
|
||||
|
||||
/**
|
||||
* Merge another ExpectationAggregator, update the weights, means and covariances
|
||||
* for each distributions, and update the log likelihood.
|
||||
* (Note that it's in place merging; as a result, `this` object will be modified.)
|
||||
* Add a new training instance block to this BlockExpectationAggregator, update the weights,
|
||||
* means and covariances for each distributions, and update the log likelihood.
|
||||
*
|
||||
* @param other The other ExpectationAggregator to be merged.
|
||||
* @return This ExpectationAggregator object.
|
||||
* @param block The instance block of data point to be added.
|
||||
* @return This BlockExpectationAggregator object.
|
||||
*/
|
||||
def merge(other: ExpectationAggregator): this.type = {
|
||||
if (other.count != 0) {
|
||||
totalCnt += other.totalCnt
|
||||
def add(block: (Matrix, Array[Double])): this.type = {
|
||||
val (matrix: Matrix, weights: Array[Double]) = block
|
||||
require(matrix.isTransposed)
|
||||
val size = matrix.numRows
|
||||
require(weights.length == size)
|
||||
|
||||
val localThisNewWeights = this.newWeights
|
||||
val localOtherNewWeights = other.newWeights
|
||||
val localThisNewMeans = this.newMeans
|
||||
val localOtherNewMeans = other.newMeans
|
||||
val localThisNewCovs = this.newCovs
|
||||
val localOtherNewCovs = other.newCovs
|
||||
var i = 0
|
||||
while (i < k) {
|
||||
localThisNewWeights(i) += localOtherNewWeights(i)
|
||||
BLAS.axpy(1.0, localOtherNewMeans(i), localThisNewMeans(i))
|
||||
BLAS.axpy(1.0, localOtherNewCovs(i), localThisNewCovs(i))
|
||||
i += 1
|
||||
}
|
||||
newLogLikelihood += other.newLogLikelihood
|
||||
val blas1 = BLAS.getBLAS(size)
|
||||
val blas2 = BLAS.getBLAS(k)
|
||||
|
||||
val probMat = if (blockSize == size) auxiliaryProbMat else DenseMatrix.zeros(size, k)
|
||||
require(!probMat.isTransposed)
|
||||
java.util.Arrays.fill(probMat.values, EPSILON)
|
||||
|
||||
val pdfMat = if (blockSize == size) auxiliaryPDFMat else DenseMatrix.zeros(size, numFeatures)
|
||||
var j = 0
|
||||
while (j < k) {
|
||||
val pdfVec = gaussians(j).pdf(matrix, pdfMat)
|
||||
blas1.daxpy(size, bcWeights.value(j), pdfVec.values, 0, 1, probMat.values, j * size, 1)
|
||||
j += 1
|
||||
}
|
||||
|
||||
var i = 0
|
||||
while (i < size) {
|
||||
val weight = weights(i)
|
||||
val probSum = blas2.dasum(k, probMat.values, i, size)
|
||||
blas2.dscal(k, weight / probSum, probMat.values, i, size)
|
||||
blas2.daxpy(k, 1.0, probMat.values, i, size, newWeights, 0, 1)
|
||||
newLogLikelihood += math.log(probSum) * weight
|
||||
i += 1
|
||||
}
|
||||
|
||||
BLAS.gemm(1.0, matrix.transpose, probMat, 1.0, newMeansMat)
|
||||
|
||||
// compute the cov vector for each row vector
|
||||
val covVec = auxiliaryCovVec
|
||||
val covVecIter = matrix match {
|
||||
case dm: DenseMatrix =>
|
||||
Iterator.tabulate(size) { i =>
|
||||
java.util.Arrays.fill(covVec.values, 0.0)
|
||||
// when input block is dense, directly use nativeBLAS to avoid array copy
|
||||
BLAS.nativeBLAS.dspr("U", numFeatures, 1.0, dm.values, i * numFeatures, 1,
|
||||
covVec.values, 0)
|
||||
covVec
|
||||
}
|
||||
|
||||
case sm: SparseMatrix =>
|
||||
sm.rowIter.map { vec =>
|
||||
java.util.Arrays.fill(covVec.values, 0.0)
|
||||
BLAS.spr(1.0, vec, covVec)
|
||||
covVec
|
||||
}
|
||||
}
|
||||
|
||||
covVecIter.zipWithIndex.foreach { case (covVec, i) =>
|
||||
BLAS.nativeBLAS.dger(covSize, k, 1.0, covVec.values, 0, 1,
|
||||
probMat.values, i, size, newCovsMat.values, 0, covSize)
|
||||
}
|
||||
|
||||
totalCnt += size
|
||||
|
||||
this
|
||||
}
|
||||
}
|
||||
|
|
|
@ -285,6 +285,17 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
|
|||
testClusteringModelSingleProbabilisticPrediction(model, model.predictProbability, dataset,
|
||||
model.getFeaturesCol, model.getProbabilityCol)
|
||||
}
|
||||
|
||||
test("GMM on blocks") {
|
||||
Seq(dataset, sparseDataset, denseDataset, rDataset).foreach { dataset =>
|
||||
val gmm = new GaussianMixture().setK(k).setMaxIter(20).setBlockSize(1).setSeed(seed)
|
||||
val model = gmm.fit(dataset)
|
||||
Seq(2, 4, 8, 16, 32).foreach { blockSize =>
|
||||
val model2 = gmm.setBlockSize(blockSize).fit(dataset)
|
||||
modelEquals(model, model2)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object GaussianMixtureSuite extends SparkFunSuite {
|
||||
|
|
|
@ -98,7 +98,8 @@ class ClusteringSummary(JavaWrapper):
|
|||
|
||||
@inherit_doc
|
||||
class _GaussianMixtureParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol,
|
||||
HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol):
|
||||
HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol,
|
||||
HasBlockSize):
|
||||
"""
|
||||
Params for :py:class:`GaussianMixture` and :py:class:`GaussianMixtureModel`.
|
||||
|
||||
|
@ -243,6 +244,8 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
|
|||
>>> gm.getMaxIter()
|
||||
30
|
||||
>>> model = gm.fit(df)
|
||||
>>> model.getBlockSize()
|
||||
1
|
||||
>>> model.getAggregationDepth()
|
||||
2
|
||||
>>> model.getFeaturesCol()
|
||||
|
@ -327,16 +330,16 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
|
|||
@keyword_only
|
||||
def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
|
||||
probabilityCol="probability", tol=0.01, maxIter=100, seed=None,
|
||||
aggregationDepth=2, weightCol=None):
|
||||
aggregationDepth=2, weightCol=None, blockSize=1):
|
||||
"""
|
||||
__init__(self, featuresCol="features", predictionCol="prediction", k=2, \
|
||||
probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
|
||||
aggregationDepth=2, weightCol=None)
|
||||
aggregationDepth=2, weightCol=None, blockSize=1)
|
||||
"""
|
||||
super(GaussianMixture, self).__init__()
|
||||
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.GaussianMixture",
|
||||
self.uid)
|
||||
self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2)
|
||||
self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2, blockSize=1)
|
||||
kwargs = self._input_kwargs
|
||||
self.setParams(**kwargs)
|
||||
|
||||
|
@ -347,11 +350,11 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
|
|||
@since("2.0.0")
|
||||
def setParams(self, featuresCol="features", predictionCol="prediction", k=2,
|
||||
probabilityCol="probability", tol=0.01, maxIter=100, seed=None,
|
||||
aggregationDepth=2, weightCol=None):
|
||||
aggregationDepth=2, weightCol=None, blockSize=1):
|
||||
"""
|
||||
setParams(self, featuresCol="features", predictionCol="prediction", k=2, \
|
||||
probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
|
||||
aggregationDepth=2, weightCol=None)
|
||||
aggregationDepth=2, weightCol=None, blockSize=1)
|
||||
|
||||
Sets params for GaussianMixture.
|
||||
"""
|
||||
|
@ -421,6 +424,13 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
|
|||
"""
|
||||
return self._set(aggregationDepth=value)
|
||||
|
||||
@since("3.1.0")
|
||||
def setBlockSize(self, value):
|
||||
"""
|
||||
Sets the value of :py:attr:`blockSize`.
|
||||
"""
|
||||
return self._set(blockSize=value)
|
||||
|
||||
|
||||
class GaussianMixtureSummary(ClusteringSummary):
|
||||
"""
|
||||
|
|
Loading…
Reference in a new issue