[SPARK-32140][ML][PYSPARK] Add training summary to FMClassificationModel

### What changes were proposed in this pull request?
Add a training summary to FMClassificationModel.

### Why are the changes needed?
So that users can inspect the training process, such as the loss value at each iteration and the total number of iterations.

### Does this PR introduce _any_ user-facing change?
Yes. Two new user-facing APIs are added:
FMClassificationModel.summary
FMClassificationModel.evaluate
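
For illustration, a minimal PySpark sketch of the new API (not part of the patch itself; assumes an active SparkSession named spark, and the toy data mirrors the new Python test):

from pyspark.ml.classification import FMClassifier
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([(1.0, Vectors.dense(2.0)),
                            (0.0, Vectors.dense(2.0)),
                            (0.0, Vectors.dense(6.0)),
                            (1.0, Vectors.dense(3.0))],
                           ["label", "features"])
model = FMClassifier(maxIter=5).fit(df)

summary = model.summary()          # training summary attached by fit()
print(summary.objectiveHistory)    # loss value of each iteration
print(summary.totalIterations)     # total iteration number

test_summary = model.evaluate(df)  # summary for an arbitrary dataset
print(test_summary.areaUnderROC)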

### How was this patch tested?
New unit tests (Scala and Python).

Closes #28960 from huaxingao/fm_summary.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Huaxin Gao <huaxing@us.ibm.com>
Huaxin Gao 2020-07-15 10:13:03 -07:00
parent cf22d947fb
commit b05f309bc9
7 changed files with 256 additions and 31 deletions

View file

@@ -30,7 +30,7 @@ import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.linalg.{Vector => OldVector}
 import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql._
 import org.apache.spark.storage.StorageLevel

 /**
@@ -212,14 +212,34 @@ class FMClassifier @Since("3.0.0") (
     if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)
-    val coefficients = trainImpl(data, numFeatures, LogisticLoss)
+    val (coefficients, objectiveHistory) = trainImpl(data, numFeatures, LogisticLoss)
     val (intercept, linear, factors) = splitCoefficients(
       coefficients, numFeatures, $(factorSize), $(fitIntercept), $(fitLinear))
     if (handlePersistence) data.unpersist()
-    copyValues(new FMClassificationModel(uid, intercept, linear, factors))
+    createModel(dataset, intercept, linear, factors, objectiveHistory)
+  }
+
+  private def createModel(
+      dataset: Dataset[_],
+      intercept: Double,
+      linear: Vector,
+      factors: Matrix,
+      objectiveHistory: Array[Double]): FMClassificationModel = {
+    val model = copyValues(new FMClassificationModel(uid, intercept, linear, factors))
+    val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)
+    val (summaryModel, probabilityColName, predictionColName) = model.findSummaryModel()
+    val summary = new FMClassificationTrainingSummaryImpl(
+      summaryModel.transform(dataset),
+      probabilityColName,
+      predictionColName,
+      $(labelCol),
+      weightColName,
+      objectiveHistory)
+    model.setSummary(Some(summary))
   }

   @Since("3.0.0")
@@ -243,7 +263,8 @@ class FMClassificationModel private[classification] (
     @Since("3.0.0") val linear: Vector,
     @Since("3.0.0") val factors: Matrix)
   extends ProbabilisticClassificationModel[Vector, FMClassificationModel]
-    with FMClassifierParams with MLWritable {
+    with FMClassifierParams with MLWritable
+    with HasTrainingSummary[FMClassificationTrainingSummary]{

   @Since("3.0.0")
   override val numClasses: Int = 2
@@ -251,6 +272,27 @@ class FMClassificationModel private[classification] (
   @Since("3.0.0")
   override val numFeatures: Int = linear.size

+  /**
+   * Gets summary of model on training set. An exception is thrown
+   * if `hasSummary` is false.
+   */
+  @Since("3.1.0")
+  override def summary: FMClassificationTrainingSummary = super.summary
+
+  /**
+   * Evaluates the model on a test dataset.
+   *
+   * @param dataset Test dataset to evaluate model on.
+   */
+  @Since("3.1.0")
+  def evaluate(dataset: Dataset[_]): FMClassificationSummary = {
+    val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)
+    // Handle possible missing or invalid probability or prediction columns
+    val (summaryModel, probability, predictionColName) = findSummaryModel()
+    new FMClassificationSummaryImpl(summaryModel.transform(dataset),
+      probability, predictionColName, $(labelCol), weightColName)
+  }
+
   @Since("3.0.0")
   override def predictRaw(features: Vector): Vector = {
     val rawPrediction = getRawPrediction(features, intercept, linear, factors)
@@ -328,3 +370,53 @@ object FMClassificationModel extends MLReadable[FMClassificationModel] {
     }
   }
 }
+
+/**
+ * Abstraction for FMClassifier results for a given model.
+ */
+sealed trait FMClassificationSummary extends BinaryClassificationSummary
+
+/**
+ * Abstraction for FMClassifier training results.
+ */
+sealed trait FMClassificationTrainingSummary extends FMClassificationSummary with TrainingSummary
+
+/**
+ * FMClassifier results for a given model.
+ *
+ * @param predictions dataframe output by the model's `transform` method.
+ * @param scoreCol field in "predictions" which gives the probability of each instance.
+ * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
+ *                      double.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
+ * @param weightCol field in "predictions" which gives the weight of each instance.
+ */
+private class FMClassificationSummaryImpl(
+    @transient override val predictions: DataFrame,
+    override val scoreCol: String,
+    override val predictionCol: String,
+    override val labelCol: String,
+    override val weightCol: String)
+  extends FMClassificationSummary
+
+/**
+ * FMClassifier training results.
+ *
+ * @param predictions dataframe output by the model's `transform` method.
+ * @param scoreCol field in "predictions" which gives the probability of each instance.
+ * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
+ *                      double.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
+ * @param weightCol field in "predictions" which gives the weight of each instance.
+ * @param objectiveHistory objective function (scaled loss + regularization) at each iteration.
+ */
+private class FMClassificationTrainingSummaryImpl(
+    predictions: DataFrame,
+    scoreCol: String,
+    predictionCol: String,
+    labelCol: String,
+    weightCol: String,
+    override val objectiveHistory: Array[Double])
+  extends FMClassificationSummaryImpl(
+    predictions, scoreCol, predictionCol, labelCol, weightCol)
+    with FMClassificationTrainingSummary

View file

@@ -47,7 +47,7 @@ import org.apache.spark.storage.StorageLevel
  */
 private[ml] trait FactorizationMachinesParams extends PredictorParams
   with HasMaxIter with HasStepSize with HasTol with HasSolver with HasSeed
-  with HasFitIntercept with HasRegParam {
+  with HasFitIntercept with HasRegParam with HasWeightCol {

   /**
    * Param for dimensionality of the factors (&gt;= 0)
@@ -134,7 +134,7 @@ private[ml] trait FactorizationMachines extends FactorizationMachinesParams {
       data: RDD[(Double, OldVector)],
       numFeatures: Int,
       loss: String
-  ): Vector = {
+  ): (Vector, Array[Double]) = {

     // initialize coefficients
     val initialCoefficients = initCoefficients(numFeatures)
@@ -151,8 +151,8 @@ private[ml] trait FactorizationMachines extends FactorizationMachinesParams {
       .setRegParam($(regParam))
       .setMiniBatchFraction($(miniBatchFraction))
       .setConvergenceTol($(tol))
-    val coefficients = optimizer.optimize(data, initialCoefficients)
-    coefficients.asML
+    val (coefficients, lossHistory) = optimizer.optimizeWithLossReturned(data, initialCoefficients)
+    (coefficients.asML, lossHistory)
   }
 }
@@ -421,7 +421,7 @@ class FMRegressor @Since("3.0.0") (
     if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)
-    val coefficients = trainImpl(data, numFeatures, SquaredError)
+    val (coefficients, _) = trainImpl(data, numFeatures, SquaredError)
     val (intercept, linear, factors) = splitCoefficients(
       coefficients, numFeatures, $(factorSize), $(fitIntercept), $(fitLinear))

View file

@@ -129,7 +129,20 @@ class GradientDescent private[spark] (private var gradient: Gradient, private var updater: Updater)
    * @return solution vector
    */
   def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-    val (weights, _) = GradientDescent.runMiniBatchSGD(
+    val (weights, _) = optimizeWithLossReturned(data, initialWeights)
+    weights
+  }
+
+  /**
+   * Runs gradient descent on the given training data.
+   * @param data training data
+   * @param initialWeights initial weights
+   * @return solution vector and loss value in an array
+   */
+  def optimizeWithLossReturned(
+      data: RDD[(Double, Vector)],
+      initialWeights: Vector): (Vector, Array[Double]) = {
+    GradientDescent.runMiniBatchSGD(
       data,
       gradient,
       updater,
@@ -139,7 +152,6 @@ class GradientDescent private[spark] (private var gradient: Gradient, private var updater: Updater)
       miniBatchFraction,
       initialWeights,
       convergenceTol)
-    weights
   }
 }
@@ -195,7 +207,7 @@ object GradientDescent extends Logging {
         s"numIterations=$numIterations and miniBatchFraction=$miniBatchFraction")
     }

-    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
+    val stochasticLossHistory = new ArrayBuffer[Double](numIterations + 1)

     // Record previous weight and current one to calculate solution vector difference
     var previousWeights: Option[Vector] = None
@@ -226,7 +238,7 @@ object GradientDescent extends Logging {
     var converged = false // indicates whether converged based on convergenceTol
     var i = 1
-    while (!converged && i <= numIterations) {
+    while (!converged && (i <= numIterations + 1)) {
       val bcWeights = data.context.broadcast(weights)
       // Sample a subset (fraction miniBatchFraction) of the total data
       // compute and sum up the subgradients on this subset (this is one map-reduce)
@@ -249,17 +261,19 @@ object GradientDescent extends Logging {
          * and regVal is the regularization value computed in the previous iteration as well.
          */
         stochasticLossHistory += lossSum / miniBatchSize + regVal
-        val update = updater.compute(
-          weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
-          stepSize, i, regParam)
-        weights = update._1
-        regVal = update._2
+        if (i != (numIterations + 1)) {
+          val update = updater.compute(
+            weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
+            stepSize, i, regParam)
+          weights = update._1
+          regVal = update._2

-        previousWeights = currentWeights
-        currentWeights = Some(weights)
-        if (previousWeights != None && currentWeights != None) {
-          converged = isConverged(previousWeights.get,
-            currentWeights.get, convergenceTol)
+          previousWeights = currentWeights
+          currentWeights = Some(weights)
+          if (previousWeights != None && currentWeights != None) {
+            converged = isConverged(previousWeights.get,
+              currentWeights.get, convergenceTol)
+          }
         }
       } else {
         logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
@@ -271,7 +285,6 @@ object GradientDescent extends Logging {
       stochasticLossHistory.takeRight(10).mkString(", ")))

     (weights, stochasticLossHistory.toArray)
   }
-
   /**
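
The loop-bound change above is what makes the returned history cover the final model: the loss of the current weights is recorded before the weight update in each iteration, so the extra (numIterations + 1)-th pass appends the loss of the final weights while skipping the update. A standalone Python sketch of this pattern (hypothetical loss_fn/grad_fn, not Spark code; the convergence check is omitted and the 1/sqrt(i) step decay mirrors the simple updater schedule):

import numpy as np

def sgd_with_loss_history(loss_fn, grad_fn, w, step_size, num_iterations):
    loss_history = []
    i = 1
    while i <= num_iterations + 1:
        loss_history.append(loss_fn(w))        # loss of the current weights
        if i != num_iterations + 1:            # extra pass records loss only
            w = w - (step_size / np.sqrt(i)) * grad_fn(w)
        i += 1
    return w, loss_history                     # num_iterations + 1 entries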

View file

@@ -136,7 +136,14 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
   }

   override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-    val (weights, _) = LBFGS.runLBFGS(
+    val (weights, _) = optimizeWithLossReturned(data, initialWeights)
+    weights
+  }
+
+  def optimizeWithLossReturned(
+      data: RDD[(Double, Vector)],
+      initialWeights: Vector): (Vector, Array[Double]) = {
+    LBFGS.runLBFGS(
       data,
       gradient,
       updater,
@@ -145,9 +152,7 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
       maxNumIterations,
       regParam,
       initialWeights)
-    weights
   }
 }

 /**
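
As in GradientDescent, the existing optimize entry point keeps its old signature and behavior by delegating to the new method and discarding the loss history, so existing callers are untouched. A schematic Python sketch of the pattern (run_solver is hypothetical):

def optimize_with_loss_returned(data, initial_weights):
    weights, loss_history = run_solver(data, initial_weights)  # hypothetical solver call
    return weights, loss_history

def optimize(data, initial_weights):
    # unchanged public behavior: same weights, loss history dropped
    weights, _ = optimize_with_loss_returned(data, initial_weights)
    return weights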

View file

@@ -194,6 +194,32 @@ class FMClassifierSuite extends MLTest with DefaultReadWriteTest {
     testPredictionModelSinglePrediction(fmModel, smallBinaryDataset)
   }

+  test("summary and training summary") {
+    val fm = new FMClassifier()
+    val model = fm.setMaxIter(5).fit(smallBinaryDataset)
+
+    val summary = model.evaluate(smallBinaryDataset)
+
+    assert(model.summary.accuracy === summary.accuracy)
+    assert(model.summary.weightedPrecision === summary.weightedPrecision)
+    assert(model.summary.weightedRecall === summary.weightedRecall)
+    assert(model.summary.pr.collect() === summary.pr.collect())
+    assert(model.summary.roc.collect() === summary.roc.collect())
+    assert(model.summary.areaUnderROC === summary.areaUnderROC)
+  }
+
+  test("FMClassifier training summary totalIterations") {
+    Seq(1, 5, 10, 20, 100).foreach { maxIter =>
+      val trainer = new FMClassifier().setMaxIter(maxIter)
+      val model = trainer.fit(smallBinaryDataset)
+      if (maxIter == 1) {
+        assert(model.summary.totalIterations === maxIter)
+      } else {
+        assert(model.summary.totalIterations <= maxIter)
+      }
+    }
+  }
+
   test("read/write") {
     def checkModelData(
         model: FMClassificationModel,

View file

@@ -52,7 +52,8 @@ __all__ = ['LinearSVC', 'LinearSVCModel',
            'NaiveBayes', 'NaiveBayesModel',
            'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel',
            'OneVsRest', 'OneVsRestModel',
-           'FMClassifier', 'FMClassificationModel']
+           'FMClassifier', 'FMClassificationModel', 'FMClassificationSummary',
+           'FMClassificationTrainingSummary']

 class _ClassifierParams(HasRawPredictionCol, _PredictorParams):
@@ -3226,7 +3227,7 @@ class FMClassifier(_JavaProbabilisticClassifier, _FactorizationMachinesParams, JavaMLWritable, JavaMLReadable):
 class FMClassificationModel(_JavaProbabilisticClassificationModel, _FactorizationMachinesParams,
-                            JavaMLWritable, JavaMLReadable):
+                            JavaMLWritable, JavaMLReadable, HasTrainingSummary):
     """
     Model fitted by :class:`FMClassifier`.
@@ -3257,6 +3258,49 @@ class FMClassificationModel(_JavaProbabilisticClassificationModel, _FactorizationMachinesParams,
         """
         return self._call_java("factors")

+    @since("3.1.0")
+    def summary(self):
+        """
+        Gets summary (e.g. accuracy/precision/recall, objective history, total iterations) of model
+        trained on the training set. An exception is thrown if `trainingSummary is None`.
+        """
+        if self.hasSummary:
+            return FMClassificationTrainingSummary(super(FMClassificationModel, self).summary)
+        else:
+            raise RuntimeError("No training summary available for this %s" %
+                               self.__class__.__name__)
+
+    @since("3.1.0")
+    def evaluate(self, dataset):
+        """
+        Evaluates the model on a test dataset.
+
+        :param dataset:
+          Test dataset to evaluate model on, where dataset is an
+          instance of :py:class:`pyspark.sql.DataFrame`
+        """
+        if not isinstance(dataset, DataFrame):
+            raise ValueError("dataset must be a DataFrame but got %s." % type(dataset))
+        java_fm_summary = self._call_java("evaluate", dataset)
+        return FMClassificationSummary(java_fm_summary)
+
+
+class FMClassificationSummary(_BinaryClassificationSummary):
+    """
+    Abstraction for FMClassifier Results for a given model.
+
+    .. versionadded:: 3.1.0
+    """
+    pass
+
+
+@inherit_doc
+class FMClassificationTrainingSummary(FMClassificationSummary, _TrainingSummary):
+    """
+    Abstraction for FMClassifier Training results.
+
+    .. versionadded:: 3.1.0
+    """
+    pass
+
 if __name__ == "__main__":
     import doctest
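
One behavioral note on the Python wrapper above: summary raises a RuntimeError when no training summary is attached, and training summaries are not persisted with the model, so a model loaded from disk should be guarded with hasSummary or re-evaluated. A sketch of this (the save path and df are illustrative, not from the patch):

from pyspark.ml.classification import FMClassificationModel

loaded = FMClassificationModel.load("/tmp/fm_model")  # illustrative path
if loaded.hasSummary:
    print(loaded.summary().objectiveHistory)
else:
    print(loaded.evaluate(df).areaUnderROC)  # recompute metrics on a DataFrame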

View file

@@ -18,8 +18,9 @@
 import sys
 import unittest

-from pyspark.ml.classification import BinaryLogisticRegressionSummary, LinearSVC, \
-    LinearSVCSummary, BinaryRandomForestClassificationSummary, LogisticRegression, \
+from pyspark.ml.classification import BinaryLogisticRegressionSummary, FMClassifier, \
+    FMClassificationSummary, LinearSVC, LinearSVCSummary, \
+    BinaryRandomForestClassificationSummary, LogisticRegression, \
     LogisticRegressionSummary, RandomForestClassificationSummary, \
     RandomForestClassifier
 from pyspark.ml.clustering import BisectingKMeans, GaussianMixture, KMeans
@@ -309,6 +310,50 @@ class TrainingSummaryTest(SparkSessionTestCase):
         self.assertFalse(isinstance(sameSummary, BinaryRandomForestClassificationSummary))
         self.assertAlmostEqual(sameSummary.accuracy, s.accuracy)

+    def test_fm_classification_summary(self):
+        df = self.spark.createDataFrame([(1.0, Vectors.dense(2.0)),
+                                         (0.0, Vectors.dense(2.0)),
+                                         (0.0, Vectors.dense(6.0)),
+                                         (1.0, Vectors.dense(3.0))
+                                         ],
+                                        ["label", "features"])
+        fm = FMClassifier(maxIter=5)
+        model = fm.fit(df)
+        self.assertTrue(model.hasSummary)
+        s = model.summary()
+        # test that api is callable and returns expected types
+        self.assertTrue(isinstance(s.predictions, DataFrame))
+        self.assertEqual(s.scoreCol, "probability")
+        self.assertEqual(s.labelCol, "label")
+        self.assertEqual(s.predictionCol, "prediction")
+        objHist = s.objectiveHistory
+        self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float))
+        self.assertGreater(s.totalIterations, 0)
+        self.assertTrue(isinstance(s.labels, list))
+        self.assertTrue(isinstance(s.truePositiveRateByLabel, list))
+        self.assertTrue(isinstance(s.falsePositiveRateByLabel, list))
+        self.assertTrue(isinstance(s.precisionByLabel, list))
+        self.assertTrue(isinstance(s.recallByLabel, list))
+        self.assertTrue(isinstance(s.fMeasureByLabel(), list))
+        self.assertTrue(isinstance(s.fMeasureByLabel(1.0), list))
+        self.assertTrue(isinstance(s.roc, DataFrame))
+        self.assertAlmostEqual(s.areaUnderROC, 0.625, 2)
+        self.assertTrue(isinstance(s.pr, DataFrame))
+        self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame))
+        self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
+        self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
+        self.assertAlmostEqual(s.weightedTruePositiveRate, 0.75, 2)
+        self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.25, 2)
+        self.assertAlmostEqual(s.weightedRecall, 0.75, 2)
+        self.assertAlmostEqual(s.weightedPrecision, 0.8333333333333333, 2)
+        self.assertAlmostEqual(s.weightedFMeasure(), 0.7333333333333334, 2)
+        self.assertAlmostEqual(s.weightedFMeasure(1.0), 0.7333333333333334, 2)
+        # test evaluation (with training dataset) produces a summary with same values
+        # one check is enough to verify a summary is returned, Scala version runs full test
+        sameSummary = model.evaluate(df)
+        self.assertTrue(isinstance(sameSummary, FMClassificationSummary))
+        self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)
+
     def test_gaussian_mixture_summary(self):
         data = [(Vectors.dense(1.0),), (Vectors.dense(5.0),), (Vectors.dense(10.0),),
                 (Vectors.sparse(1, [], []),)]