diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala index fc509d2ba1..e459367333 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala @@ -140,6 +140,10 @@ class GaussianMixture private ( // Get length of the input vectors val d = breezeData.first().length + // Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when + // d > 25 except for when k is very small + val distributeGaussians = ((k - 1.0) / k) * d > 25 + // Determine initial weights and corresponding Gaussians. // If the user supplied an initial GMM, we use those values, otherwise // we start with uniform weights, a random mean from the data, and @@ -171,14 +175,25 @@ class GaussianMixture private ( // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) val sumWeights = sums.weights.sum - var i = 0 - while (i < k) { - val mu = sums.means(i) / sums.weights(i) - BLAS.syr(-sums.weights(i), Vectors.fromBreeze(mu), - Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix]) - weights(i) = sums.weights(i) / sumWeights - gaussians(i) = new MultivariateGaussian(mu, sums.sigmas(i) / sums.weights(i)) - i = i + 1 + + if (distributeGaussians) { + val numPartitions = math.min(k, 1024) + val tuples = + Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i))) + val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) => + updateWeightsAndGaussians(mean, sigma, weight, sumWeights) + }.collect.unzip + Array.copy(ws, 0, weights, 0, ws.length) + Array.copy(gs, 0, gaussians, 0, gs.length) + } else { + var i = 0 + while (i < k) { + val (weight, gaussian) = + updateWeightsAndGaussians(sums.means(i), sums.sigmas(i), sums.weights(i), sumWeights) + weights(i) = weight + gaussians(i) = gaussian + i = i + 1 + } } llhp = llh // current becomes previous @@ -192,6 +207,19 @@ class GaussianMixture private ( /** Java-friendly version of [[run()]] */ def run(data: JavaRDD[Vector]): GaussianMixtureModel = run(data.rdd) + private def updateWeightsAndGaussians( + mean: BDV[Double], + sigma: BreezeMatrix[Double], + weight: Double, + sumWeights: Double): (Double, MultivariateGaussian) = { + val mu = (mean /= weight) + BLAS.syr(-weight, Vectors.fromBreeze(mu), + Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix]) + val newWeight = weight / sumWeights + val newGaussian = new MultivariateGaussian(mu, sigma / weight) + (newWeight, newGaussian) + } + /** Average of dense breeze vectors */ private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = { val v = BDV.zeros[Double](x(0).length)