diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala index a214b1a26f..43193adf3e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala @@ -198,7 +198,7 @@ class GaussianMixture private ( val compute = sc.broadcast(ExpectationSum.add(weights, gaussians)_) // aggregate the cluster contribution for all sample points - val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _ += _) + val sums = breezeData.treeAggregate(ExpectationSum.zero(k, d))(compute.value, _ += _) // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) @@ -227,6 +227,7 @@ class GaussianMixture private ( llhp = llh // current becomes previous llh = sums.logLikelihood // this is the freshly computed log-likelihood iter += 1 + compute.destroy(blocking = false) } new GaussianMixtureModel(weights, gaussians)