diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
index 6ef42500f8..cc691d1c0c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
@@ -30,7 +30,7 @@ import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.linalg.{Vector => OldVector}
 import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql._
 import org.apache.spark.storage.StorageLevel
 
 /**
@@ -212,14 +212,34 @@ class FMClassifier @Since("3.0.0") (
 
     if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)
 
-    val coefficients = trainImpl(data, numFeatures, LogisticLoss)
+    val (coefficients, objectiveHistory) = trainImpl(data, numFeatures, LogisticLoss)
 
     val (intercept, linear, factors) = splitCoefficients(
       coefficients, numFeatures, $(factorSize), $(fitIntercept), $(fitLinear))
 
     if (handlePersistence) data.unpersist()
 
-    copyValues(new FMClassificationModel(uid, intercept, linear, factors))
+    createModel(dataset, intercept, linear, factors, objectiveHistory)
+  }
+
+  private def createModel(
+      dataset: Dataset[_],
+      intercept: Double,
+      linear: Vector,
+      factors: Matrix,
+      objectiveHistory: Array[Double]): FMClassificationModel = {
+    val model = copyValues(new FMClassificationModel(uid, intercept, linear, factors))
+    val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)
+
+    val (summaryModel, probabilityColName, predictionColName) = model.findSummaryModel()
+    val summary = new FMClassificationTrainingSummaryImpl(
+      summaryModel.transform(dataset),
+      probabilityColName,
+      predictionColName,
+      $(labelCol),
+      weightColName,
+      objectiveHistory)
+    model.setSummary(Some(summary))
   }
 
   @Since("3.0.0")
@@ -243,7 +263,8 @@ class FMClassificationModel private[classification] (
     @Since("3.0.0") val linear: Vector,
     @Since("3.0.0") val factors: Matrix)
   extends ProbabilisticClassificationModel[Vector, FMClassificationModel]
-  with FMClassifierParams with MLWritable {
+  with FMClassifierParams with MLWritable
+  with HasTrainingSummary[FMClassificationTrainingSummary]{
 
   @Since("3.0.0")
   override val numClasses: Int = 2
@@ -251,6 +272,27 @@ class FMClassificationModel private[classification] (
   @Since("3.0.0")
   override val numFeatures: Int = linear.size
 
+  /**
+   * Gets summary of model on training set. An exception is thrown
+   * if `hasSummary` is false.
+   */
+  @Since("3.1.0")
+  override def summary: FMClassificationTrainingSummary = super.summary
+
+  /**
+   * Evaluates the model on a test dataset.
+   *
+   * @param dataset Test dataset to evaluate model on.
+   */
+  @Since("3.1.0")
+  def evaluate(dataset: Dataset[_]): FMClassificationSummary = {
+    val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)
+    // Handle possible missing or invalid probability or prediction columns
+    val (summaryModel, probability, predictionColName) = findSummaryModel()
+    new FMClassificationSummaryImpl(summaryModel.transform(dataset),
+      probability, predictionColName, $(labelCol), weightColName)
+  }
+
   @Since("3.0.0")
   override def predictRaw(features: Vector): Vector = {
     val rawPrediction = getRawPrediction(features, intercept, linear, factors)
@@ -328,3 +370,53 @@ object FMClassificationModel extends MLReadable[FMClassificationModel] {
     }
   }
 }
+
+/**
+ * Abstraction for FMClassifier results for a given model.
+ */
+sealed trait FMClassificationSummary extends BinaryClassificationSummary
+
+/**
+ * Abstraction for FMClassifier training results.
+ */
+sealed trait FMClassificationTrainingSummary extends FMClassificationSummary with TrainingSummary
+
+/**
+ * FMClassifier results for a given model.
+ *
+ * @param predictions dataframe output by the model's `transform` method.
+ * @param scoreCol field in "predictions" which gives the probability of each instance.
+ * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
+ *                      double.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
+ * @param weightCol field in "predictions" which gives the weight of each instance.
+ */
+private class FMClassificationSummaryImpl(
+    @transient override val predictions: DataFrame,
+    override val scoreCol: String,
+    override val predictionCol: String,
+    override val labelCol: String,
+    override val weightCol: String)
+  extends FMClassificationSummary
+
+/**
+ * FMClassifier training results.
+ *
+ * @param predictions dataframe output by the model's `transform` method.
+ * @param scoreCol field in "predictions" which gives the probability of each instance.
+ * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
+ *                      double.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
+ * @param weightCol field in "predictions" which gives the weight of each instance.
+ * @param objectiveHistory objective function (scaled loss + regularization) at each iteration.
+ */
+private class FMClassificationTrainingSummaryImpl(
+    predictions: DataFrame,
+    scoreCol: String,
+    predictionCol: String,
+    labelCol: String,
+    weightCol: String,
+    override val objectiveHistory: Array[Double])
+  extends FMClassificationSummaryImpl(
+    predictions, scoreCol, predictionCol, labelCol, weightCol)
+  with FMClassificationTrainingSummary
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
index b9307ebb37..84c0985245 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
@@ -47,7 +47,7 @@ import org.apache.spark.storage.StorageLevel
  */
 private[ml] trait FactorizationMachinesParams extends PredictorParams
   with HasMaxIter with HasStepSize with HasTol with HasSolver with HasSeed
-  with HasFitIntercept with HasRegParam {
+  with HasFitIntercept with HasRegParam with HasWeightCol {
 
   /**
    * Param for dimensionality of the factors (>= 0)
@@ -134,7 +134,7 @@ private[ml] trait FactorizationMachines extends FactorizationMachinesParams {
       data: RDD[(Double, OldVector)],
       numFeatures: Int,
       loss: String
-    ): Vector = {
+    ): (Vector, Array[Double]) = {
 
     // initialize coefficients
     val initialCoefficients = initCoefficients(numFeatures)
@@ -151,8 +151,8 @@ private[ml] trait FactorizationMachines extends FactorizationMachinesParams {
       .setRegParam($(regParam))
       .setMiniBatchFraction($(miniBatchFraction))
       .setConvergenceTol($(tol))
-    val coefficients = optimizer.optimize(data, initialCoefficients)
-    coefficients.asML
+    val (coefficients, lossHistory) = optimizer.optimizeWithLossReturned(data, initialCoefficients)
+    (coefficients.asML, lossHistory)
   }
 }
 
@@ -421,7 +421,7 @@ class FMRegressor @Since("3.0.0") (
 
     if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)
 
-    val coefficients = trainImpl(data, numFeatures, SquaredError)
+    val (coefficients, _) = trainImpl(data, numFeatures, SquaredError)
 
     val (intercept, linear, factors) = splitCoefficients(
       coefficients, numFeatures, $(factorSize), $(fitIntercept), $(fitLinear))
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index 1336ffd2f7..796a787e77 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -129,7 +129,20 @@ class GradientDescent private[spark] (private var gradient: Gradient, private va
    * @return solution vector
    */
   def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-    val (weights, _) = GradientDescent.runMiniBatchSGD(
+    val (weights, _) = optimizeWithLossReturned(data, initialWeights)
+    weights
+  }
+
+  /**
+   * Runs gradient descent on the given training data.
+   * @param data training data
+   * @param initialWeights initial weights
+   * @return solution vector and loss value in an array
+   */
+  def optimizeWithLossReturned(
+      data: RDD[(Double, Vector)],
+      initialWeights: Vector): (Vector, Array[Double]) = {
+    GradientDescent.runMiniBatchSGD(
       data,
       gradient,
       updater,
@@ -139,7 +152,6 @@ class GradientDescent private[spark] (private var gradient: Gradient, private va
       miniBatchFraction,
       initialWeights,
       convergenceTol)
-    weights
   }
 }
 
@@ -195,7 +207,7 @@ object GradientDescent extends Logging {
         s"numIterations=$numIterations and miniBatchFraction=$miniBatchFraction")
     }
 
-    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
+    val stochasticLossHistory = new ArrayBuffer[Double](numIterations + 1)
 
     // Record previous weight and current one to calculate solution vector difference
     var previousWeights: Option[Vector] = None
@@ -226,7 +238,7 @@ object GradientDescent extends Logging {
 
     var converged = false // indicates whether converged based on convergenceTol
     var i = 1
-    while (!converged && i <= numIterations) {
+    while (!converged && (i <= numIterations + 1)) {
       val bcWeights = data.context.broadcast(weights)
       // Sample a subset (fraction miniBatchFraction) of the total data
       // compute and sum up the subgradients on this subset (this is one map-reduce)
@@ -249,17 +261,19 @@ object GradientDescent extends Logging {
          * and regVal is the regularization value computed in the previous iteration as well.
          */
         stochasticLossHistory += lossSum / miniBatchSize + regVal
-        val update = updater.compute(
-          weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
-          stepSize, i, regParam)
-        weights = update._1
-        regVal = update._2
+        if (i != (numIterations + 1)) {
+          val update = updater.compute(
+            weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
+            stepSize, i, regParam)
+          weights = update._1
+          regVal = update._2
 
-        previousWeights = currentWeights
-        currentWeights = Some(weights)
-        if (previousWeights != None && currentWeights != None) {
-          converged = isConverged(previousWeights.get,
-            currentWeights.get, convergenceTol)
+          previousWeights = currentWeights
+          currentWeights = Some(weights)
+          if (previousWeights != None && currentWeights != None) {
+            converged = isConverged(previousWeights.get,
+              currentWeights.get, convergenceTol)
+          }
         }
       } else {
         logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
@@ -271,7 +285,6 @@ object GradientDescent extends Logging {
       stochasticLossHistory.takeRight(10).mkString(", ")))
 
     (weights, stochasticLossHistory.toArray)
-
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index 1ee9241104..4fc297560c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -136,7 +136,14 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
   }
 
   override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-    val (weights, _) = LBFGS.runLBFGS(
+    val (weights, _) = optimizeWithLossReturned(data, initialWeights)
+    weights
+  }
+
+  def optimizeWithLossReturned(
+      data: RDD[(Double, Vector)],
+      initialWeights: Vector): (Vector, Array[Double]) = {
+    LBFGS.runLBFGS(
       data,
       gradient,
       updater,
@@ -145,9 +152,7 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
       maxNumIterations,
       regParam,
       initialWeights)
-    weights
   }
-
 }
 
 /**
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala
index d477049824..9a04bdc397 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala
@@ -194,6 +194,32 @@ class FMClassifierSuite extends MLTest with DefaultReadWriteTest {
     testPredictionModelSinglePrediction(fmModel, smallBinaryDataset)
   }
 
+  test("summary and training summary") {
+    val fm = new FMClassifier()
+    val model = fm.setMaxIter(5).fit(smallBinaryDataset)
+
+    val summary = model.evaluate(smallBinaryDataset)
+
+    assert(model.summary.accuracy === summary.accuracy)
+    assert(model.summary.weightedPrecision === summary.weightedPrecision)
+    assert(model.summary.weightedRecall === summary.weightedRecall)
+    assert(model.summary.pr.collect() === summary.pr.collect())
+    assert(model.summary.roc.collect() === summary.roc.collect())
+    assert(model.summary.areaUnderROC === summary.areaUnderROC)
+  }
+
+  test("FMClassifier training summary totalIterations") {
+    Seq(1, 5, 10, 20, 100).foreach { maxIter =>
+      val trainer = new FMClassifier().setMaxIter(maxIter)
+      val model = trainer.fit(smallBinaryDataset)
+      if (maxIter == 1) {
+        assert(model.summary.totalIterations === maxIter)
+      } else {
+        assert(model.summary.totalIterations <= maxIter)
+      }
+    }
+  }
+
   test("read/write") {
     def checkModelData(
       model: FMClassificationModel,
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 7c8cbe3a9f..4f2d33adbc 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -52,7 +52,8 @@ __all__ = ['LinearSVC', 'LinearSVCModel',
            'NaiveBayes', 'NaiveBayesModel',
            'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel',
            'OneVsRest', 'OneVsRestModel',
-           'FMClassifier', 'FMClassificationModel']
+           'FMClassifier', 'FMClassificationModel', 'FMClassificationSummary',
+           'FMClassificationTrainingSummary']
 
 
 class _ClassifierParams(HasRawPredictionCol, _PredictorParams):
@@ -3226,7 +3227,7 @@ class FMClassifier(_JavaProbabilisticClassifier, _FactorizationMachinesParams, J
 
 
 class FMClassificationModel(_JavaProbabilisticClassificationModel, _FactorizationMachinesParams,
-                            JavaMLWritable, JavaMLReadable):
+                            JavaMLWritable, JavaMLReadable, HasTrainingSummary):
     """
     Model fitted by :class:`FMClassifier`.
 
@@ -3257,6 +3258,49 @@ class FMClassificationModel(_JavaProbabilisticClassificationModel, _Factorizatio
         """
         return self._call_java("factors")
 
+    @since("3.1.0")
+    def summary(self):
+        """
+        Gets summary (e.g. accuracy/precision/recall, objective history, total iterations) of model
+        trained on the training set. An exception is thrown if `trainingSummary is None`.
+        """
+        if self.hasSummary:
+            return FMClassificationTrainingSummary(super(FMClassificationModel, self).summary)
+        else:
+            raise RuntimeError("No training summary available for this %s" %
+                               self.__class__.__name__)
+
+    @since("3.1.0")
+    def evaluate(self, dataset):
+        """
+        Evaluates the model on a test dataset.
+
+        :param dataset:
+          Test dataset to evaluate model on, where dataset is an
+          instance of :py:class:`pyspark.sql.DataFrame`
+        """
+        if not isinstance(dataset, DataFrame):
+            raise ValueError("dataset must be a DataFrame but got %s." % type(dataset))
+        java_fm_summary = self._call_java("evaluate", dataset)
+        return FMClassificationSummary(java_fm_summary)
+
+
+class FMClassificationSummary(_BinaryClassificationSummary):
+    """
+    Abstraction for FMClassifier Results for a given model.
+    .. versionadded:: 3.1.0
+    """
+    pass
+
+
+@inherit_doc
+class FMClassificationTrainingSummary(FMClassificationSummary, _TrainingSummary):
+    """
+    Abstraction for FMClassifier Training results.
+    .. versionadded:: 3.1.0
+    """
+    pass
+
 
 if __name__ == "__main__":
     import doctest
diff --git a/python/pyspark/ml/tests/test_training_summary.py b/python/pyspark/ml/tests/test_training_summary.py
index 15e9ebb0f5..d305be8b96 100644
--- a/python/pyspark/ml/tests/test_training_summary.py
+++ b/python/pyspark/ml/tests/test_training_summary.py
@@ -18,8 +18,9 @@
 import sys
 import unittest
 
-from pyspark.ml.classification import BinaryLogisticRegressionSummary, LinearSVC, \
-    LinearSVCSummary, BinaryRandomForestClassificationSummary, LogisticRegression, \
+from pyspark.ml.classification import BinaryLogisticRegressionSummary, FMClassifier, \
+    FMClassificationSummary, LinearSVC, LinearSVCSummary, \
+    BinaryRandomForestClassificationSummary, LogisticRegression, \
     LogisticRegressionSummary, RandomForestClassificationSummary, \
     RandomForestClassifier
 from pyspark.ml.clustering import BisectingKMeans, GaussianMixture, KMeans
@@ -309,6 +310,50 @@ class TrainingSummaryTest(SparkSessionTestCase):
         self.assertFalse(isinstance(sameSummary, BinaryRandomForestClassificationSummary))
         self.assertAlmostEqual(sameSummary.accuracy, s.accuracy)
 
+    def test_fm_classification_summary(self):
+        df = self.spark.createDataFrame([(1.0, Vectors.dense(2.0)),
+                                         (0.0, Vectors.dense(2.0)),
+                                         (0.0, Vectors.dense(6.0)),
+                                         (1.0, Vectors.dense(3.0))
+                                         ],
+                                        ["label", "features"])
+        fm = FMClassifier(maxIter=5)
+        model = fm.fit(df)
+        self.assertTrue(model.hasSummary)
+        s = model.summary()
+        # test that api is callable and returns expected types
+        self.assertTrue(isinstance(s.predictions, DataFrame))
+        self.assertEqual(s.scoreCol, "probability")
+        self.assertEqual(s.labelCol, "label")
+        self.assertEqual(s.predictionCol, "prediction")
+        objHist = s.objectiveHistory
+        self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float))
+        self.assertGreater(s.totalIterations, 0)
+        self.assertTrue(isinstance(s.labels, list))
+        self.assertTrue(isinstance(s.truePositiveRateByLabel, list))
+        self.assertTrue(isinstance(s.falsePositiveRateByLabel, list))
+        self.assertTrue(isinstance(s.precisionByLabel, list))
+        self.assertTrue(isinstance(s.recallByLabel, list))
+        self.assertTrue(isinstance(s.fMeasureByLabel(), list))
+        self.assertTrue(isinstance(s.fMeasureByLabel(1.0), list))
+        self.assertTrue(isinstance(s.roc, DataFrame))
+        self.assertAlmostEqual(s.areaUnderROC, 0.625, 2)
+        self.assertTrue(isinstance(s.pr, DataFrame))
+        self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame))
+        self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
+        self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
+        self.assertAlmostEqual(s.weightedTruePositiveRate, 0.75, 2)
+        self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.25, 2)
+        self.assertAlmostEqual(s.weightedRecall, 0.75, 2)
+        self.assertAlmostEqual(s.weightedPrecision, 0.8333333333333333, 2)
+        self.assertAlmostEqual(s.weightedFMeasure(), 0.7333333333333334, 2)
+        self.assertAlmostEqual(s.weightedFMeasure(1.0), 0.7333333333333334, 2)
+        # test evaluation (with training dataset) produces a summary with same values
+        # one check is enough to verify a summary is returned, Scala version runs full test
+        sameSummary = model.evaluate(df)
+        self.assertTrue(isinstance(sameSummary, FMClassificationSummary))
+        self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)
+
     def test_gaussian_mixture_summary(self):
         data = [(Vectors.dense(1.0),), (Vectors.dense(5.0),), (Vectors.dense(10.0),),
                 (Vectors.sparse(1, [], []),)]
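
Usage sketch (not part of the patch): a minimal Scala example of the summary API this diff introduces, assuming a Spark build that includes it. The tiny dataset mirrors the one in the Python test; everything else is the API added above. Note that runMiniBatchSGD now loops to numIterations + 1 with the weight update guarded by `if (i != (numIterations + 1))`, so the objective of the final weight vector is recorded without an extra update, and totalIterations can be smaller than maxIter when convergence is reached early.

    import org.apache.spark.ml.classification.FMClassifier
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("fm-summary-sketch").getOrCreate()
    import spark.implicits._

    // Tiny dataset in the (label, features) shape the estimator expects.
    val training = Seq(
      (1.0, Vectors.dense(2.0)),
      (0.0, Vectors.dense(2.0)),
      (0.0, Vectors.dense(6.0)),
      (1.0, Vectors.dense(3.0))
    ).toDF("label", "features")

    val model = new FMClassifier().setMaxIter(5).fit(training)

    // Training summary attached by createModel(); objectiveHistory comes from
    // GradientDescent.optimizeWithLossReturned.
    val summary = model.summary
    println(summary.totalIterations)
    println(summary.objectiveHistory.mkString(", "))
    println(summary.accuracy)

    // Re-evaluate on another dataset (here the training data again).
    println(model.evaluate(training).areaUnderROC)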