2018-11-18 03:02:15 -05:00
|
|
|
#
|
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
|
|
# this work for additional information regarding copyright ownership.
|
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
|
# (the "License"); you may not use this file except in compliance with
|
|
|
|
# the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
#
|
|
|
|
|
|
|
|
import tempfile
|
2018-11-18 20:22:32 -05:00
|
|
|
import unittest
|
2018-11-18 03:02:15 -05:00
|
|
|
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
Note, however, that the model readers/writers of the estimators listed above currently do NOT support nested estimators/transformers implemented in pure Python, because they use the Java reader/writer wrappers as the Python-side reader/writer.
The PySpark CrossValidator/TrainValidationSplit model reader/writer requires every estimator to define `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
The OneVsRest class already defines these two methods, but Pipeline does not, which leads to this bug.
This PR adds `_transfer_param_map_to_java` and `_transfer_param_map_from_java` to the Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with a Pipeline estimator that includes a OneVsRest estimator stage, where the OneVsRest estimator nests a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
The TrainValidationSplit testing code is similar, so it is not pasted here.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
from pyspark.ml.feature import HashingTF, Tokenizer
|
|
|
|
from pyspark.ml import Estimator, Pipeline, Model
|
2018-11-18 03:02:15 -05:00
|
|
|
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
|
|
|
|
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
|
|
|
|
MulticlassClassificationEvaluator, RegressionEvaluator
|
|
|
|
from pyspark.ml.linalg import Vectors
|
|
|
|
from pyspark.ml.param import Param, Params
|
|
|
|
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
|
|
|
|
TrainValidationSplit, TrainValidationSplitModel
|
|
|
|
from pyspark.sql.functions import rand
|
|
|
|
from pyspark.testing.mlutils import SparkSessionTestCase
|
|
|
|
|
|
|
|
|
|
|
|
class HasInducedError(Params):
    """Mixin that declares the ``inducedError`` param and a getter for it."""

    def __init__(self):
        super(HasInducedError, self).__init__()
        # The Param is created per instance, with this object as its parent.
        description = "Uniformly-distributed error added to feature"
        self.inducedError = Param(self, "inducedError", description)

    def getInducedError(self):
        """Return the value of ``inducedError`` as resolved by ``getOrDefault``."""
        param = self.inducedError
        return self.getOrDefault(param)
|
|
|
|
|
|
|
|
|
|
|
|
class InducedErrorModel(Model, HasInducedError):
    """Model whose prediction is the ``feature`` column plus scaled random noise."""

    def __init__(self):
        super(InducedErrorModel, self).__init__()

    def _transform(self, dataset):
        """Return ``dataset`` with a ``prediction`` column added.

        The prediction is ``feature + rand(0) * inducedError``.
        """
        noise = rand(0) * self.getInducedError()
        predicted = dataset.feature + noise
        return dataset.withColumn("prediction", predicted)
|
|
|
|
|
|
|
|
|
|
|
|
class InducedErrorEstimator(Estimator, HasInducedError):
    """Estimator that produces an ``InducedErrorModel`` carrying its param values."""

    def __init__(self, inducedError=1.0):
        """Create the estimator and set ``inducedError`` (1.0 unless given)."""
        super(InducedErrorEstimator, self).__init__()
        self._set(inducedError=inducedError)

    def _fit(self, dataset):
        """Build a fresh model and copy this estimator's values onto it.

        ``dataset`` is not read here.
        """
        fitted = InducedErrorModel()
        self._copyValues(fitted)
        return fitted
|
|
|
|
|
|
|
|
|
2019-11-19 17:15:00 -05:00
|
|
|
class ParamGridBuilderTests(SparkSessionTestCase):
    """Tests for argument validation in ``ParamGridBuilder``."""

    def test_addGrid(self):
        """Building a grid whose key is a plain string must raise TypeError."""
        with self.assertRaises(TypeError):
            # The result is deliberately discarded: only the exception matters,
            # so no local is bound to it.
            (ParamGridBuilder()
             .addGrid("must be an instance of Param", ["not", "string"])
             .build())
|
|
|
|
|
|
|
|
|
2018-11-18 03:02:15 -05:00
|
|
|
class CrossValidatorTests(SparkSessionTestCase):
|
|
|
|
|
|
|
|
def test_copy(self):
    """copy() keeps the estimator uid and the fitted model's avgMetrics."""
    dataset = self.spark.createDataFrame([
        (10, 10.0),
        (50, 50.0),
        (100, 100.0),
        (500, 500.0)] * 10,
        ["feature", "label"])

    iee = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="rmse")

    grid = (ParamGridBuilder()
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
            .build())
    cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
    cvCopied = cv.copy()
    self.assertEqual(cv.getEstimator().uid, cvCopied.getEstimator().uid)

    cvModel = cv.fit(dataset)
    cvModelCopied = cvModel.copy()
    # Compare lengths first so that zip() below cannot silently hide a
    # truncated copy.
    self.assertEqual(len(cvModel.avgMetrics), len(cvModelCopied.avgMetrics))
    for original, copied in zip(cvModel.avgMetrics, cvModelCopied.avgMetrics):
        # assertLess reports both operands on failure, unlike assertTrue.
        self.assertLess(abs(original - copied), 0.0001)
|
|
|
|
|
|
|
|
def test_fit_minimize_metric(self):
    """With RMSE as the metric, the candidate with zero induced error is best."""
    rows = [(10, 10.0), (50, 50.0), (100, 100.0), (500, 500.0)] * 10
    df = self.spark.createDataFrame(rows, ["feature", "label"])

    estimator = InducedErrorEstimator()
    rmse = RegressionEvaluator(metricName="rmse")

    param_maps = ParamGridBuilder() \
        .addGrid(estimator.inducedError, [100.0, 0.0, 10000.0]) \
        .build()
    validator = CrossValidator(
        estimator=estimator, estimatorParamMaps=param_maps, evaluator=rmse)
    best = validator.fit(df).bestModel
    best_rmse = rmse.evaluate(best.transform(df))

    self.assertEqual(0.0, best.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(0.0, best_rmse, "Best model has RMSE of 0")
|
|
|
|
|
|
|
|
def test_fit_maximize_metric(self):
    """With R-squared as the metric, the candidate with zero induced error is best."""
    rows = [(10, 10.0), (50, 50.0), (100, 100.0), (500, 500.0)] * 10
    df = self.spark.createDataFrame(rows, ["feature", "label"])

    estimator = InducedErrorEstimator()
    r2 = RegressionEvaluator(metricName="r2")

    param_maps = ParamGridBuilder() \
        .addGrid(estimator.inducedError, [100.0, 0.0, 10000.0]) \
        .build()
    validator = CrossValidator(
        estimator=estimator, estimatorParamMaps=param_maps, evaluator=r2)
    best = validator.fit(df).bestModel
    best_r2 = r2.evaluate(best.transform(df))

    self.assertEqual(0.0, best.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(1.0, best_r2, "Best model has R-squared of 1")
|
|
|
|
|
|
|
|
def test_param_grid_type_coercion(self):
    """Every value in a grid built for ``regParam`` must come out as ``float``.

    The grid is built from ``[0.5, 1]``, so the integer ``1`` is the
    interesting case.
    """
    lr = LogisticRegression(maxIter=10)
    paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.5, 1]).build()
    for paramMap in paramGrid:
        for value in paramMap.values():
            # Use a unittest assertion: a bare ``assert`` is stripped under
            # ``python -O`` and gives no diagnostic on failure.
            self.assertIs(type(value), float)
|
|
|
|
|
|
|
|
def test_save_load_trained_model(self):
    """The best model picked by CrossValidator survives a save/load round trip.

    Only the trained ``bestModel`` is persisted here, not the
    CrossValidator or the CrossValidatorModel.
    """
    import shutil

    temp_path = tempfile.mkdtemp()
    # Remove the scratch directory even if an assertion below fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)

    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])
    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()
    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
    cvModel = cv.fit(dataset)
    lrModel = cvModel.bestModel

    # The path is named after the cross-validation run, but what is written
    # there is the LogisticRegressionModel itself.
    cvModelPath = temp_path + "/cvModel"
    lrModel.save(cvModelPath)
    loadedLrModel = LogisticRegressionModel.load(cvModelPath)
    self.assertEqual(loadedLrModel.uid, lrModel.uid)
    self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
|
|
|
|
|
|
|
|
def test_save_load_simple_estimator(self):
    """Round-trip a CrossValidator and its fitted model built on LogisticRegression."""
    import shutil

    temp_path = tempfile.mkdtemp()
    # Remove the scratch directory even if an assertion below fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)

    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()

    # test save/load of CrossValidator
    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
    cvModel = cv.fit(dataset)
    cvPath = temp_path + "/cv"
    cv.save(cvPath)
    loadedCV = CrossValidator.load(cvPath)
    self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
    self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)
    self.assertEqual(loadedCV.getEstimatorParamMaps(), cv.getEstimatorParamMaps())

    # test save/load of CrossValidatorModel
    cvModelPath = temp_path + "/cvModel"
    cvModel.save(cvModelPath)
    loadedModel = CrossValidatorModel.load(cvModelPath)
    self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
|
|
|
|
|
|
|
|
def test_parallel_evaluation(self):
    """Fitting with parallelism 1 and with parallelism 2 gives equal avgMetrics."""
    points = [(Vectors.dense([0.0]), 0.0),
              (Vectors.dense([0.4]), 1.0),
              (Vectors.dense([0.5]), 0.0),
              (Vectors.dense([0.6]), 1.0),
              (Vectors.dense([1.0]), 1.0)] * 10
    df = self.spark.createDataFrame(points, ["features", "label"])

    classifier = LogisticRegression()
    param_maps = ParamGridBuilder().addGrid(classifier.maxIter, [5, 6]).build()
    scorer = BinaryClassificationEvaluator()

    # One CrossValidator, fitted twice with different parallelism settings.
    validator = CrossValidator(
        estimator=classifier, estimatorParamMaps=param_maps, evaluator=scorer)
    validator.setParallelism(1)
    serial_fit = validator.fit(df)
    validator.setParallelism(2)
    parallel_fit = validator.fit(df)
    self.assertEqual(serial_fit.avgMetrics, parallel_fit.avgMetrics)
|
|
|
|
|
|
|
|
def test_expose_sub_models(self):
    """subModels are collected per fold and param map, and persisted unless disabled."""
    import shutil

    temp_path = tempfile.mkdtemp()
    # Remove the scratch directory even if an assertion below fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)

    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()

    numFolds = 3
    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                        numFolds=numFolds, collectSubModels=True)

    def checkSubModels(subModels):
        # Expect one list per fold, each holding one model per param map.
        self.assertEqual(len(subModels), numFolds)
        for i in range(numFolds):
            self.assertEqual(len(subModels[i]), len(grid))

    cvModel = cv.fit(dataset)
    checkSubModels(cvModel.subModels)

    # Test the default value for option "persistSubModel" to be "true"
    testSubPath = temp_path + "/testCrossValidatorSubModels"
    savingPathWithSubModels = testSubPath + "cvModel3"
    cvModel.save(savingPathWithSubModels)
    cvModel3 = CrossValidatorModel.load(savingPathWithSubModels)
    checkSubModels(cvModel3.subModels)
    cvModel4 = cvModel3.copy()
    checkSubModels(cvModel4.subModels)

    # With persistSubModels explicitly off, the loaded model has no subModels.
    savingPathWithoutSubModels = testSubPath + "cvModel2"
    cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
    cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels)
    self.assertEqual(cvModel2.subModels, None)

    # The reloaded sub-models must line up one-to-one with the originals.
    for i in range(numFolds):
        for j in range(len(grid)):
            self.assertEqual(cvModel.subModels[i][j].uid, cvModel3.subModels[i][j].uid)
|
|
|
|
|
|
|
|
def test_save_load_nested_estimator(self):
    """Round-trip a CrossValidator whose estimator is a OneVsRest wrapping LR.

    The param grid varies the nested ``classifier`` itself, so loaded grid
    values are compared by uid rather than by equality.
    """
    import shutil

    temp_path = tempfile.mkdtemp()
    # Remove the scratch directory even if an assertion below fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)

    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    ova = OneVsRest(classifier=LogisticRegression())
    lr1 = LogisticRegression().setMaxIter(100)
    lr2 = LogisticRegression().setMaxIter(150)
    grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
    evaluator = MulticlassClassificationEvaluator()

    # test save/load of CrossValidator
    cv = CrossValidator(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
    cvModel = cv.fit(dataset)
    cvPath = temp_path + "/cv"
    cv.save(cvPath)
    loadedCV = CrossValidator.load(cvPath)
    self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
    self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)

    originalParamMap = cv.getEstimatorParamMaps()
    loadedParamMap = loadedCV.getEstimatorParamMaps()
    for i, param in enumerate(loadedParamMap):
        for p in param:
            if p.name == "classifier":
                # Classifier values are estimator objects: compare their uids.
                self.assertEqual(param[p].uid, originalParamMap[i][p].uid)
            else:
                self.assertEqual(param[p], originalParamMap[i][p])

    # test save/load of CrossValidatorModel
    cvModelPath = temp_path + "/cvModel"
    cvModel.save(cvModelPath)
    loadedModel = CrossValidatorModel.load(cvModelPath)
    self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
|
|
|
|
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
Note, however, that the model readers/writers of the estimators listed above currently do NOT support nested estimators/transformers implemented in pure Python, because they use the Java reader/writer wrappers as the Python-side reader/writer.
The PySpark CrossValidator/TrainValidationSplit model reader/writer requires every estimator to define `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
The OneVsRest class already defines these two methods, but Pipeline does not, which leads to this bug.
This PR adds `_transfer_param_map_to_java` and `_transfer_param_map_from_java` to the Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with a Pipeline estimator that includes a OneVsRest estimator stage, where the OneVsRest estimator nests a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
The TrainValidationSplit testing code is similar, so it is not pasted here.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
def test_save_load_pipeline_estimator(self):
    """Round-trip CrossValidatorModels whose estimator is a Pipeline.

    Covers a flat three-stage pipeline and a pipeline that nests another
    pipeline; in both cases the loaded best model's stage uids must match
    the original's.
    """
    import shutil

    temp_path = tempfile.mkdtemp()
    # Remove the scratch directory even if an assertion below fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)

    training = self.spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
        (4, "b spark who", 1.0),
        (5, "g d a y", 0.0),
        (6, "spark fly", 1.0),
        (7, "was mapreduce", 0.0),
    ], ["id", "text", "label"])

    # Configure an ML pipeline, which consists of three stages:
    # tokenizer, hashingTF, and ova.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

    ova = OneVsRest(classifier=LogisticRegression())
    lr1 = LogisticRegression().setMaxIter(5)
    lr2 = LogisticRegression().setMaxIter(10)

    pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])

    paramGrid = ParamGridBuilder() \
        .addGrid(hashingTF.numFeatures, [10, 100]) \
        .addGrid(ova.classifier, [lr1, lr2]) \
        .build()

    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=MulticlassClassificationEvaluator(),
                              numFolds=2)  # use 3+ folds in practice

    # Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(training)

    # test save/load of CrossValidatorModel
    cvModelPath = temp_path + "/cvModel"
    cvModel.save(cvModelPath)
    loadedModel = CrossValidatorModel.load(cvModelPath)
    self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
    self.assertEqual(len(loadedModel.bestModel.stages), len(cvModel.bestModel.stages))
    for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
                                          cvModel.bestModel.stages):
        self.assertEqual(loadedStage.uid, originalStage.uid)

    # Test nested pipeline
    nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
    crossval2 = CrossValidator(estimator=nested_pipeline,
                               estimatorParamMaps=paramGrid,
                               evaluator=MulticlassClassificationEvaluator(),
                               numFolds=2)  # use 3+ folds in practice

    # Run cross-validation, and choose the best set of parameters.
    cvModel2 = crossval2.fit(training)
    # test save/load of CrossValidatorModel
    cvModelPath2 = temp_path + "/cvModel2"
    cvModel2.save(cvModelPath2)
    loadedModel2 = CrossValidatorModel.load(cvModelPath2)
    self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid)
    # Stage 1 of the outer pipeline is the inner (nested) pipeline model.
    loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
    original_nested_pipeline_model = cvModel2.bestModel.stages[1]
    self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
    self.assertEqual(len(loaded_nested_pipeline_model.stages),
                     len(original_nested_pipeline_model.stages))
    for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
                                          original_nested_pipeline_model.stages):
        self.assertEqual(loadedStage.uid, originalStage.uid)
