diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 1f8ba0bcf1..cfad20db16 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -67,7 +67,8 @@ exportMethods("glm",
               "spark.fpGrowth",
               "spark.freqItemsets",
               "spark.associationRules",
-              "spark.findFrequentSequentialPatterns")
+              "spark.findFrequentSequentialPatterns",
+              "spark.assignClusters")
 
 # Job group lifecycle management methods
 export("setJobGroup",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index eed7646522..09d817127e 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1479,6 +1479,10 @@ setGeneric("spark.associationRules", function(object) { standardGeneric("spark.a
 setGeneric("spark.findFrequentSequentialPatterns",
            function(data, ...) { standardGeneric("spark.findFrequentSequentialPatterns") })
 
+#' @rdname spark.powerIterationClustering
+setGeneric("spark.assignClusters",
+           function(data, ...) { standardGeneric("spark.assignClusters") })
+
 #' @param object a fitted ML model object.
 #' @param path the directory where the model is saved.
 #' @param ... additional argument(s) passed to the method.
diff --git a/R/pkg/R/mllib_clustering.R b/R/pkg/R/mllib_clustering.R
index 900be68582..7d9dcebfe7 100644
--- a/R/pkg/R/mllib_clustering.R
+++ b/R/pkg/R/mllib_clustering.R
@@ -41,6 +41,12 @@ setClass("KMeansModel", representation(jobj = "jobj"))
 #' @note LDAModel since 2.1.0
 setClass("LDAModel", representation(jobj = "jobj"))
 
+#' S4 class that represents a PowerIterationClustering
+#'
+#' @param jobj a Java object reference to the backing Scala PowerIterationClustering
+#' @note PowerIterationClustering since 3.0.0
+setClass("PowerIterationClustering", slots = list(jobj = "jobj"))
+
 #' Bisecting K-Means Clustering Model
 #'
 #' Fits a bisecting k-means clustering model against a SparkDataFrame.
@@ -610,3 +616,59 @@ setMethod("write.ml", signature(object = "LDAModel", path = "character"),
           function(object, path, overwrite = FALSE) {
             write_internal(object, path, overwrite)
           })
+
+#' PowerIterationClustering
+#'
+#' A scalable graph clustering algorithm. Users can call \code{spark.assignClusters} to
+#' return a cluster assignment for each input vertex.
+#'
+#' Run the PIC algorithm and return a cluster assignment for each input vertex.
+#' @param data a SparkDataFrame.
+#' @param k the number of clusters to create.
+#' @param initMode the initialization algorithm.
+#' @param maxIter the maximum number of iterations.
+#' @param sourceCol the name of the input column for source vertex IDs.
+#' @param destinationCol the name of the input column for destination vertex IDs.
+#' @param weightCol weight column name. If this is not set or \code{NULL},
+#'                  we treat all instance weights as 1.0.
+#' @param ... additional argument(s) passed to the method.
+#' @return A SparkDataFrame that contains columns of vertex id and the corresponding cluster id.
+#'         Its schema will be:
+#'         \code{id: Long}
+#'         \code{cluster: Int}
+#' @rdname spark.powerIterationClustering
+#' @aliases spark.assignClusters,SparkDataFrame-method
+#' @examples
+#' \dontrun{
+#' df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
+#'                            list(1L, 2L, 1.0), list(3L, 4L, 1.0),
+#'                            list(4L, 0L, 0.1)),
+#'                       schema = c("src", "dst", "weight"))
+#' clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
+#' showDF(clusters)
+#' }
+#' @note spark.assignClusters(SparkDataFrame) since 3.0.0
+setMethod("spark.assignClusters",
+          signature(data = "SparkDataFrame"),
+          function(data, k = 2L, initMode = c("random", "degree"), maxIter = 20L,
+                   sourceCol = "src", destinationCol = "dst", weightCol = NULL) {
+            if (!is.numeric(k) || k < 1) {
+              stop("k should be a number with value >= 1.")
+            }
+            if (!is.integer(maxIter) || maxIter <= 0) {
+              stop("maxIter should be a number with value > 0.")
+            }
+            initMode <- match.arg(initMode)
+            if (!is.null(weightCol) && weightCol == "") {
+              weightCol <- NULL
+            } else if (!is.null(weightCol)) {
+              weightCol <- as.character(weightCol)
+            }
+            jobj <- callJStatic("org.apache.spark.ml.r.PowerIterationClusteringWrapper",
+                                "getPowerIterationClustering",
+                                as.integer(k), initMode,
+                                as.integer(maxIter), as.character(sourceCol),
+                                as.character(destinationCol), weightCol)
+            object <- new("PowerIterationClustering", jobj = jobj)
+            dataFrame(callJMethod(object@jobj, "assignClusters", data@sdf))
+          })
diff --git a/R/pkg/tests/fulltests/test_mllib_clustering.R b/R/pkg/tests/fulltests/test_mllib_clustering.R
index 4110e13da4..b78a476f1d 100644
--- a/R/pkg/tests/fulltests/test_mllib_clustering.R
+++ b/R/pkg/tests/fulltests/test_mllib_clustering.R
@@ -319,4 +319,17 @@ test_that("spark.posterior and spark.perplexity", {
   expect_equal(length(local.posterior), sum(unlist(local.posterior)))
 })
 
+test_that("spark.assignClusters", {
+  df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
+                             list(1L, 2L, 1.0), list(3L, 4L, 1.0),
+                             list(4L, 0L, 0.1)),
+                        schema = c("src", "dst", "weight"))
+  clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
+  expected_result <- createDataFrame(list(list(4L, 1L), list(0L, 0L),
+                                          list(1L, 0L), list(3L, 1L),
+                                          list(2L, 0L)),
+                                     schema = c("id", "cluster"))
+  expect_equivalent(expected_result, clusters)
+})
+
 sparkR.session.stop()
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd
index 1c6a03c4b9..cbe8c61725 100644
--- a/R/pkg/vignettes/sparkr-vignettes.Rmd
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -549,6 +549,8 @@ SparkR supports the following machine learning models and algorithms.
 
 * Latent Dirichlet Allocation (LDA)
 
+* Power Iteration Clustering (PIC)
+
 #### Collaborative Filtering
 
 * Alternating Least Squares (ALS)
@@ -982,6 +984,18 @@ predicted <- predict(model, df)
 head(predicted)
 ```
 
+#### Power Iteration Clustering
+
+Power Iteration Clustering (PIC) is a scalable graph clustering algorithm. The `spark.assignClusters` method runs the PIC algorithm and returns a cluster assignment for each input vertex.
+
+```{r}
+df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
+                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
+                           list(4L, 0L, 0.1)),
+                      schema = c("src", "dst", "weight"))
+head(spark.assignClusters(df, initMode = "degree", weightCol = "weight"))
+```
+
 #### FP-growth
 
 `spark.fpGrowth` executes FP-growth algorithm to mine frequent itemsets on a `SparkDataFrame`. `itemsCol` should be an array of values.
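Taken together, the pieces above (the `NAMESPACE` export, the `spark.assignClusters` generic, the S4 method, its test, and the vignette entry) define the complete R call path. The sketch below is not part of the diff; it walks that path end to end on the same toy graph the test uses, assuming a SparkR build that includes this change:

```r
library(SparkR)
sparkR.session()

# The affinity graph is an edge list: (src, dst, weight) triples with
# integer vertex ids and double similarity weights.
df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))

# "degree" initialization is deterministic, which is what lets the unit test
# assert exact cluster ids; the default initMode is "random".
clusters <- spark.assignClusters(df, k = 2L, initMode = "degree",
                                 weightCol = "weight")

# The result is a SparkDataFrame with columns id (long) and cluster (int).
head(arrange(clusters, clusters$id))

sparkR.session.stop()
```

With this data, vertices 0, 1, and 2 land in one cluster and 3 and 4 in the other (per the expected result in the test): only the weak 4-to-0 edge with weight 0.1 connects the two groups.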
diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md
index 1186fb73d0..65f2652562 100644
--- a/docs/ml-clustering.md
+++ b/docs/ml-clustering.md
@@ -265,3 +265,44 @@ Refer to the [R API docs](api/R/spark.gaussianMixture.html) for more details.
 </div>
 
 </div>
+
+## Power Iteration Clustering (PIC)
+
+Power Iteration Clustering (PIC) is a scalable graph clustering algorithm
+developed by [Lin and Cohen](http://www.cs.cmu.edu/~frank/papers/icml2010-pic-final.pdf).
+From the abstract: PIC finds a very low-dimensional embedding of a dataset
+using truncated power iteration on a normalized pair-wise similarity matrix of the data.
+
+`spark.ml`'s PowerIterationClustering implementation takes the following parameters:
+
+* `k`: the number of clusters to create
+* `initMode`: the initialization algorithm
+* `maxIter`: the maximum number of iterations
+* `srcCol`: the name of the input column for source vertex IDs
+* `dstCol`: the name of the input column for destination vertex IDs
+* `weightCol`: the weight column name
+
+**Examples**
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+Refer to the [Scala API docs](api/scala/index.html#org.apache.spark.ml.clustering.PowerIterationClustering) for more details.
+
+{% include_example scala/org/apache/spark/examples/ml/PowerIterationClusteringExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/PowerIterationClustering.html) for more details.
+
+{% include_example java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java %}
+</div>
+
+<div data-lang="r" markdown="1">
+
+Refer to the [R API docs](api/R/spark.powerIterationClustering.html) for more details.
+
+{% include_example r/ml/powerIterationClustering.R %}
+</div>
+
+</div>
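One naming wrinkle worth flagging: the Scala params listed above are `srcCol` and `dstCol`, while the new R method exposes them as `sourceCol` and `destinationCol` (the wrapper translates between the two). A hedged sketch of the R call with non-default column names; the `edges` data here is illustrative and not taken from the linked examples:

```r
# Edge list whose vertex-id columns do not use the default "src"/"dst"
# names, so they must be passed explicitly.
edges <- createDataFrame(list(list(0L, 1L, 1.0), list(1L, 2L, 1.0),
                              list(2L, 0L, 0.5), list(3L, 4L, 1.0)),
                         schema = c("from", "to", "sim"))

assignments <- spark.assignClusters(edges, k = 2L, maxIter = 20L,
                                    sourceCol = "from", destinationCol = "to",
                                    weightCol = "sim")
showDF(assignments)
```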
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 0057f05de0..dbb6124100 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -544,6 +544,7 @@ SparkR supports the following machine learning algorithms currently:
 * [`spark.gaussianMixture`](api/R/spark.gaussianMixture.html): [`Gaussian Mixture Model (GMM)`](ml-clustering.html#gaussian-mixture-model-gmm)
 * [`spark.kmeans`](api/R/spark.kmeans.html): [`K-Means`](ml-clustering.html#k-means)
 * [`spark.lda`](api/R/spark.lda.html): [`Latent Dirichlet Allocation (LDA)`](ml-clustering.html#latent-dirichlet-allocation-lda)
+* [`spark.powerIterationClustering`](api/R/spark.powerIterationClustering.html): [`Power Iteration Clustering (PIC)`](ml-clustering.html#power-iteration-clustering-pic)
 
 #### Collaborative Filtering
 
diff --git a/examples/src/main/r/ml/powerIterationClustering.R b/examples/src/main/r/ml/powerIterationClustering.R
new file mode 100644
index 0000000000..ba43037106
--- /dev/null
+++ b/examples/src/main/r/ml/powerIterationClustering.R
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# To run this example use
+# ./bin/spark-submit examples/src/main/r/ml/powerIterationClustering.R
+
+# Load SparkR library into your R session
+library(SparkR)
+
+# Initialize SparkSession
+sparkR.session(appName = "SparkR-ML-powerIterationClustering-example")
+
+# $example on$
+df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
+                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
+                           list(4L, 0L, 0.1)),
+                      schema = c("src", "dst", "weight"))
+# assign clusters
+clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L, initMode = "degree", weightCol = "weight")
+
+showDF(arrange(clusters, clusters$id))
+# $example off$
+
+sparkR.session.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/PowerIterationClusteringWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/PowerIterationClusteringWrapper.scala
new file mode 100644
index 0000000000..b5dfad0224
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/PowerIterationClusteringWrapper.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.r
+
+import org.apache.spark.ml.clustering.PowerIterationClustering
+
+private[r] object PowerIterationClusteringWrapper {
+  def getPowerIterationClustering(
+      k: Int,
+      initMode: String,
+      maxIter: Int,
+      srcCol: String,
+      dstCol: String,
+      weightCol: String): PowerIterationClustering = {
+    val pic = new PowerIterationClustering()
+      .setK(k)
+      .setInitMode(initMode)
+      .setMaxIter(maxIter)
+      .setSrcCol(srcCol)
+      .setDstCol(dstCol)
+    if (weightCol != null) pic.setWeightCol(weightCol)
+    pic
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
index 1b9a349994..d9a330f67e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
@@ -97,8 +97,8 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
 /**
  * :: Experimental ::
  * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
- * Lin and Cohen. From the abstract:
- * PIC finds a very low-dimensional embedding of a dataset using truncated power
+ * <a href=http://www.cs.cmu.edu/~frank/papers/icml2010-pic-final.pdf>Lin and Cohen</a>. From
+ * the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power
  * iteration on a normalized pair-wise similarity matrix of the data.
 *
 * This class is not yet an Estimator/Transformer, use `assignClusters` method to run the
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index d0b507ec5d..d8a6dfb7d3 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -1193,8 +1193,8 @@ class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReada
     .. note:: Experimental
 
     Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
-    `Lin and Cohen `_. From the abstract:
-    PIC finds a very low-dimensional embedding of a dataset using truncated power
+    `Lin and Cohen <http://www.cs.cmu.edu/~frank/papers/icml2010-pic-final.pdf>`_. From the
+    abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power
     iteration on a normalized pair-wise similarity matrix of the data.
 
     This class is not yet an Estimator/Transformer, use :py:func:`assignClusters` method
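A final note on how the layers compose: the R method normalizes an empty string or NULL `weightCol` to NULL, and the Scala wrapper above only calls `setWeightCol` when the value is non-null, so PIC then treats every edge weight as 1.0, as the roxygen docs state. A sketch of that unweighted path, again assuming a live SparkR session with this change:

```r
# No weight column at all: weightCol stays NULL, setWeightCol is never
# called on the Scala side, and every edge is treated as weight 1.0.
unweighted <- createDataFrame(list(list(0L, 1L), list(1L, 2L),
                                   list(3L, 4L), list(4L, 0L)),
                              schema = c("src", "dst"))
head(spark.assignClusters(unweighted, k = 2L, initMode = "degree"))
```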