[SPARK-23217][ML] Add cosine distance measure to ClusteringEvaluator

## What changes were proposed in this pull request?

The PR provided an implementation of ClusteringEvaluator using the cosine distance measure.
This allows to evaluate clustering results created using the cosine distance, introduced in SPARK-22119.

In the corresponding JIRA, there is a design document for the algorithm implemented here.

## How was this patch tested?

Added UT which compares the result to the one provided by python sklearn.

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20396 from mgaido91/SPARK-23217.
This commit is contained in:
Marco Gaido 2018-02-13 11:51:19 -06:00 committed by Sean Owen
parent 05d051293f
commit 4e0fb010cc
2 changed files with 299 additions and 65 deletions

View file

@ -20,11 +20,12 @@ package org.apache.spark.ml.evaluation
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, VectorUDT}
import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable,
SchemaUtils}
import org.apache.spark.sql.{Column, DataFrame, Dataset}
import org.apache.spark.sql.functions.{avg, col, udf}
import org.apache.spark.sql.types.DoubleType
@ -32,15 +33,11 @@ import org.apache.spark.sql.types.DoubleType
* :: Experimental ::
*
* Evaluator for clustering results.
* The metric computes the Silhouette measure
* using the squared Euclidean distance.
* The metric computes the Silhouette measure using the specified distance measure.
*
* The Silhouette is a measure for the validation
* of the consistency within clusters. It ranges
* between 1 and -1, where a value close to 1
* means that the points in a cluster are close
* to the other points in the same cluster and
* far from the points of the other clusters.
* The Silhouette is a measure for the validation of the consistency within clusters. It ranges
* between 1 and -1, where a value close to 1 means that the points in a cluster are close to the
* other points in the same cluster and far from the points of the other clusters.
*/
@Experimental
@Since("2.3.0")
@ -84,18 +81,40 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str
@Since("2.3.0")
def setMetricName(value: String): this.type = set(metricName, value)
setDefault(metricName -> "silhouette")
/**
* param for distance measure to be used in evaluation
* (supports `"squaredEuclidean"` (default), `"cosine"`)
* @group param
*/
@Since("2.4.0")
val distanceMeasure: Param[String] = {
val availableValues = Array("squaredEuclidean", "cosine")
val allowedParams = ParamValidators.inArray(availableValues)
new Param(this, "distanceMeasure", "distance measure in evaluation. Supported options: " +
availableValues.mkString("'", "', '", "'"), allowedParams)
}
/** @group getParam */
@Since("2.4.0")
def getDistanceMeasure: String = $(distanceMeasure)
/** @group setParam */
@Since("2.4.0")
def setDistanceMeasure(value: String): this.type = set(distanceMeasure, value)
setDefault(metricName -> "silhouette", distanceMeasure -> "squaredEuclidean")
@Since("2.3.0")
override def evaluate(dataset: Dataset[_]): Double = {
SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
SchemaUtils.checkNumericType(dataset.schema, $(predictionCol))
$(metricName) match {
case "silhouette" =>
($(metricName), $(distanceMeasure)) match {
case ("silhouette", "squaredEuclidean") =>
SquaredEuclideanSilhouette.computeSilhouetteScore(
dataset, $(predictionCol), $(featuresCol)
)
dataset, $(predictionCol), $(featuresCol))
case ("silhouette", "cosine") =>
CosineSilhouette.computeSilhouetteScore(dataset, $(predictionCol), $(featuresCol))
}
}
}
@ -111,6 +130,48 @@ object ClusteringEvaluator
}
private[evaluation] abstract class Silhouette {
/**
* It computes the Silhouette coefficient for a point.
*/
def pointSilhouetteCoefficient(
clusterIds: Set[Double],
pointClusterId: Double,
pointClusterNumOfPoints: Long,
averageDistanceToCluster: (Double) => Double): Double = {
// Here we compute the average dissimilarity of the current point to any cluster of which the
// point is not a member.
// The cluster with the lowest average dissimilarity - i.e. the nearest cluster to the current
// point - is said to be the "neighboring cluster".
val otherClusterIds = clusterIds.filter(_ != pointClusterId)
val neighboringClusterDissimilarity = otherClusterIds.map(averageDistanceToCluster).min
// adjustment for excluding the node itself from the computation of the average dissimilarity
val currentClusterDissimilarity = if (pointClusterNumOfPoints == 1) {
0.0
} else {
averageDistanceToCluster(pointClusterId) * pointClusterNumOfPoints /
(pointClusterNumOfPoints - 1)
}
if (currentClusterDissimilarity < neighboringClusterDissimilarity) {
1 - (currentClusterDissimilarity / neighboringClusterDissimilarity)
} else if (currentClusterDissimilarity > neighboringClusterDissimilarity) {
(neighboringClusterDissimilarity / currentClusterDissimilarity) - 1
} else {
0.0
}
}
/**
* Compute the mean Silhouette values of all samples.
*/
def overallScore(df: DataFrame, scoreColumn: Column): Double = {
df.select(avg(scoreColumn)).collect()(0).getDouble(0)
}
}
/**
* SquaredEuclideanSilhouette computes the average of the
* Silhouette over all the data of the dataset, which is
@ -259,7 +320,7 @@ object ClusteringEvaluator
* `N` is the number of points in the dataset and `W` is the number
* of worker nodes.
*/
private[evaluation] object SquaredEuclideanSilhouette {
private[evaluation] object SquaredEuclideanSilhouette extends Silhouette {
private[this] var kryoRegistrationPerformed: Boolean = false
@ -336,18 +397,19 @@ private[evaluation] object SquaredEuclideanSilhouette {
* It computes the Silhouette coefficient for a point.
*
* @param broadcastedClustersMap A map of the precomputed values for each cluster.
* @param features The [[org.apache.spark.ml.linalg.Vector]] representing the current point.
* @param point The [[org.apache.spark.ml.linalg.Vector]] representing the current point.
* @param clusterId The id of the cluster the current point belongs to.
* @param squaredNorm The `$\Xi_{X}$` (which is the squared norm) precomputed for the point.
* @return The Silhouette for the point.
*/
def computeSilhouetteCoefficient(
broadcastedClustersMap: Broadcast[Map[Double, ClusterStats]],
features: Vector,
point: Vector,
clusterId: Double,
squaredNorm: Double): Double = {
def compute(squaredNorm: Double, point: Vector, clusterStats: ClusterStats): Double = {
def compute(targetClusterId: Double): Double = {
val clusterStats = broadcastedClustersMap.value(targetClusterId)
val pointDotClusterFeaturesSum = BLAS.dot(point, clusterStats.featureSum)
squaredNorm +
@ -355,41 +417,14 @@ private[evaluation] object SquaredEuclideanSilhouette {
2 * pointDotClusterFeaturesSum / clusterStats.numOfPoints
}
// Here we compute the average dissimilarity of the
// current point to any cluster of which the point
// is not a member.
// The cluster with the lowest average dissimilarity
// - i.e. the nearest cluster to the current point -
// is said to be the "neighboring cluster".
var neighboringClusterDissimilarity = Double.MaxValue
broadcastedClustersMap.value.keySet.foreach {
c =>
if (c != clusterId) {
val dissimilarity = compute(squaredNorm, features, broadcastedClustersMap.value(c))
if(dissimilarity < neighboringClusterDissimilarity) {
neighboringClusterDissimilarity = dissimilarity
}
}
}
val currentCluster = broadcastedClustersMap.value(clusterId)
// adjustment for excluding the node itself from
// the computation of the average dissimilarity
val currentClusterDissimilarity = if (currentCluster.numOfPoints == 1) {
0
} else {
compute(squaredNorm, features, currentCluster) * currentCluster.numOfPoints /
(currentCluster.numOfPoints - 1)
}
(currentClusterDissimilarity compare neighboringClusterDissimilarity).signum match {
case -1 => 1 - (currentClusterDissimilarity / neighboringClusterDissimilarity)
case 1 => (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1
case 0 => 0.0
}
pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet,
clusterId,
broadcastedClustersMap.value(clusterId).numOfPoints,
compute)
}
/**
* Compute the mean Silhouette values of all samples.
* Compute the Silhouette score of the dataset using squared Euclidean distance measure.
*
* @param dataset The input dataset (previously clustered) on which compute the Silhouette.
* @param predictionCol The name of the column which contains the predicted cluster id
@ -412,7 +447,7 @@ private[evaluation] object SquaredEuclideanSilhouette {
val clustersStatsMap = SquaredEuclideanSilhouette
.computeClusterStats(dfWithSquaredNorm, predictionCol, featuresCol)
// Silhouette is reasonable only when the number of clusters is grater then 1
// Silhouette is reasonable only when the number of clusters is greater then 1
assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.")
val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap)
@ -421,13 +456,190 @@ private[evaluation] object SquaredEuclideanSilhouette {
computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
}
val silhouetteScore = dfWithSquaredNorm
.select(avg(
computeSilhouetteCoefficientUDF(
col(featuresCol), col(predictionCol).cast(DoubleType), col("squaredNorm"))
))
.collect()(0)
.getDouble(0)
val silhouetteScore = overallScore(dfWithSquaredNorm,
computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
col("squaredNorm")))
bClustersStatsMap.destroy()
silhouetteScore
}
}
/**
* The algorithm which is implemented in this object, instead, is an efficient and parallel
* implementation of the Silhouette using the cosine distance measure. The cosine distance
* measure is defined as `1 - s` where `s` is the cosine similarity between two points.
*
* The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster `$\Gamma$`
* is:
*
* <blockquote>
* $$
* \sum\limits_{i=1}^N d(X, C_{i} ) =
* \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
* = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
* \frac{c_{ij}}{\|C_{i}\|}
* = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
* \frac{c_{ij}}{\|C_{i}\|} \Big)
* $$
* </blockquote>
*
* where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th dimension
* of the `i`-th point in cluster `$\Gamma$`.
*
* Then, we can define the vector:
*
* <blockquote>
* $$
* \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
* $$
* </blockquote>
*
* which can be precomputed for each point and the vector
*
* <blockquote>
* $$
* \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
* $$
* </blockquote>
*
* which can be precomputed too for each cluster `$\Gamma$` by its points `$C_{i}$`.
*
* With these definitions, the numerator becomes:
*
* <blockquote>
* $$
* N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
* $$
* </blockquote>
*
* Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is:
*
* <blockquote>
* $$
* 1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
* $$
* </blockquote>
*
* In the implementation, the precomputed values for the clusters are distributed among the worker
* nodes via broadcasted variables, because we can assume that the clusters are limited in number.
*
* The main strengths of this algorithm are the low computational complexity and the intrinsic
* parallelism. The precomputed information for each point and for each cluster can be computed
* with a computational complexity which is `O(N/W)`, where `N` is the number of points in the
* dataset and `W` is the number of worker nodes. After that, every point can be analyzed
* independently from the others.
*
* For every point we need to compute the average distance to all the clusters. Since the formula
* above requires `O(D)` operations, this phase has a computational complexity which is
* `O(C*D*N/W)` where `C` is the number of clusters (which we assume quite low), `D` is the number
* of dimensions, `N` is the number of points in the dataset and `W` is the number of worker
* nodes.
*/
private[evaluation] object CosineSilhouette extends Silhouette {
private[this] val normalizedFeaturesColName = "normalizedFeatures"
/**
* The method takes the input dataset and computes the aggregated values
* about a cluster which are needed by the algorithm.
*
* @param df The DataFrame which contains the input data
* @param predictionCol The name of the column which contains the predicted cluster id
* for the point.
* @return A [[scala.collection.immutable.Map]] which associates each cluster id to a
* its statistics (ie. the precomputed values `N` and `$\Omega_{\Gamma}$`).
*/
def computeClusterStats(df: DataFrame, predictionCol: String): Map[Double, (Vector, Long)] = {
val numFeatures = df.select(col(normalizedFeaturesColName)).first().getAs[Vector](0).size
val clustersStatsRDD = df.select(
col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName))
.rdd
.map { row => (row.getDouble(0), row.getAs[Vector](1)) }
.aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))(
seqOp = {
case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) =>
BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum)
(normalizedFeaturesSum, numOfPoints + 1)
},
combOp = {
case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) =>
BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1)
(normalizedFeaturesSum1, numOfPoints1 + numOfPoints2)
}
)
clustersStatsRDD
.collectAsMap()
.toMap
}
/**
* It computes the Silhouette coefficient for a point.
*
* @param broadcastedClustersMap A map of the precomputed values for each cluster.
* @param normalizedFeatures The [[org.apache.spark.ml.linalg.Vector]] representing the
* normalized features of the current point.
* @param clusterId The id of the cluster the current point belongs to.
*/
def computeSilhouetteCoefficient(
broadcastedClustersMap: Broadcast[Map[Double, (Vector, Long)]],
normalizedFeatures: Vector,
clusterId: Double): Double = {
def compute(targetClusterId: Double): Double = {
val (normalizedFeatureSum, numOfPoints) = broadcastedClustersMap.value(targetClusterId)
1 - BLAS.dot(normalizedFeatures, normalizedFeatureSum) / numOfPoints
}
pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet,
clusterId,
broadcastedClustersMap.value(clusterId)._2,
compute)
}
/**
* Compute the Silhouette score of the dataset using the cosine distance measure.
*
* @param dataset The input dataset (previously clustered) on which compute the Silhouette.
* @param predictionCol The name of the column which contains the predicted cluster id
* for the point.
* @param featuresCol The name of the column which contains the feature vector of the point.
* @return The average of the Silhouette values of the clustered data.
*/
def computeSilhouetteScore(
dataset: Dataset[_],
predictionCol: String,
featuresCol: String): Double = {
val normalizeFeatureUDF = udf {
features: Vector => {
val norm = Vectors.norm(features, 2.0)
features match {
case d: DenseVector => Vectors.dense(d.values.map(_ / norm))
case s: SparseVector => Vectors.sparse(s.size, s.indices, s.values.map(_ / norm))
}
}
}
val dfWithNormalizedFeatures = dataset.withColumn(normalizedFeaturesColName,
normalizeFeatureUDF(col(featuresCol)))
// compute aggregate values for clusters needed by the algorithm
val clustersStatsMap = computeClusterStats(dfWithNormalizedFeatures, predictionCol)
// Silhouette is reasonable only when the number of clusters is greater then 1
assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.")
val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap)
val computeSilhouetteCoefficientUDF = udf {
computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double)
}
val silhouetteScore = overallScore(dfWithNormalizedFeatures,
computeSilhouetteCoefficientUDF(col(normalizedFeaturesColName),
col(predictionCol).cast(DoubleType)))
bClustersStatsMap.destroy()