|
|
|
|
|
2018-11-18 03:02:15 -05:00
|
|
|
|
|
|
|
class TrainValidationSplitTests(SparkSessionTestCase):
|
|
|
|
|
|
|
|
def test_fit_minimize_metric(self):
    """With RMSE as the metric, TrainValidationSplit picks the zero-error candidate."""
    rows = [(10, 10.0), (50, 50.0), (100, 100.0), (500, 500.0)] * 10
    df = self.spark.createDataFrame(rows, ["feature", "label"])

    estimator = InducedErrorEstimator()
    rmse = RegressionEvaluator(metricName="rmse")

    param_maps = (ParamGridBuilder()
                  .addGrid(estimator.inducedError, [100.0, 0.0, 10000.0])
                  .build())
    splitter = TrainValidationSplit(
        estimator=estimator, estimatorParamMaps=param_maps, evaluator=rmse)
    fitted = splitter.fit(df)
    best = fitted.bestModel
    best_rmse = rmse.evaluate(best.transform(df))
    metrics = fitted.validationMetrics

    self.assertEqual(0.0, best.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(0.0, best_rmse, "Best model has RMSE of 0")
    # One validation metric is expected per candidate param map.
    self.assertEqual(len(param_maps), len(metrics),
                     "validationMetrics has the same size of grid parameter")
    self.assertEqual(0.0, min(metrics))
|
|
|
|
|
|
|
|
def test_fit_maximize_metric(self):
    """With R-squared as the metric, TrainValidationSplit picks the zero-error candidate."""
    rows = [(10, 10.0), (50, 50.0), (100, 100.0), (500, 500.0)] * 10
    df = self.spark.createDataFrame(rows, ["feature", "label"])

    estimator = InducedErrorEstimator()
    r2 = RegressionEvaluator(metricName="r2")

    param_maps = (ParamGridBuilder()
                  .addGrid(estimator.inducedError, [100.0, 0.0, 10000.0])
                  .build())
    splitter = TrainValidationSplit(
        estimator=estimator, estimatorParamMaps=param_maps, evaluator=r2)
    fitted = splitter.fit(df)
    best = fitted.bestModel
    best_r2 = r2.evaluate(best.transform(df))
    metrics = fitted.validationMetrics

    self.assertEqual(0.0, best.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(1.0, best_r2, "Best model has R-squared of 1")
    # One validation metric is expected per candidate param map.
    self.assertEqual(len(param_maps), len(metrics),
                     "validationMetrics has the same size of grid parameter")
    self.assertEqual(1.0, max(metrics))
|
|
|
|
|
|
|
|
def test_save_load_trained_model(self):
    """The best model picked by TrainValidationSplit survives a save/load round trip.

    Only the trained ``bestModel`` is persisted here, not the
    TrainValidationSplit or the TrainValidationSplitModel.
    """
    import shutil

    temp_path = tempfile.mkdtemp()
    # Remove the scratch directory even if an assertion below fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)

    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])
    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()
    tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
    tvsModel = tvs.fit(dataset)
    lrModel = tvsModel.bestModel

    # The path is named after the tuning run, but what is written there is
    # the LogisticRegressionModel itself.
    tvsModelPath = temp_path + "/tvsModel"
    lrModel.save(tvsModelPath)
    loadedLrModel = LogisticRegressionModel.load(tvsModelPath)
    self.assertEqual(loadedLrModel.uid, lrModel.uid)
    self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
