[SPARK-5016] [MLLIB] Distribute GMM mixture components to executors
Distribute expensive portions of computation for Gaussian mixture components (in particular, pre-computation of `MultivariateGaussian.rootSigmaInv`, the inverse covariance matrix and covariance determinant) across executors. Repost of PR#4654. Notes for reviewers: * What should be the policy for when to distribute computation? Always? When numClusters > threshold? User-specified param? TODO: * Performance testing and comparison for large number of clusters Author: Feynman Liang <fliang@databricks.com> Closes #7166 from feynmanliang/GMM_parallel_mixtures and squashes the following commits: 4f351fa [Feynman Liang] Update heuristic and scaladoc 5ea947e [Feynman Liang] Fix parallelization logic 00eb7db [Feynman Liang] Add helper method for GMM's M step, remove distributeGaussians flag e7c8127 [Feynman Liang] Add distributeGaussians flag and tests 1da3c7f [Feynman Liang] Distribute mixtures
This commit is contained in:
parent
8c32b2e870
commit
f472b8cdc0
|
@ -140,6 +140,10 @@ class GaussianMixture private (
|
|||
// Get length of the input vectors
|
||||
val d = breezeData.first().length
|
||||
|
||||
// Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
|
||||
// d > 25 except for when k is very small
|
||||
val distributeGaussians = ((k - 1.0) / k) * d > 25
|
||||
|
||||
// Determine initial weights and corresponding Gaussians.
|
||||
// If the user supplied an initial GMM, we use those values, otherwise
|
||||
// we start with uniform weights, a random mean from the data, and
|
||||
|
@ -171,15 +175,26 @@ class GaussianMixture private (
|
|||
// Create new distributions based on the partial assignments
|
||||
// (often referred to as the "M" step in literature)
|
||||
val sumWeights = sums.weights.sum
|
||||
|
||||
if (distributeGaussians) {
|
||||
val numPartitions = math.min(k, 1024)
|
||||
val tuples =
|
||||
Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i)))
|
||||
val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) =>
|
||||
updateWeightsAndGaussians(mean, sigma, weight, sumWeights)
|
||||
}.collect.unzip
|
||||
Array.copy(ws, 0, weights, 0, ws.length)
|
||||
Array.copy(gs, 0, gaussians, 0, gs.length)
|
||||
} else {
|
||||
var i = 0
|
||||
while (i < k) {
|
||||
val mu = sums.means(i) / sums.weights(i)
|
||||
BLAS.syr(-sums.weights(i), Vectors.fromBreeze(mu),
|
||||
Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix])
|
||||
weights(i) = sums.weights(i) / sumWeights
|
||||
gaussians(i) = new MultivariateGaussian(mu, sums.sigmas(i) / sums.weights(i))
|
||||
val (weight, gaussian) =
|
||||
updateWeightsAndGaussians(sums.means(i), sums.sigmas(i), sums.weights(i), sumWeights)
|
||||
weights(i) = weight
|
||||
gaussians(i) = gaussian
|
||||
i = i + 1
|
||||
}
|
||||
}
|
||||
|
||||
llhp = llh // current becomes previous
|
||||
llh = sums.logLikelihood // this is the freshly computed log-likelihood
|
||||
|
@ -192,6 +207,19 @@ class GaussianMixture private (
|
|||
/** Java-friendly version of [[run()]] */
|
||||
def run(data: JavaRDD[Vector]): GaussianMixtureModel = run(data.rdd)
|
||||
|
||||
private def updateWeightsAndGaussians(
|
||||
mean: BDV[Double],
|
||||
sigma: BreezeMatrix[Double],
|
||||
weight: Double,
|
||||
sumWeights: Double): (Double, MultivariateGaussian) = {
|
||||
val mu = (mean /= weight)
|
||||
BLAS.syr(-weight, Vectors.fromBreeze(mu),
|
||||
Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix])
|
||||
val newWeight = weight / sumWeights
|
||||
val newGaussian = new MultivariateGaussian(mu, sigma / weight)
|
||||
(newWeight, newGaussian)
|
||||
}
|
||||
|
||||
/** Average of dense breeze vectors */
|
||||
private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
|
||||
val v = BDV.zeros[Double](x(0).length)
|
||||
|
|
Loading…
Reference in a new issue