[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model

### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.

Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit

But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.

Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).

OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug.

In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.

### Why are the changes needed?
Bug fix.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Unit test.

Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder

training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```

2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.

```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
    MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
    TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase

dataset = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0),
     (Vectors.dense([0.4]), 1.0),
     (Vectors.dense([0.5]), 0.0),
     (Vectors.dense([0.6]), 1.0),
     (Vectors.dense([1.0]), 1.0)] * 10,
    ["features", "label"])

ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()

pipeline = Pipeline(stages=[ova])

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')

cvModel2 = CrossValidatorModel.load('/tmp/model002')
```

TrainValidationSplit testing code are similar so I do not paste them.

Closes #28279 from WeichenXu123/fix_pipeline_tuning.

Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
This commit is contained in:
Weichen Xu 2020-04-26 21:04:14 -07:00 committed by Xiangrui Meng
parent 91ec2eacfa
commit 4a21c4cc92
2 changed files with 189 additions and 3 deletions

View file

@ -25,8 +25,8 @@ from pyspark import since, keyword_only, SparkContext
from pyspark.ml.base import Estimator, Model, Transformer from pyspark.ml.base import Estimator, Model, Transformer
from pyspark.ml.param import Param, Params from pyspark.ml.param import Param, Params
from pyspark.ml.util import * from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaParams from pyspark.ml.wrapper import JavaParams, JavaWrapper
from pyspark.ml.common import inherit_doc from pyspark.ml.common import inherit_doc, _java2py, _py2java
@inherit_doc @inherit_doc
@ -174,6 +174,55 @@ class Pipeline(Estimator, MLReadable, MLWritable):
return _java_obj return _java_obj
def _make_java_param_pair(self, param, value):
"""
Makes a Java param pair.
"""
sc = SparkContext._active_spark_context
param = self._resolveParam(param)
java_param = sc._jvm.org.apache.spark.ml.param.Param(param.parent, param.name, param.doc)
if isinstance(value, Params) and hasattr(value, "_to_java"):
# Convert JavaEstimator/JavaTransformer object or Estimator/Transformer object which
# implements `_to_java` method (such as OneVsRest, Pipeline object) to java object.
# used in the case of an estimator having another estimator as a parameter
# the reason why this is not in _py2java in common.py is that importing
# Estimator and Model in common.py results in a circular import with inherit_doc
java_value = value._to_java()
else:
java_value = _py2java(sc, value)
return java_param.w(java_value)
def _transfer_param_map_to_java(self, pyParamMap):
"""
Transforms a Python ParamMap into a Java ParamMap.
"""
paramMap = JavaWrapper._new_java_obj("org.apache.spark.ml.param.ParamMap")
for param in self.params:
if param in pyParamMap:
pair = self._make_java_param_pair(param, pyParamMap[param])
paramMap.put([pair])
return paramMap
def _transfer_param_map_from_java(self, javaParamMap):
"""
Transforms a Java ParamMap into a Python ParamMap.
"""
sc = SparkContext._active_spark_context
paramMap = dict()
for pair in javaParamMap.toList():
param = pair.param()
if self.hasParam(str(param.name())):
java_obj = pair.value()
if sc._jvm.Class.forName("org.apache.spark.ml.PipelineStage").isInstance(java_obj):
# Note: JavaParams._from_java support both JavaEstimator/JavaTransformer class
# and Estimator/Transformer class which implements `_from_java` static method
# (such as OneVsRest, Pipeline class).
py_obj = JavaParams._from_java(java_obj)
else:
py_obj = _java2py(sc, java_obj)
paramMap[self.getParam(param.name())] = py_obj
return paramMap
@inherit_doc @inherit_doc
class PipelineWriter(MLWriter): class PipelineWriter(MLWriter):

View file

@ -18,7 +18,8 @@
import tempfile import tempfile
import unittest import unittest
from pyspark.ml import Estimator, Model from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml import Estimator, Pipeline, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \ from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator MulticlassClassificationEvaluator, RegressionEvaluator
@ -310,6 +311,75 @@ class CrossValidatorTests(SparkSessionTestCase):
loadedModel = CrossValidatorModel.load(cvModelPath) loadedModel = CrossValidatorModel.load(cvModelPath)
self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid) self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
def test_save_load_pipeline_estimator(self):
temp_path = tempfile.mkdtemp()
training = self.spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(5)
lr2 = LogisticRegression().setMaxIter(10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(ova.classifier, [lr1, lr2]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
# test save/load of CrossValidatorModel
cvModelPath = temp_path + "/cvModel"
cvModel.save(cvModelPath)
loadedModel = CrossValidatorModel.load(cvModelPath)
self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
self.assertEqual(len(loadedModel.bestModel.stages), len(cvModel.bestModel.stages))
for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
cvModel.bestModel.stages):
self.assertEqual(loadedStage.uid, originalStage.uid)
# Test nested pipeline
nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
crossval2 = CrossValidator(estimator=nested_pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel2 = crossval2.fit(training)
# test save/load of CrossValidatorModel
cvModelPath2 = temp_path + "/cvModel2"
cvModel2.save(cvModelPath2)
loadedModel2 = CrossValidatorModel.load(cvModelPath2)
self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid)
loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
original_nested_pipeline_model = cvModel2.bestModel.stages[1]
self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
self.assertEqual(len(loaded_nested_pipeline_model.stages),
len(original_nested_pipeline_model.stages))
for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
original_nested_pipeline_model.stages):
self.assertEqual(loadedStage.uid, originalStage.uid)
class TrainValidationSplitTests(SparkSessionTestCase): class TrainValidationSplitTests(SparkSessionTestCase):
@ -511,6 +581,73 @@ class TrainValidationSplitTests(SparkSessionTestCase):
loadedModel = TrainValidationSplitModel.load(tvsModelPath) loadedModel = TrainValidationSplitModel.load(tvsModelPath)
self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid) self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
def test_save_load_pipeline_estimator(self):
temp_path = tempfile.mkdtemp()
training = self.spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(5)
lr2 = LogisticRegression().setMaxIter(10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(ova.classifier, [lr1, lr2]) \
.build()
tvs = TrainValidationSplit(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
# Run train validation split, and choose the best set of parameters.
tvsModel = tvs.fit(training)
# test save/load of CrossValidatorModel
tvsModelPath = temp_path + "/tvsModel"
tvsModel.save(tvsModelPath)
loadedModel = TrainValidationSplitModel.load(tvsModelPath)
self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
self.assertEqual(len(loadedModel.bestModel.stages), len(tvsModel.bestModel.stages))
for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
tvsModel.bestModel.stages):
self.assertEqual(loadedStage.uid, originalStage.uid)
# Test nested pipeline
nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
tvs2 = TrainValidationSplit(estimator=nested_pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
# Run train validation split, and choose the best set of parameters.
tvsModel2 = tvs2.fit(training)
# test save/load of CrossValidatorModel
tvsModelPath2 = temp_path + "/tvsModel2"
tvsModel2.save(tvsModelPath2)
loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2)
self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid)
loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
original_nested_pipeline_model = tvsModel2.bestModel.stages[1]
self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
self.assertEqual(len(loaded_nested_pipeline_model.stages),
len(original_nested_pipeline_model.stages))
for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
original_nested_pipeline_model.stages):
self.assertEqual(loadedStage.uid, originalStage.uid)
def test_copy(self): def test_copy(self):
dataset = self.spark.createDataFrame([ dataset = self.spark.createDataFrame([
(10, 10.0), (10, 10.0),