|
|
|
|
|
|
|
|
def test_save_load_simple_estimator(self):
    """Round-trip a TrainValidationSplit and its fitted model built on LogisticRegression."""
    import shutil

    temp_path = tempfile.mkdtemp()
    # Remove the scratch directory even if an assertion below fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)

    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])
    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()
    tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
    tvsModel = tvs.fit(dataset)

    # test save/load of TrainValidationSplit
    tvsPath = temp_path + "/tvs"
    tvs.save(tvsPath)
    loadedTvs = TrainValidationSplit.load(tvsPath)
    self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
    self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)
    self.assertEqual(loadedTvs.getEstimatorParamMaps(), tvs.getEstimatorParamMaps())

    # test save/load of TrainValidationSplitModel
    tvsModelPath = temp_path + "/tvsModel"
    tvsModel.save(tvsModelPath)
    loadedModel = TrainValidationSplitModel.load(tvsModelPath)
    self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
|
|
|
|
|
|
|
|
def test_parallel_evaluation(self):
    """Fitting with parallelism 1 and with parallelism 2 gives equal validationMetrics."""
    points = [(Vectors.dense([0.0]), 0.0),
              (Vectors.dense([0.4]), 1.0),
              (Vectors.dense([0.5]), 0.0),
              (Vectors.dense([0.6]), 1.0),
              (Vectors.dense([1.0]), 1.0)] * 10
    df = self.spark.createDataFrame(points, ["features", "label"])

    classifier = LogisticRegression()
    param_maps = ParamGridBuilder().addGrid(classifier.maxIter, [5, 6]).build()
    scorer = BinaryClassificationEvaluator()

    # One TrainValidationSplit, fitted twice with different parallelism settings.
    splitter = TrainValidationSplit(
        estimator=classifier, estimatorParamMaps=param_maps, evaluator=scorer)
    splitter.setParallelism(1)
    serial_fit = splitter.fit(df)
    splitter.setParallelism(2)
    parallel_fit = splitter.fit(df)
    self.assertEqual(serial_fit.validationMetrics, parallel_fit.validationMetrics)
