[SPARK-17033][ML][MLLIB] GaussianMixture should use treeAggregate to improve performance

## What changes were proposed in this pull request?
```GaussianMixture``` should use ```treeAggregate``` rather than ```aggregate``` to improve performance and scalability. In my test of dataset with 200 features and 1M instance, I found there is 20% increased performance.
BTW, we should destroy broadcast variable ```compute``` at the end of each iteration.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #14621 from yanboliang/spark-17033.
This commit is contained in:
Yanbo Liang 2016-08-12 10:06:17 -07:00
parent 79e2caa132
commit bbae20ade1

View file

@ -198,7 +198,7 @@ class GaussianMixture private (
val compute = sc.broadcast(ExpectationSum.add(weights, gaussians)_) val compute = sc.broadcast(ExpectationSum.add(weights, gaussians)_)
// aggregate the cluster contribution for all sample points // aggregate the cluster contribution for all sample points
val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _ += _) val sums = breezeData.treeAggregate(ExpectationSum.zero(k, d))(compute.value, _ += _)
// Create new distributions based on the partial assignments // Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature) // (often referred to as the "M" step in literature)
@ -227,6 +227,7 @@ class GaussianMixture private (
llhp = llh // current becomes previous llhp = llh // current becomes previous
llh = sums.logLikelihood // this is the freshly computed log-likelihood llh = sums.logLikelihood // this is the freshly computed log-likelihood
iter += 1 iter += 1
compute.destroy(blocking = false)
} }
new GaussianMixtureModel(weights, gaussians) new GaussianMixtureModel(weights, gaussians)