[SPARK-12974][ML][PYSPARK] Add Python API for spark.ml bisecting k-means
Add Python API for spark.ml bisecting k-means. Author: Yanbo Liang <ybliang8@gmail.com> Closes #10889 from yanboliang/spark-12974.
This commit is contained in:
parent
894921d813
commit
a183dda6ab
|
@ -21,7 +21,7 @@ from pyspark.ml.wrapper import JavaEstimator, JavaModel
|
|||
from pyspark.ml.param.shared import *
|
||||
from pyspark.mllib.common import inherit_doc
|
||||
|
||||
__all__ = ['KMeans', 'KMeansModel']
|
||||
__all__ = ['KMeans', 'KMeansModel', 'BisectingKMeans', 'BisectingKMeansModel']
|
||||
|
||||
|
||||
class KMeansModel(JavaModel, MLWritable, MLReadable):
|
||||
|
@ -175,6 +175,129 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol
|
|||
return self.getOrDefault(self.initSteps)
|
||||
|
||||
|
||||
class BisectingKMeansModel(JavaModel):
|
||||
"""
|
||||
.. note:: Experimental
|
||||
|
||||
Model fitted by BisectingKMeans.
|
||||
|
||||
.. versionadded:: 2.0.0
|
||||
"""
|
||||
|
||||
@since("2.0.0")
|
||||
def clusterCenters(self):
|
||||
"""Get the cluster centers, represented as a list of NumPy arrays."""
|
||||
return [c.toArray() for c in self._call_java("clusterCenters")]
|
||||
|
||||
@since("2.0.0")
|
||||
def computeCost(self, dataset):
|
||||
"""
|
||||
Computes the sum of squared distances between the input points
|
||||
and their corresponding cluster centers.
|
||||
"""
|
||||
return self._call_java("computeCost", dataset)
|
||||
|
||||
|
||||
@inherit_doc
|
||||
class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasSeed):
|
||||
"""
|
||||
.. note:: Experimental
|
||||
|
||||
A bisecting k-means algorithm based on the paper "A comparison of document clustering
|
||||
techniques" by Steinbach, Karypis, and Kumar, with modification to fit Spark.
|
||||
The algorithm starts from a single cluster that contains all points.
|
||||
Iteratively it finds divisible clusters on the bottom level and bisects each of them using
|
||||
k-means, until there are `k` leaf clusters in total or no leaf clusters are divisible.
|
||||
The bisecting steps of clusters on the same level are grouped together to increase parallelism.
|
||||
If bisecting all divisible clusters on the bottom level would result more than `k` leaf
|
||||
clusters, larger clusters get higher priority.
|
||||
|
||||
>>> from pyspark.mllib.linalg import Vectors
|
||||
>>> data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
|
||||
... (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
|
||||
>>> df = sqlContext.createDataFrame(data, ["features"])
|
||||
>>> bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
|
||||
>>> model = bkm.fit(df)
|
||||
>>> centers = model.clusterCenters()
|
||||
>>> len(centers)
|
||||
2
|
||||
>>> model.computeCost(df)
|
||||
2.000...
|
||||
>>> transformed = model.transform(df).select("features", "prediction")
|
||||
>>> rows = transformed.collect()
|
||||
>>> rows[0].prediction == rows[1].prediction
|
||||
True
|
||||
>>> rows[2].prediction == rows[3].prediction
|
||||
True
|
||||
|
||||
.. versionadded:: 2.0.0
|
||||
"""
|
||||
|
||||
k = Param(Params._dummy(), "k", "number of clusters to create")
|
||||
minDivisibleClusterSize = Param(Params._dummy(), "minDivisibleClusterSize",
|
||||
"the minimum number of points (if >= 1.0) " +
|
||||
"or the minimum proportion")
|
||||
|
||||
@keyword_only
|
||||
def __init__(self, featuresCol="features", predictionCol="prediction", maxIter=20,
|
||||
seed=None, k=4, minDivisibleClusterSize=1.0):
|
||||
"""
|
||||
__init__(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
|
||||
seed=None, k=4, minDivisibleClusterSize=1.0)
|
||||
"""
|
||||
super(BisectingKMeans, self).__init__()
|
||||
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.BisectingKMeans",
|
||||
self.uid)
|
||||
self._setDefault(maxIter=20, k=4, minDivisibleClusterSize=1.0)
|
||||
kwargs = self.__init__._input_kwargs
|
||||
self.setParams(**kwargs)
|
||||
|
||||
@keyword_only
|
||||
@since("2.0.0")
|
||||
def setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20,
|
||||
seed=None, k=4, minDivisibleClusterSize=1.0):
|
||||
"""
|
||||
setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
|
||||
seed=None, k=4, minDivisibleClusterSize=1.0)
|
||||
Sets params for BisectingKMeans.
|
||||
"""
|
||||
kwargs = self.setParams._input_kwargs
|
||||
return self._set(**kwargs)
|
||||
|
||||
@since("2.0.0")
|
||||
def setK(self, value):
|
||||
"""
|
||||
Sets the value of :py:attr:`k`.
|
||||
"""
|
||||
self._paramMap[self.k] = value
|
||||
return self
|
||||
|
||||
@since("2.0.0")
|
||||
def getK(self):
|
||||
"""
|
||||
Gets the value of `k` or its default value.
|
||||
"""
|
||||
return self.getOrDefault(self.k)
|
||||
|
||||
@since("2.0.0")
|
||||
def setMinDivisibleClusterSize(self, value):
|
||||
"""
|
||||
Sets the value of :py:attr:`minDivisibleClusterSize`.
|
||||
"""
|
||||
self._paramMap[self.minDivisibleClusterSize] = value
|
||||
return self
|
||||
|
||||
@since("2.0.0")
|
||||
def getMinDivisibleClusterSize(self):
|
||||
"""
|
||||
Gets the value of `minDivisibleClusterSize` or its default value.
|
||||
"""
|
||||
return self.getOrDefault(self.minDivisibleClusterSize)
|
||||
|
||||
def _create_model(self, java_model):
|
||||
return BisectingKMeansModel(java_model)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import doctest
|
||||
import pyspark.ml.clustering
|
||||
|
|
Loading…
Reference in a new issue