View file

@ -66,16 +66,38 @@ class ClusteringEvaluatorSuite
assert(evaluator.evaluate(irisDataset) ~== 0.6564679231 relTol 1e-5)
}
test("number of clusters must be greater than one") {
val singleClusterDataset = irisDataset.where($"label" === 0.0)
/*
Use the following python code to load the data and evaluate it using scikit-learn package.
from sklearn import datasets
from sklearn.metrics import silhouette_score
iris = datasets.load_iris()
round(silhouette_score(iris.data, iris.target, metric='cosine'), 10)
0.7222369298
*/
test("cosine Silhouette") {
val evaluator = new ClusteringEvaluator()
.setFeaturesCol("features")
.setPredictionCol("label")
.setDistanceMeasure("cosine")
val e = intercept[AssertionError]{
evaluator.evaluate(singleClusterDataset)
assert(evaluator.evaluate(irisDataset) ~== 0.7222369298 relTol 1e-5)
}
test("number of clusters must be greater than one") {
val singleClusterDataset = irisDataset.where($"label" === 0.0)
Seq("squaredEuclidean", "cosine").foreach { distanceMeasure =>
val evaluator = new ClusteringEvaluator()
.setFeaturesCol("features")
.setPredictionCol("label")
.setDistanceMeasure(distanceMeasure)
val e = intercept[AssertionError] {
evaluator.evaluate(singleClusterDataset)
}
assert(e.getMessage.contains("Number of clusters must be greater than one"))
}
assert(e.getMessage.contains("Number of clusters must be greater than one"))
}
}