|
|
|
|
|
|
|
|
def test_expose_sub_models(self):
    """subModels hold one model per param map and are persisted unless disabled."""
    import shutil

    temp_path = tempfile.mkdtemp()
    # Remove the scratch directory even if an assertion below fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)

    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])
    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()
    tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                               collectSubModels=True)
    tvsModel = tvs.fit(dataset)
    self.assertEqual(len(tvsModel.subModels), len(grid))

    # Test the default value for option "persistSubModel" to be "true"
    testSubPath = temp_path + "/testTrainValidationSplitSubModels"
    savingPathWithSubModels = testSubPath + "cvModel3"
    tvsModel.save(savingPathWithSubModels)
    tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels)
    self.assertEqual(len(tvsModel3.subModels), len(grid))
    tvsModel4 = tvsModel3.copy()
    self.assertEqual(len(tvsModel4.subModels), len(grid))

    # With persistSubModels explicitly off, the loaded model has no subModels.
    savingPathWithoutSubModels = testSubPath + "cvModel2"
    tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
    tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels)
    self.assertEqual(tvsModel2.subModels, None)

    # The reloaded sub-models must line up one-to-one with the originals.
    for i in range(len(grid)):
        self.assertEqual(tvsModel.subModels[i].uid, tvsModel3.subModels[i].uid)
