From 90b46e014a60069bd18754b02fce056d8f4d1b3e Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 15 Apr 2016 12:58:38 -0700 Subject: [PATCH] [SPARK-7861][ML] PySpark OneVsRest ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-7861 Add PySpark OneVsRest. I implement it with Python since it's a meta-pipeline. ## How was this patch tested? Test with doctest. Author: Xusen Yin Closes #12124 from yinxusen/SPARK-14306-7861. --- python/pyspark/ml/classification.py | 224 +++++++++++++++++++++++++++- python/pyspark/ml/tests.py | 32 +++- 2 files changed, 249 insertions(+), 7 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 7051798485..089316729c 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -15,18 +15,21 @@ # limitations under the License. # +import operator import warnings -from pyspark import since -from pyspark.ml.util import * -from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper -from pyspark.ml.param import TypeConverters +from pyspark.ml import Estimator, Model from pyspark.ml.param.shared import * from pyspark.ml.regression import ( RandomForestParams, TreeEnsembleParams, DecisionTreeModel, TreeEnsembleModels) +from pyspark.ml.util import * +from pyspark.ml.wrapper import JavaEstimator, JavaModel +from pyspark.ml.wrapper import JavaWrapper from pyspark.mllib.common import inherit_doc from pyspark.sql import DataFrame - +from pyspark.sql.functions import udf, when +from pyspark.sql.types import ArrayType, DoubleType +from pyspark.storagelevel import StorageLevel __all__ = ['LogisticRegression', 'LogisticRegressionModel', 'LogisticRegressionSummary', 'LogisticRegressionTrainingSummary', @@ -35,7 +38,8 @@ __all__ = ['LogisticRegression', 'LogisticRegressionModel', 'GBTClassifier', 'GBTClassificationModel', 'RandomForestClassifier', 'RandomForestClassificationModel', 'NaiveBayes', 'NaiveBayesModel', - 'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel'] + 'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel', + 'OneVsRest', 'OneVsRestModel'] @inherit_doc @@ -1156,6 +1160,214 @@ class MultilayerPerceptronClassificationModel(JavaModel, JavaMLWritable, JavaMLR return self._call_java("weights") +@inherit_doc +class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol): + """ + Reduction of Multiclass Classification to Binary Classification. + Performs reduction using one against all strategy. + For a multiclass classification with k classes, train k models (one per class). + Each example is scored against all k models and the model with highest score + is picked to label the example. + + >>> from pyspark.sql import Row + >>> from pyspark.mllib.linalg import Vectors + >>> df = sc.parallelize([ + ... Row(label=0.0, features=Vectors.dense(1.0, 0.8)), + ... Row(label=1.0, features=Vectors.sparse(2, [], [])), + ... 
Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF() + >>> lr = LogisticRegression(maxIter=5, regParam=0.01) + >>> ovr = OneVsRest(classifier=lr) + >>> model = ovr.fit(df) + >>> [x.coefficients for x in model.models] + [DenseVector([3.3925, 1.8785]), DenseVector([-4.3016, -6.3163]), DenseVector([-4.5855, 6.1785])] + >>> [x.intercept for x in model.models] + [-3.6474708290602034, 2.5507881951814495, -1.1016513228162115] + >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF() + >>> model.transform(test0).head().prediction + 1.0 + >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF() + >>> model.transform(test1).head().prediction + 0.0 + >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4))]).toDF() + >>> model.transform(test2).head().prediction + 2.0 + + .. versionadded:: 2.0.0 + """ + + classifier = Param(Params._dummy(), "classifier", "base binary classifier") + + @keyword_only + def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", + classifier=None): + """ + __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + classifier=None) + """ + super(OneVsRest, self).__init__() + kwargs = self.__init__._input_kwargs + self._set(**kwargs) + + @keyword_only + @since("2.0.0") + def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): + """ + setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): + Sets params for OneVsRest. + """ + kwargs = self.setParams._input_kwargs + return self._set(**kwargs) + + @since("2.0.0") + def setClassifier(self, value): + """ + Sets the value of :py:attr:`classifier`. + + .. note:: Only LogisticRegression and NaiveBayes are supported now. + """ + self._set(classifier=value) + return self + + @since("2.0.0") + def getClassifier(self): + """ + Gets the value of classifier or its default value. + """ + return self.getOrDefault(self.classifier) + + def _fit(self, dataset): + labelCol = self.getLabelCol() + featuresCol = self.getFeaturesCol() + predictionCol = self.getPredictionCol() + classifier = self.getClassifier() + assert isinstance(classifier, HasRawPredictionCol),\ + "Classifier %s doesn't extend from HasRawPredictionCol." % type(classifier) + + numClasses = int(dataset.agg({labelCol: "max"}).head()["max("+labelCol+")"]) + 1 + + multiclassLabeled = dataset.select(labelCol, featuresCol) + + # persist if underlying dataset is not persistent. + handlePersistence = \ + dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, False) + if handlePersistence: + multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) + + def trainSingleClass(index): + binaryLabelCol = "mc2b$" + str(index) + trainingDataset = multiclassLabeled.withColumn( + binaryLabelCol, + when(multiclassLabeled[labelCol] == float(index), 1.0).otherwise(0.0)) + paramMap = dict([(classifier.labelCol, binaryLabelCol), + (classifier.featuresCol, featuresCol), + (classifier.predictionCol, predictionCol)]) + return classifier.fit(trainingDataset, paramMap) + + # TODO: Parallel training for all classes. + models = [trainSingleClass(i) for i in range(numClasses)] + + if handlePersistence: + multiclassLabeled.unpersist() + + return self._copyValues(OneVsRestModel(models=models)) + + @since("2.0.0") + def copy(self, extra=None): + """ + Creates a copy of this instance with a randomly generated uid + and some extra params. 
This creates a deep copy of the embedded paramMap, + and copies the embedded and extra parameters over. + + :param extra: Extra parameters to copy to the new instance + :return: Copy of this instance + """ + if extra is None: + extra = dict() + newOvr = Params.copy(self, extra) + if self.isSet(self.classifier): + newOvr.setClassifier(self.getClassifier().copy(extra)) + return newOvr + + +class OneVsRestModel(Model, HasFeaturesCol, HasLabelCol, HasPredictionCol): + """ + Model fitted by OneVsRest. + This stores the models resulting from training k binary classifiers: one for each class. + Each example is scored against all k models, and the model with the highest score + is picked to label the example. + + .. versionadded:: 2.0.0 + """ + + def __init__(self, models): + super(OneVsRestModel, self).__init__() + self.models = models + + def _transform(self, dataset): + # determine the input columns: these need to be passed through + origCols = dataset.columns + + # add an accumulator column to store predictions of all the models + accColName = "mbc$acc" + str(uuid.uuid4()) + initUDF = udf(lambda _: [], ArrayType(DoubleType())) + newDataset = dataset.withColumn(accColName, initUDF(dataset[origCols[0]])) + + # persist if underlying dataset is not persistent. + handlePersistence = \ + dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, False) + if handlePersistence: + newDataset.persist(StorageLevel.MEMORY_AND_DISK) + + # update the accumulator column with the result of prediction of models + aggregatedDataset = newDataset + for index, model in enumerate(self.models): + rawPredictionCol = model._call_java("getRawPredictionCol") + columns = origCols + [rawPredictionCol, accColName] + + # add temporary column to store intermediate scores and update + tmpColName = "mbc$tmp" + str(uuid.uuid4()) + updateUDF = udf( + lambda predictions, prediction: predictions + [prediction.tolist()[1]], + ArrayType(DoubleType())) + transformedDataset = model.transform(aggregatedDataset).select(*columns) + updatedDataset = transformedDataset.withColumn( + tmpColName, + updateUDF(transformedDataset[accColName], transformedDataset[rawPredictionCol])) + newColumns = origCols + [tmpColName] + + # switch out the intermediate column with the accumulator column + aggregatedDataset = updatedDataset\ + .select(*newColumns).withColumnRenamed(tmpColName, accColName) + + if handlePersistence: + newDataset.unpersist() + + # output the index of the classifier with highest confidence as prediction + labelUDF = udf( + lambda predictions: float(max(enumerate(predictions), key=operator.itemgetter(1))[0]), + DoubleType()) + + # output label and label metadata as prediction + return aggregatedDataset.withColumn( + self.getPredictionCol(), labelUDF(aggregatedDataset[accColName])).drop(accColName) + + @since("2.0.0") + def copy(self, extra=None): + """ + Creates a copy of this instance with a randomly generated uid + and some extra params. This creates a deep copy of the embedded paramMap, + and copies the embedded and extra parameters over. 
+ + :param extra: Extra parameters to copy to the new instance + :return: Copy of this instance + """ + if extra is None: + extra = dict() + newModel = Params.copy(self, extra) + newModel.models = [model.copy(extra) for model in self.models] + return newModel + + if __name__ == "__main__": import doctest import pyspark.ml.classification diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 85ad949c5a..d595eff5b4 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -43,7 +43,7 @@ import tempfile import numpy as np from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer -from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier +from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, OneVsRest from pyspark.ml.clustering import KMeans from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator from pyspark.ml.feature import * @@ -850,6 +850,36 @@ class TrainingSummaryTest(PySparkTestCase): self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC) +class OneVsRestTests(PySparkTestCase): + + def test_copy(self): + sqlContext = SQLContext(self.sc) + df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], + ["label", "features"]) + lr = LogisticRegression(maxIter=5, regParam=0.01) + ovr = OneVsRest(classifier=lr) + ovr1 = ovr.copy({lr.maxIter: 10}) + self.assertEqual(ovr.getClassifier().getMaxIter(), 5) + self.assertEqual(ovr1.getClassifier().getMaxIter(), 10) + model = ovr.fit(df) + model1 = model.copy({model.predictionCol: "indexed"}) + self.assertEqual(model1.getPredictionCol(), "indexed") + + def test_output_columns(self): + sqlContext = SQLContext(self.sc) + df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], + ["label", "features"]) + lr = LogisticRegression(maxIter=5, regParam=0.01) + ovr = OneVsRest(classifier=lr) + model = ovr.fit(df) + output = model.transform(df) + self.assertEqual(output.columns, ["label", "features", "prediction"]) + + class HashingTFTest(PySparkTestCase): def test_apply_binary_term_freqs(self):
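
A hedged usage sketch of the API this patch adds, mirroring the doctest in `classification.py` and the new `OneVsRestTests`. It assumes an existing SparkContext `sc` (as the doctest does) and the 2.0-era `SQLContext` / `pyspark.mllib.linalg` imports used elsewhere in this patch; the data, variable names, and expected prediction are taken from the doctest and are illustrative only.

```python
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression, OneVsRest

sqlContext = SQLContext(sc)  # `sc` is an existing SparkContext, as in the doctest

# Three-class training data (labels 0.0, 1.0, 2.0) with 2-dimensional features.
df = sqlContext.createDataFrame([
    (0.0, Vectors.dense(1.0, 0.8)),
    (1.0, Vectors.sparse(2, [], [])),
    (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"])

# OneVsRest fits one binary LogisticRegression per class against the rest.
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
model = ovr.fit(df)

# At transform time each row is scored by all per-class models and the class
# with the highest raw score becomes the prediction.
test = sqlContext.createDataFrame([(Vectors.dense(-1.0, 0.0),)], ["features"])
print(model.transform(test).head().prediction)  # expected 1.0, per the doctest
```

As the new `test_copy` test shows, `ovr.copy({lr.maxIter: 10})` copies the extra param down into the embedded classifier, while the original estimator keeps `maxIter=5`.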