|
|
|
|
|
|
|
|
def test_save_load_nested_estimator(self):
    # Round-trips both the TrainValidationSplit estimator and its fitted model
    # through save/load when the tuned estimator (OneVsRest) nests a classifier
    # that is itself part of the parameter grid.
    import shutil

    temp_path = tempfile.mkdtemp()
    # mkdtemp() leaves deletion to the caller; remove the directory when the test
    # finishes, whether it passes or fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)
    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])
    ova = OneVsRest(classifier=LogisticRegression())
    lr1 = LogisticRegression().setMaxIter(100)
    lr2 = LogisticRegression().setMaxIter(150)
    grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
    evaluator = MulticlassClassificationEvaluator()

    tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
    tvsModel = tvs.fit(dataset)

    # Save/load of the (unfitted) TrainValidationSplit estimator.
    tvsPath = temp_path + "/tvs"
    tvs.save(tvsPath)
    loadedTvs = TrainValidationSplit.load(tvsPath)
    self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
    self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)

    originalParamMap = tvs.getEstimatorParamMaps()
    loadedParamMap = loadedTvs.getEstimatorParamMaps()
    # Without this check a shorter (or empty) loaded list would pass vacuously.
    self.assertEqual(len(loadedParamMap), len(originalParamMap))
    for loaded, original in zip(loadedParamMap, originalParamMap):
        for p in loaded:
            if p.name == "classifier":
                # The classifier value is an estimator object; compare by uid.
                self.assertEqual(loaded[p].uid, original[p].uid)
            else:
                self.assertEqual(loaded[p], original[p])

    # Save/load of the fitted TrainValidationSplitModel.
    tvsModelPath = temp_path + "/tvsModel"
    tvsModel.save(tvsModelPath)
    loadedModel = TrainValidationSplitModel.load(tvsModelPath)
    self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
|
|
|
|
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, the model readers/writers of the estimators listed above do NOT support pure-Python nested estimators/transformers, because they use the Java reader/writer wrapper as the Python-side reader/writer.
The Pyspark CrossValidator/TrainValidationSplit model reader/writer requires all estimators to define `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
The OneVsRest class already defines the two methods, but Pipeline does not, which leads to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
The TrainValidationSplit testing code is similar, so it is not pasted here.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
def test_save_load_pipeline_estimator(self):
    # Checks that a TrainValidationSplitModel whose estimator is a Pipeline (flat
    # or nested, with a OneVsRest stage) can be saved and loaded, and that the
    # best model's stages keep their uids.
    import shutil

    temp_path = tempfile.mkdtemp()
    # mkdtemp() leaves deletion to the caller; remove the directory when the test
    # finishes, whether it passes or fails.
    self.addCleanup(shutil.rmtree, temp_path, ignore_errors=True)
    training = self.spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
        (4, "b spark who", 1.0),
        (5, "g d a y", 0.0),
        (6, "spark fly", 1.0),
        (7, "was mapreduce", 0.0),
    ], ["id", "text", "label"])

    # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF,
    # and ova.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

    ova = OneVsRest(classifier=LogisticRegression())
    lr1 = LogisticRegression().setMaxIter(5)
    lr2 = LogisticRegression().setMaxIter(10)

    pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])

    paramGrid = ParamGridBuilder() \
        .addGrid(hashingTF.numFeatures, [10, 100]) \
        .addGrid(ova.classifier, [lr1, lr2]) \
        .build()

    tvs = TrainValidationSplit(estimator=pipeline,
                               estimatorParamMaps=paramGrid,
                               evaluator=MulticlassClassificationEvaluator())

    # Run train validation split, and choose the best set of parameters.
    tvsModel = tvs.fit(training)

    # Test save/load of TrainValidationSplitModel.
    tvsModelPath = temp_path + "/tvsModel"
    tvsModel.save(tvsModelPath)
    loadedModel = TrainValidationSplitModel.load(tvsModelPath)
    self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
    self.assertEqual(len(loadedModel.bestModel.stages), len(tvsModel.bestModel.stages))
    for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
                                          tvsModel.bestModel.stages):
        self.assertEqual(loadedStage.uid, originalStage.uid)

    # Test nested pipeline: the second stage is itself a Pipeline.
    nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
    tvs2 = TrainValidationSplit(estimator=nested_pipeline,
                                estimatorParamMaps=paramGrid,
                                evaluator=MulticlassClassificationEvaluator())

    # Run train validation split, and choose the best set of parameters.
    tvsModel2 = tvs2.fit(training)
    # Test save/load of TrainValidationSplitModel with the nested pipeline.
    tvsModelPath2 = temp_path + "/tvsModel2"
    tvsModel2.save(tvsModelPath2)
    loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2)
    self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid)
    # Stage index 1 is the inner pipeline model (index 0 is the tokenizer).
    loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
    original_nested_pipeline_model = tvsModel2.bestModel.stages[1]
    self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
    self.assertEqual(len(loaded_nested_pipeline_model.stages),
                     len(original_nested_pipeline_model.stages))
    for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
                                          original_nested_pipeline_model.stages):
        self.assertEqual(loadedStage.uid, originalStage.uid)
|
|
|
|
|
2018-11-18 03:02:15 -05:00
|
|
|
def test_copy(self):
    # Verifies that copy() on a TrainValidationSplit and on its fitted model keeps
    # the estimator uid, the best model uid and every validation metric.
    rows = [
        (10, 10.0),
        (50, 50.0),
        (100, 100.0),
        (500, 500.0)] * 10
    dataset = self.spark.createDataFrame(rows, ["feature", "label"])

    iee = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="r2")

    grid_builder = ParamGridBuilder()
    grid_builder = grid_builder.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
    grid = grid_builder.build()
    tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
    tvsModel = tvs.fit(dataset)
    tvsCopied = tvs.copy()
    tvsModelCopied = tvsModel.copy()

    original_estimator_uid = tvs.getEstimator().uid
    copied_estimator_uid = tvsCopied.getEstimator().uid
    self.assertEqual(original_estimator_uid, copied_estimator_uid,
                     "Copied TrainValidationSplit has the same uid of Estimator")

    self.assertEqual(tvsModel.bestModel.uid, tvsModelCopied.bestModel.uid)
    original_metrics = tvsModel.validationMetrics
    copied_metrics = tvsModelCopied.validationMetrics
    self.assertEqual(len(original_metrics),
                     len(copied_metrics),
                     "Copied validationMetrics has the same size of the original")
    # Sizes were just asserted equal, so pairwise iteration visits every metric.
    for expected, actual in zip(original_metrics, copied_metrics):
        self.assertEqual(expected, actual)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    from pyspark.ml.tests.test_tuning import *

    # Fall back to unittest's default runner unless xmlrunner is importable, in
    # which case XML reports are written under target/test-reports.
    testRunner = None
    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        # xmlrunner is optional; keep testRunner as None.
        pass
    unittest.main(testRunner=testRunner, verbosity=2)
|