2018-11-18 03:02:15 -05:00
|
|
|
#
|
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
|
|
# this work for additional information regarding copyright ownership.
|
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
|
# (the "License"); you may not use this file except in compliance with
|
|
|
|
# the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
#
|
|
|
|
|
|
|
|
import tempfile
|
2018-11-18 20:22:32 -05:00
|
|
|
import unittest
|
2018-11-18 03:02:15 -05:00
|
|
|
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
The OneVsRest class already defines the two methods, but Pipeline does not, so it leads to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
TrainValidationSplit testing code are similar so I do not paste them.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
from pyspark.ml.feature import HashingTF, Tokenizer
|
|
|
|
from pyspark.ml import Estimator, Pipeline, Model
|
2018-11-18 03:02:15 -05:00
|
|
|
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
|
|
|
|
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
|
|
|
|
MulticlassClassificationEvaluator, RegressionEvaluator
|
|
|
|
from pyspark.ml.linalg import Vectors
|
|
|
|
from pyspark.ml.param import Param, Params
|
|
|
|
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
|
|
|
|
TrainValidationSplit, TrainValidationSplitModel
|
|
|
|
from pyspark.sql.functions import rand
|
2020-12-03 19:35:50 -05:00
|
|
|
from pyspark.testing.mlutils import DummyEvaluator, DummyLogisticRegression, \
|
|
|
|
DummyLogisticRegressionModel, SparkSessionTestCase
|
2018-11-18 03:02:15 -05:00
|
|
|
|
|
|
|
|
|
|
|
class HasInducedError(Params):
    """Params mixin exposing the ``inducedError`` param and its getter."""

    def __init__(self):
        super(HasInducedError, self).__init__()
        # Magnitude of the uniform noise the estimator/model will add.
        self.inducedError = Param(
            self, "inducedError", "Uniformly-distributed error added to feature")

    def getInducedError(self):
        """Return the value of ``inducedError`` (or its default)."""
        return self.getOrDefault(self.inducedError)
|
|
|
|
|
|
|
|
|
|
|
|
class InducedErrorModel(Model, HasInducedError):
    """Model whose prediction is the input feature plus seeded uniform noise."""

    def __init__(self):
        super(InducedErrorModel, self).__init__()

    def _transform(self, dataset):
        # rand(0) is seeded, so the added error is deterministic across runs.
        noise = rand(0) * self.getInducedError()
        return dataset.withColumn("prediction", dataset.feature + noise)
|
|
|
|
|
|
|
|
|
|
|
|
class InducedErrorEstimator(Estimator, HasInducedError):
    """Estimator producing an :class:`InducedErrorModel` with the configured noise."""

    def __init__(self, inducedError=1.0):
        super(InducedErrorEstimator, self).__init__()
        self._set(inducedError=inducedError)

    def _fit(self, dataset):
        # Fitting ignores the data; it only propagates params onto the model.
        fitted = InducedErrorModel()
        self._copyValues(fitted)
        return fitted
|
|
|
|
|
|
|
|
|
2019-11-19 17:15:00 -05:00
|
|
|
class ParamGridBuilderTests(SparkSessionTestCase):
    """Input-validation tests for ParamGridBuilder."""

    def test_addGrid(self):
        # addGrid must reject a key that is not a Param instance.
        with self.assertRaises(TypeError):
            builder = ParamGridBuilder()
            builder.addGrid("must be an instance of Param", ["not", "string"]).build()
|
|
|
|
|
|
|
|
|
[SPARK-33592] Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
### What changes were proposed in this pull request?
Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
When saving validator estimatorParamMaps, will check all nested stages in tuned estimator to get correct param parent.
Two typical cases to manually test:
~~~python
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(lr.maxIter, [100, 200]) \
.build()
tvs = TrainValidationSplit(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
~~~python
lr = LogisticRegression()
ova = OneVsRest(classifier=lr)
grid = ParamGridBuilder().addGrid(lr.maxIter, [100, 200]).build()
evaluator = MulticlassClassificationEvaluator()
tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
Closes #30539 from WeichenXu123/fix_tuning_param_maps_io.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
2020-11-30 20:36:42 -05:00
|
|
|
class ValidatorTestUtilsMixin:
    """Shared assertions for CrossValidator/TrainValidationSplit test cases."""

    def assert_param_maps_equal(self, paramMaps1, paramMaps2):
        """Assert two param-map lists are pairwise equal.

        Values that are themselves ``Params`` (nested estimators/transformers)
        are compared by ``uid``; everything else by plain equality.
        """
        self.assertEqual(len(paramMaps1), len(paramMaps2))
        for left, right in zip(paramMaps1, paramMaps2):
            self.assertEqual(set(left.keys()), set(right.keys()))
            for key in left.keys():
                lhs = left[key]
                rhs = right[key]
                if isinstance(lhs, Params):
                    self.assertEqual(lhs.uid, rhs.uid)
                else:
                    self.assertEqual(lhs, rhs)
|
|
|
|
|
|
|
|
|
|
|
|
class CrossValidatorTests(SparkSessionTestCase, ValidatorTestUtilsMixin):
|
2018-11-18 03:02:15 -05:00
|
|
|
|
|
|
|
def test_copy(self):
    """copy() on CrossValidator and CrossValidatorModel must carry over every
    param and must not share mutable state with the original (SPARK-32092)."""
    dataset = self.spark.createDataFrame(
        [(10, 10.0),
         (50, 50.0),
         (100, 100.0),
         (500, 500.0)] * 10,
        ["feature", "label"])

    iee = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="rmse")
    grid = (ParamGridBuilder()
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
            .build())
    cv = CrossValidator(
        estimator=iee,
        estimatorParamMaps=grid,
        evaluator=evaluator,
        collectSubModels=True,
        numFolds=2,
    )

    cvCopied = cv.copy()
    # SPARK-32092: CrossValidator.copy() needs to copy all existing params.
    for getter in [
        lambda x: x.getEstimator().uid,
        lambda x: x.getNumFolds(),
        lambda x: x.getFoldCol(),
        lambda x: x.getCollectSubModels(),
        lambda x: x.getParallelism(),
        lambda x: x.getSeed(),
    ]:
        self.assertEqual(getter(cv), getter(cvCopied))

    cvModel = cv.fit(dataset)
    cvModelCopied = cvModel.copy()
    # Metrics in the copy must match the original (tolerating float noise).
    for original, copied in zip(cvModel.avgMetrics, cvModelCopied.avgMetrics):
        self.assertTrue(abs(original - copied) < 0.0001)
    # SPARK-32092: CrossValidatorModel.copy() needs to copy all existing params.
    for getter in [
        lambda x: x.getNumFolds(),
        lambda x: x.getFoldCol(),
        lambda x: x.getSeed(),
    ]:
        self.assertEqual(getter(cvModel), getter(cvModelCopied))

    # Mutating the original must not leak into the copy.
    cvModel.avgMetrics[0] = 'foo'
    self.assertNotEqual(
        cvModelCopied.avgMetrics[0],
        'foo',
        "Changing the original avgMetrics should not affect the copied model"
    )
    cvModel.subModels[0][0].getInducedError = lambda: 'foo'
    self.assertNotEqual(
        cvModelCopied.subModels[0][0].getInducedError(),
        'foo',
        "Changing the original subModels should not affect the copied model"
    )
|
2018-11-18 03:02:15 -05:00
|
|
|
|
|
|
|
def test_fit_minimize_metric(self):
    """With a metric to minimize (rmse), the zero-error candidate must win."""
    dataset = self.spark.createDataFrame(
        [(10, 10.0),
         (50, 50.0),
         (100, 100.0),
         (500, 500.0)] * 10,
        ["feature", "label"])

    estimator = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="rmse")
    grid = (ParamGridBuilder()
            .addGrid(estimator.inducedError, [100.0, 0.0, 10000.0])
            .build())

    validator = CrossValidator(estimator=estimator, estimatorParamMaps=grid,
                               evaluator=evaluator)
    best = validator.fit(dataset).bestModel
    bestMetric = evaluator.evaluate(best.transform(dataset))

    self.assertEqual(0.0, best.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(0.0, bestMetric, "Best model has RMSE of 0")
|
|
|
|
|
|
|
|
def test_fit_maximize_metric(self):
    """With a metric to maximize (r2), the zero-error candidate must win."""
    dataset = self.spark.createDataFrame(
        [(10, 10.0),
         (50, 50.0),
         (100, 100.0),
         (500, 500.0)] * 10,
        ["feature", "label"])

    estimator = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="r2")
    grid = (ParamGridBuilder()
            .addGrid(estimator.inducedError, [100.0, 0.0, 10000.0])
            .build())

    validator = CrossValidator(estimator=estimator, estimatorParamMaps=grid,
                               evaluator=evaluator)
    best = validator.fit(dataset).bestModel
    bestMetric = evaluator.evaluate(best.transform(dataset))

    self.assertEqual(0.0, best.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(1.0, bestMetric, "Best model has R-squared of 1")
|
|
|
|
|
|
|
|
def test_param_grid_type_coercion(self):
    """Int values supplied to a float-typed param grid must be coerced to float.

    Fix: the original check used a bare ``assert``, which is silently stripped
    when Python runs with ``-O`` and does not integrate with unittest's failure
    reporting. Use ``self.assertEqual(type(v), float)`` so the check always
    runs and keeps the original exact-type semantics (a bool/int would fail).
    """
    lr = LogisticRegression(maxIter=10)
    # regParam is declared as a float param; the int 1 must arrive as 1.0.
    paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.5, 1]).build()
    for param in paramGrid:
        for v in param.values():
            self.assertEqual(type(v), float)
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def _run_test_save_load_trained_model(self, LogisticRegressionCls, LogisticRegressionModelCls):
    """Round-trip the best model and the CrossValidatorModel through disk.

    Parameterized over the estimator/model classes so both the JVM-backed and
    the pure-Python (Dummy*) implementations are exercised.
    """
    # This tests saving and loading the trained model only.
    # Save/load for CrossValidator will be added later: SPARK-13786
    temp_path = tempfile.mkdtemp()
    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    lr = LogisticRegressionCls()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()
    cv = CrossValidator(
        estimator=lr,
        estimatorParamMaps=grid,
        evaluator=evaluator,
        collectSubModels=True,
        numFolds=4,
        seed=42,
    )
    cvModel = cv.fit(dataset)
    lrModel = cvModel.bestModel

    # The best model must survive a save/load cycle unchanged.
    lrModelPath = temp_path + "/lrModel"
    lrModel.save(lrModelPath)
    loadedLrModel = LogisticRegressionModelCls.load(lrModelPath)
    self.assertEqual(loadedLrModel.uid, lrModel.uid)
    self.assertEqual(loadedLrModel.intercept, lrModel.intercept)

    # SPARK-32092: Saving and then loading CrossValidatorModel should not change the params.
    cvModelPath = temp_path + "/cvModel"
    cvModel.save(cvModelPath)
    loadedCvModel = CrossValidatorModel.load(cvModelPath)
    for getter in [
        lambda x: x.getNumFolds(),
        lambda x: x.getFoldCol(),
        lambda x: x.getSeed(),
        lambda x: len(x.subModels),
    ]:
        self.assertEqual(getter(cvModel), getter(loadedCvModel))

    # Every param must come back explicitly set after the round trip.
    self.assertTrue(all(
        loadedCvModel.isSet(param) for param in loadedCvModel.params
    ))
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def test_save_load_trained_model(self):
    """Exercise both the JVM-backed and the pure-Python estimator/model pairs."""
    for estimator_cls, model_cls in [
        (LogisticRegression, LogisticRegressionModel),
        (DummyLogisticRegression, DummyLogisticRegressionModel),
    ]:
        self._run_test_save_load_trained_model(estimator_cls, model_cls)
|
|
|
|
|
|
|
|
def _run_test_save_load_simple_estimator(self, LogisticRegressionCls, evaluatorCls):
    """Round-trip a CrossValidator and its fitted model with a flat estimator."""
    temp_path = tempfile.mkdtemp()
    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    lr = LogisticRegressionCls()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = evaluatorCls()

    # Save/load of the (unfitted) CrossValidator itself.
    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
    cvModel = cv.fit(dataset)
    cvPath = temp_path + "/cv"
    cv.save(cvPath)
    loadedCV = CrossValidator.load(cvPath)
    self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
    self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)
    # SPARK-33592: estimatorParamMaps must survive the round trip intact.
    self.assert_param_maps_equal(loadedCV.getEstimatorParamMaps(), cv.getEstimatorParamMaps())

    # Save/load of the fitted CrossValidatorModel.
    cvModelPath = temp_path + "/cvModel"
    cvModel.save(cvModelPath)
    loadedModel = CrossValidatorModel.load(cvModelPath)
    self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def test_save_load_simple_estimator(self):
    """Run the simple-estimator round trip for both implementations."""
    for estimator_cls, evaluator_cls in [
        (LogisticRegression, BinaryClassificationEvaluator),
        (DummyLogisticRegression, DummyEvaluator),
    ]:
        self._run_test_save_load_simple_estimator(estimator_cls, evaluator_cls)
|
|
|
|
|
2018-11-18 03:02:15 -05:00
|
|
|
def test_parallel_evaluation(self):
    """Fitting with parallelism=1 and parallelism=2 must yield identical metrics."""
    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
    evaluator = BinaryClassificationEvaluator()

    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
    cv.setParallelism(1)
    serialModel = cv.fit(dataset)
    cv.setParallelism(2)
    parallelModel = cv.fit(dataset)
    # Metric values must not depend on the evaluation parallelism.
    self.assertEqual(serialModel.avgMetrics, parallelModel.avgMetrics)
|
|
|
|
|
|
|
|
def test_expose_sub_models(self):
    """collectSubModels=True must expose one sub-model per fold per grid point,
    and persistSubModels must default to true when saving."""
    temp_path = tempfile.mkdtemp()
    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()

    numFolds = 3
    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                        numFolds=numFolds, collectSubModels=True)

    def check_sub_models(subModels):
        # One row per fold, one entry per grid point.
        self.assertEqual(len(subModels), numFolds)
        for fold_models in subModels:
            self.assertEqual(len(fold_models), len(grid))

    cvModel = cv.fit(dataset)
    check_sub_models(cvModel.subModels)

    # Test the default value for option "persistSubModel" to be "true".
    testSubPath = temp_path + "/testCrossValidatorSubModels"
    savingPathWithSubModels = testSubPath + "cvModel3"
    cvModel.save(savingPathWithSubModels)
    cvModel3 = CrossValidatorModel.load(savingPathWithSubModels)
    check_sub_models(cvModel3.subModels)
    cvModel4 = cvModel3.copy()
    check_sub_models(cvModel4.subModels)

    # Explicitly disabling persistSubModels drops them from the saved model.
    savingPathWithoutSubModels = testSubPath + "cvModel2"
    cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
    cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels)
    self.assertEqual(cvModel2.subModels, None)

    # Loaded sub-models keep the uids of the in-memory originals.
    for i in range(numFolds):
        for j in range(len(grid)):
            self.assertEqual(cvModel.subModels[i][j].uid, cvModel3.subModels[i][j].uid)
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def _run_test_save_load_nested_estimator(self, LogisticRegressionCls):
    """Round-trip a CrossValidator whose grid tunes a nested estimator
    (OneVsRest wrapping a classifier) — SPARK-33592 regression coverage."""
    temp_path = tempfile.mkdtemp()
    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    ova = OneVsRest(classifier=LogisticRegressionCls())
    lr1 = LogisticRegressionCls().setMaxIter(100)
    lr2 = LogisticRegressionCls().setMaxIter(150)
    # The grid swaps whole classifier instances, not scalar values.
    grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
    evaluator = MulticlassClassificationEvaluator()

    # Save/load of the CrossValidator itself.
    cv = CrossValidator(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
    cvModel = cv.fit(dataset)
    cvPath = temp_path + "/cv"
    cv.save(cvPath)
    loadedCV = CrossValidator.load(cvPath)
    self.assert_param_maps_equal(loadedCV.getEstimatorParamMaps(), grid)
    self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
    self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)

    # Nested estimator values compare by uid; plain values by equality.
    originalParamMap = cv.getEstimatorParamMaps()
    loadedParamMap = loadedCV.getEstimatorParamMaps()
    for i, loaded_map in enumerate(loadedParamMap):
        for p in loaded_map:
            if p.name == "classifier":
                self.assertEqual(loaded_map[p].uid, originalParamMap[i][p].uid)
            else:
                self.assertEqual(loaded_map[p], originalParamMap[i][p])

    # Save/load of the fitted CrossValidatorModel.
    cvModelPath = temp_path + "/cvModel"
    cvModel.save(cvModelPath)
    loadedModel = CrossValidatorModel.load(cvModelPath)
    self.assert_param_maps_equal(loadedModel.getEstimatorParamMaps(), grid)
    self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def test_save_load_nested_estimator(self):
    """Run the nested-estimator round trip for both classifier implementations."""
    for estimator_cls in (LogisticRegression, DummyLogisticRegression):
        self._run_test_save_load_nested_estimator(estimator_cls)
|
|
|
|
|
|
|
|
def _run_test_save_load_pipeline_estimator(self, LogisticRegressionCls):
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
The OneVsRest class already defines the two methods, but Pipeline does not, so it leads to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
The TrainValidationSplit testing code is similar, so I do not paste it.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
temp_path = tempfile.mkdtemp()
|
|
|
|
training = self.spark.createDataFrame([
|
|
|
|
(0, "a b c d e spark", 1.0),
|
|
|
|
(1, "b d", 0.0),
|
|
|
|
(2, "spark f g h", 1.0),
|
|
|
|
(3, "hadoop mapreduce", 0.0),
|
|
|
|
(4, "b spark who", 1.0),
|
|
|
|
(5, "g d a y", 0.0),
|
|
|
|
(6, "spark fly", 1.0),
|
|
|
|
(7, "was mapreduce", 0.0),
|
|
|
|
], ["id", "text", "label"])
|
|
|
|
|
|
|
|
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
|
|
|
|
tokenizer = Tokenizer(inputCol="text", outputCol="words")
|
|
|
|
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
ova = OneVsRest(classifier=LogisticRegressionCls())
|
|
|
|
lr1 = LogisticRegressionCls().setMaxIter(5)
|
|
|
|
lr2 = LogisticRegressionCls().setMaxIter(10)
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
The OneVsRest class already defines the two methods, but Pipeline does not, which leads to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
TrainValidationSplit testing code are similar so I do not paste them.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
|
|
|
|
pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
|
|
|
|
|
|
|
|
paramGrid = ParamGridBuilder() \
|
|
|
|
.addGrid(hashingTF.numFeatures, [10, 100]) \
|
|
|
|
.addGrid(ova.classifier, [lr1, lr2]) \
|
|
|
|
.build()
|
|
|
|
|
|
|
|
crossval = CrossValidator(estimator=pipeline,
|
|
|
|
estimatorParamMaps=paramGrid,
|
|
|
|
evaluator=MulticlassClassificationEvaluator(),
|
|
|
|
numFolds=2) # use 3+ folds in practice
|
[SPARK-33592] Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
### What changes were proposed in this pull request?
Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
When saving validator estimatorParamMaps, will check all nested stages in tuned estimator to get correct param parent.
Two typical cases to manually test:
~~~python
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(lr.maxIter, [100, 200]) \
.build()
tvs = TrainValidationSplit(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
~~~python
lr = LogisticRegression()
ova = OneVsRest(classifier=lr)
grid = ParamGridBuilder().addGrid(lr.maxIter, [100, 200]).build()
evaluator = MulticlassClassificationEvaluator()
tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
Closes #30539 from WeichenXu123/fix_tuning_param_maps_io.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
2020-11-30 20:36:42 -05:00
|
|
|
cvPath = temp_path + "/cv"
|
|
|
|
crossval.save(cvPath)
|
|
|
|
loadedCV = CrossValidator.load(cvPath)
|
|
|
|
self.assert_param_maps_equal(loadedCV.getEstimatorParamMaps(), paramGrid)
|
|
|
|
self.assertEqual(loadedCV.getEstimator().uid, crossval.getEstimator().uid)
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
TrainValidationSplit testing code are similar so I do not paste them.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
|
|
|
|
# Run cross-validation, and choose the best set of parameters.
|
|
|
|
cvModel = crossval.fit(training)
|
|
|
|
|
|
|
|
# test save/load of CrossValidatorModel
|
|
|
|
cvModelPath = temp_path + "/cvModel"
|
|
|
|
cvModel.save(cvModelPath)
|
|
|
|
loadedModel = CrossValidatorModel.load(cvModelPath)
|
|
|
|
self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
|
|
|
|
self.assertEqual(len(loadedModel.bestModel.stages), len(cvModel.bestModel.stages))
|
|
|
|
for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
|
|
|
|
cvModel.bestModel.stages):
|
|
|
|
self.assertEqual(loadedStage.uid, originalStage.uid)
|
|
|
|
|
|
|
|
# Test nested pipeline
|
|
|
|
nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
|
|
|
|
crossval2 = CrossValidator(estimator=nested_pipeline,
|
|
|
|
estimatorParamMaps=paramGrid,
|
|
|
|
evaluator=MulticlassClassificationEvaluator(),
|
|
|
|
numFolds=2) # use 3+ folds in practice
|
[SPARK-33592] Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
### What changes were proposed in this pull request?
Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
When saving validator estimatorParamMaps, will check all nested stages in tuned estimator to get correct param parent.
Two typical cases to manually test:
~~~python
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(lr.maxIter, [100, 200]) \
.build()
tvs = TrainValidationSplit(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
~~~python
lr = LogisticRegression()
ova = OneVsRest(classifier=lr)
grid = ParamGridBuilder().addGrid(lr.maxIter, [100, 200]).build()
evaluator = MulticlassClassificationEvaluator()
tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
Closes #30539 from WeichenXu123/fix_tuning_param_maps_io.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
2020-11-30 20:36:42 -05:00
|
|
|
cv2Path = temp_path + "/cv2"
|
|
|
|
crossval2.save(cv2Path)
|
|
|
|
loadedCV2 = CrossValidator.load(cv2Path)
|
|
|
|
self.assert_param_maps_equal(loadedCV2.getEstimatorParamMaps(), paramGrid)
|
|
|
|
self.assertEqual(loadedCV2.getEstimator().uid, crossval2.getEstimator().uid)
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
TrainValidationSplit testing code are similar so I do not paste them.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
|
|
|
|
# Run cross-validation, and choose the best set of parameters.
|
|
|
|
cvModel2 = crossval2.fit(training)
|
|
|
|
# test save/load of CrossValidatorModel
|
|
|
|
cvModelPath2 = temp_path + "/cvModel2"
|
|
|
|
cvModel2.save(cvModelPath2)
|
|
|
|
loadedModel2 = CrossValidatorModel.load(cvModelPath2)
|
|
|
|
self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid)
|
|
|
|
loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
|
|
|
|
original_nested_pipeline_model = cvModel2.bestModel.stages[1]
|
|
|
|
self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
|
|
|
|
self.assertEqual(len(loaded_nested_pipeline_model.stages),
|
|
|
|
len(original_nested_pipeline_model.stages))
|
|
|
|
for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
|
|
|
|
original_nested_pipeline_model.stages):
|
|
|
|
self.assertEqual(loadedStage.uid, originalStage.uid)
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def test_save_load_pipeline_estimator(self):
    """Run the pipeline-estimator save/load scenario for both the
    JVM-backed and the pure-Python dummy LogisticRegression classes."""
    for estimator_cls in (LogisticRegression, DummyLogisticRegression):
        self._run_test_save_load_pipeline_estimator(estimator_cls)
|
2020-06-16 19:46:32 -04:00
|
|
|
def test_user_specified_folds(self):
    """CrossValidator driven by a user-supplied ``foldCol`` should produce
    average metrics close to those of randomly generated folds, and the
    ``foldCol`` setting must survive a save/load round trip.

    Fix: removed a leftover debug ``print`` from the metric-comparison loop
    and iterate the two metric lists with ``zip`` instead of indexing.
    """
    from pyspark.sql import functions as F

    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"]).repartition(2, "features")

    # Assign each row to one of three folds pseudo-randomly (seeded for
    # determinism) so the user-defined split mirrors numFolds=3.
    dataset_with_folds = dataset.repartition(1).withColumn("random", rand(100)) \
        .withColumn("fold", F.when(F.col("random") < 0.33, 0)
                    .when(F.col("random") < 0.66, 1)
                    .otherwise(2)).repartition(2, "features")

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [20]).build()
    evaluator = BinaryClassificationEvaluator()

    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3)
    cv_with_user_folds = CrossValidator(estimator=lr,
                                        estimatorParamMaps=grid,
                                        evaluator=evaluator,
                                        numFolds=3,
                                        foldCol="fold")

    self.assertEqual(cv.getEstimator().uid, cv_with_user_folds.getEstimator().uid)

    cvModel1 = cv.fit(dataset)
    cvModel2 = cv_with_user_folds.fit(dataset_with_folds)
    # Metrics from random folds and user-specified folds should roughly agree.
    for metric1, metric2 in zip(cvModel1.avgMetrics, cvModel2.avgMetrics):
        self.assertTrue(abs(metric1 - metric2) < 0.1)

    # test save/load of CrossValidator
    temp_path = tempfile.mkdtemp()
    cvPath = temp_path + "/cv"
    cv_with_user_folds.save(cvPath)
    loadedCV = CrossValidator.load(cvPath)
    self.assertEqual(loadedCV.getFoldCol(), cv_with_user_folds.getFoldCol())
|
|
|
def test_invalid_user_specified_folds(self):
    """Fold values outside ``[0, numFolds)`` or folds that receive no
    validation rows must make ``CrossValidator.fit`` raise a descriptive
    error."""
    rows = [(Vectors.dense([0.0]), 0.0, 0),
            (Vectors.dense([0.4]), 1.0, 1),
            (Vectors.dense([0.5]), 0.0, 2),
            (Vectors.dense([0.6]), 1.0, 0),
            (Vectors.dense([1.0]), 1.0, 1)] * 10
    dataset_with_folds = self.spark.createDataFrame(
        rows, ["features", "label", "fold"])

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [20]).build()
    evaluator = BinaryClassificationEvaluator()

    # (numFolds, expected error): fold value 2 is out of range when
    # numFolds=2; fold 3 never occurs in the data when numFolds=4.
    bad_configs = [(2, "Fold number must be in range"),
                   (4, "The validation data at fold 3 is empty")]
    for num_folds, expected_message in bad_configs:
        cv = CrossValidator(estimator=lr,
                            estimatorParamMaps=grid,
                            evaluator=evaluator,
                            numFolds=num_folds,
                            foldCol="fold")
        with self.assertRaisesRegex(Exception, expected_message):
            cv.fit(dataset_with_folds)
2018-11-18 03:02:15 -05:00
|
|
|
|
[SPARK-33592] Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
### What changes were proposed in this pull request?
Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
When saving validator estimatorParamMaps, will check all nested stages in tuned estimator to get correct param parent.
Two typical cases to manually test:
~~~python
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(lr.maxIter, [100, 200]) \
.build()
tvs = TrainValidationSplit(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
~~~python
lr = LogisticRegression()
ova = OneVsRest(classifier=lr)
grid = ParamGridBuilder().addGrid(lr.maxIter, [100, 200]).build()
evaluator = MulticlassClassificationEvaluator()
tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
Closes #30539 from WeichenXu123/fix_tuning_param_maps_io.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
2020-11-30 20:36:42 -05:00
|
|
|
class TrainValidationSplitTests(SparkSessionTestCase, ValidatorTestUtilsMixin):
|
2018-11-18 03:02:15 -05:00
|
|
|
|
|
|
|
def test_fit_minimize_metric(self):
    """For a metric that must be minimized (rmse), TrainValidationSplit
    should select the candidate whose induced error is zero."""
    dataset = self.spark.createDataFrame(
        [(10, 10.0),
         (50, 50.0),
         (100, 100.0),
         (500, 500.0)] * 10,
        ["feature", "label"])

    iee = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="rmse")

    # Three candidate models whose only difference is the error they inject.
    grid = (ParamGridBuilder()
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
            .build())
    tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
    tvsModel = tvs.fit(dataset)

    bestModel = tvsModel.bestModel
    bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
    validationMetrics = tvsModel.validationMetrics

    self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
    self.assertEqual(len(grid), len(validationMetrics),
                     "validationMetrics has the same size of grid parameter")
    self.assertEqual(0.0, min(validationMetrics))
|
|
|
def test_fit_maximize_metric(self):
    """For a metric that must be maximized (r2), TrainValidationSplit
    should select the candidate whose induced error is zero."""
    dataset = self.spark.createDataFrame(
        [(10, 10.0),
         (50, 50.0),
         (100, 100.0),
         (500, 500.0)] * 10,
        ["feature", "label"])

    iee = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="r2")

    # Three candidate models whose only difference is the error they inject.
    grid = (ParamGridBuilder()
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
            .build())
    tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
    tvsModel = tvs.fit(dataset)

    bestModel = tvsModel.bestModel
    bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
    validationMetrics = tvsModel.validationMetrics

    self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
    self.assertEqual(len(grid), len(validationMetrics),
                     "validationMetrics has the same size of grid parameter")
    self.assertEqual(1.0, max(validationMetrics))
2020-12-03 19:35:50 -05:00
|
|
|
def _run_test_save_load_trained_model(self, LogisticRegressionCls, LogisticRegressionModelCls):
    """Fit a TrainValidationSplit over the given LogisticRegression
    implementation and verify that the best model and the fitted
    TrainValidationSplitModel both survive a save/load round trip.

    LogisticRegressionCls / LogisticRegressionModelCls are passed in so the
    same scenario runs for both the JVM-backed and the dummy pure-Python
    implementations.
    """
    # Historical note (SPARK-13786): this originally covered only the best
    # model; TrainValidationSplitModel save/load is exercised below as well.
    temp_path = tempfile.mkdtemp()
    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])
    lr = LogisticRegressionCls()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()
    # collectSubModels + fixed seed so the fitted model state is reproducible.
    tvs = TrainValidationSplit(
        estimator=lr,
        estimatorParamMaps=grid,
        evaluator=evaluator,
        collectSubModels=True,
        seed=42
    )
    tvsModel = tvs.fit(dataset)
    lrModel = tvsModel.bestModel

    # Round-trip the best model on its own.
    lrModelPath = temp_path + "/lrModel"
    lrModel.save(lrModelPath)
    loadedLrModel = LogisticRegressionModelCls.load(lrModelPath)
    self.assertEqual(loadedLrModel.uid, lrModel.uid)
    self.assertEqual(loadedLrModel.intercept, lrModel.intercept)

    # Round-trip the whole TrainValidationSplitModel and check key params.
    tvsModelPath = temp_path + "/tvsModel"
    tvsModel.save(tvsModelPath)
    loadedTvsModel = TrainValidationSplitModel.load(tvsModelPath)
    for param in [
        lambda x: x.getSeed(),
        lambda x: x.getTrainRatio(),
    ]:
        self.assertEqual(param(tvsModel), param(loadedTvsModel))

    # Every param should come back explicitly set after reload.
    self.assertTrue(all(
        loadedTvsModel.isSet(param) for param in loadedTvsModel.params
    ))
2020-12-03 19:35:50 -05:00
|
|
|
def test_save_load_trained_model(self):
    """Exercise trained-model save/load for both the JVM-backed and the
    pure-Python dummy LogisticRegression implementations."""
    class_pairs = ((LogisticRegression, LogisticRegressionModel),
                   (DummyLogisticRegression, DummyLogisticRegressionModel))
    for estimator_cls, model_cls in class_pairs:
        self._run_test_save_load_trained_model(estimator_cls, model_cls)
|
|
|
|
def _run_test_save_load_simple_estimator(self, LogisticRegressionCls, evaluatorCls):
    """Round-trip a TrainValidationSplit and its fitted model through save/load.

    Parameterized over estimator/evaluator classes so the same checks cover
    both the JVM-backed and the pure-Python (dummy) implementations.
    """
    # This tests saving and loading the trained model only.
    # Save/load for TrainValidationSplit will be added later: SPARK-13786
    temp_path = tempfile.mkdtemp()
    rows = [(Vectors.dense([0.0]), 0.0),
            (Vectors.dense([0.4]), 1.0),
            (Vectors.dense([0.5]), 0.0),
            (Vectors.dense([0.6]), 1.0),
            (Vectors.dense([1.0]), 1.0)] * 10
    dataset = self.spark.createDataFrame(rows, ["features", "label"])
    lr = LogisticRegressionCls()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = evaluatorCls()
    tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
    tvsModel = tvs.fit(dataset)

    # The unfitted validator must survive a save/load round trip:
    # estimator, evaluator and the full param-map grid are all restored.
    tvsPath = temp_path + "/tvs"
    tvs.save(tvsPath)
    loadedTvs = TrainValidationSplit.load(tvsPath)
    self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
    self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)
    self.assert_param_maps_equal(
        loadedTvs.getEstimatorParamMaps(), tvs.getEstimatorParamMaps())

    # The fitted model must keep the same best sub-model after reloading.
    tvsModelPath = temp_path + "/tvsModel"
    tvsModel.save(tvsModelPath)
    loadedModel = TrainValidationSplitModel.load(tvsModelPath)
    self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def test_save_load_simple_estimator(self):
    """Exercise simple-estimator save/load with both real and dummy classes."""
    for estimator_cls, evaluator_cls in [
        (LogisticRegression, BinaryClassificationEvaluator),
        (DummyLogisticRegression, DummyEvaluator),
    ]:
        self._run_test_save_load_simple_estimator(estimator_cls, evaluator_cls)
|
|
|
|
|
2018-11-18 03:02:15 -05:00
|
|
|
def test_parallel_evaluation(self):
    """Fitting with parallelism=1 and parallelism=2 must yield identical
    validation metrics — parallel evaluation may not change results."""
    rows = [(Vectors.dense([0.0]), 0.0),
            (Vectors.dense([0.4]), 1.0),
            (Vectors.dense([0.5]), 0.0),
            (Vectors.dense([0.6]), 1.0),
            (Vectors.dense([1.0]), 1.0)] * 10
    dataset = self.spark.createDataFrame(rows, ["features", "label"])
    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
    evaluator = BinaryClassificationEvaluator()
    tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)

    # Fit once serially, once with two worker threads, on the same data.
    tvs.setParallelism(1)
    tvsSerialModel = tvs.fit(dataset)
    tvs.setParallelism(2)
    tvsParallelModel = tvs.fit(dataset)
    self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics)
|
|
|
|
|
|
|
|
def test_expose_sub_models(self):
    """Verify collectSubModels=True exposes one sub-model per grid point, and
    that the persistSubModels write option controls whether they survive
    a save/load round trip."""
    temp_path = tempfile.mkdtemp()
    rows = [(Vectors.dense([0.0]), 0.0),
            (Vectors.dense([0.4]), 1.0),
            (Vectors.dense([0.5]), 0.0),
            (Vectors.dense([0.6]), 1.0),
            (Vectors.dense([1.0]), 1.0)] * 10
    dataset = self.spark.createDataFrame(rows, ["features", "label"])
    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()
    tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                               collectSubModels=True)
    tvsModel = tvs.fit(dataset)
    self.assertEqual(len(tvsModel.subModels), len(grid))

    # Test the default value for option "persistSubModel" to be "true"
    testSubPath = temp_path + "/testTrainValidationSplitSubModels"
    savingPathWithSubModels = testSubPath + "cvModel3"
    tvsModel.save(savingPathWithSubModels)
    tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels)
    self.assertEqual(len(tvsModel3.subModels), len(grid))
    # copy() must carry the sub-models along as well.
    tvsModel4 = tvsModel3.copy()
    self.assertEqual(len(tvsModel4.subModels), len(grid))

    # Explicitly opting out of persistence drops the sub-models on reload.
    savingPathWithoutSubModels = testSubPath + "cvModel2"
    tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
    tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels)
    self.assertEqual(tvsModel2.subModels, None)

    # The persisted sub-models must be the same ones that were collected.
    for original, reloaded in zip(tvsModel.subModels, tvsModel3.subModels):
        self.assertEqual(original.uid, reloaded.uid)
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def _run_test_save_load_nested_estimator(self, LogisticRegressionCls):
    """Round-trip a TrainValidationSplit whose estimator (OneVsRest) nests
    another estimator, checking that param maps over estimator-valued
    params are restored by uid."""
    # This tests saving and loading the trained model only.
    # Save/load for TrainValidationSplit will be added later: SPARK-13786
    temp_path = tempfile.mkdtemp()
    rows = [(Vectors.dense([0.0]), 0.0),
            (Vectors.dense([0.4]), 1.0),
            (Vectors.dense([0.5]), 0.0),
            (Vectors.dense([0.6]), 1.0),
            (Vectors.dense([1.0]), 1.0)] * 10
    dataset = self.spark.createDataFrame(rows, ["features", "label"])
    ova = OneVsRest(classifier=LogisticRegressionCls())
    lr1 = LogisticRegressionCls().setMaxIter(100)
    lr2 = LogisticRegressionCls().setMaxIter(150)
    # The grid varies an estimator-valued param (the nested classifier).
    grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
    evaluator = MulticlassClassificationEvaluator()

    tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
    tvsModel = tvs.fit(dataset)
    tvsPath = temp_path + "/tvs"
    tvs.save(tvsPath)
    loadedTvs = TrainValidationSplit.load(tvsPath)
    self.assert_param_maps_equal(loadedTvs.getEstimatorParamMaps(), grid)
    self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
    self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)

    # Estimator-valued entries are compared by uid; plain values by equality.
    originalParamMap = tvs.getEstimatorParamMaps()
    loadedParamMap = loadedTvs.getEstimatorParamMaps()
    for loaded_map, original_map in zip(loadedParamMap, originalParamMap):
        for p in loaded_map:
            if p.name == "classifier":
                self.assertEqual(loaded_map[p].uid, original_map[p].uid)
            else:
                self.assertEqual(loaded_map[p], original_map[p])

    # The fitted model must also restore its grid and best sub-model.
    tvsModelPath = temp_path + "/tvsModel"
    tvsModel.save(tvsModelPath)
    loadedModel = TrainValidationSplitModel.load(tvsModelPath)
    self.assert_param_maps_equal(loadedModel.getEstimatorParamMaps(), grid)
    self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def test_save_load_nested_estimator(self):
    """Run the nested-estimator save/load checks for both implementations."""
    for estimator_cls in (LogisticRegression, DummyLogisticRegression):
        self._run_test_save_load_nested_estimator(estimator_cls)
|
|
|
|
|
|
|
|
def _run_test_save_load_pipeline_estimator(self, LogisticRegressionCls):
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
TrainValidationSplit testing code are similar so I do not paste them.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
temp_path = tempfile.mkdtemp()
|
|
|
|
training = self.spark.createDataFrame([
|
|
|
|
(0, "a b c d e spark", 1.0),
|
|
|
|
(1, "b d", 0.0),
|
|
|
|
(2, "spark f g h", 1.0),
|
|
|
|
(3, "hadoop mapreduce", 0.0),
|
|
|
|
(4, "b spark who", 1.0),
|
|
|
|
(5, "g d a y", 0.0),
|
|
|
|
(6, "spark fly", 1.0),
|
|
|
|
(7, "was mapreduce", 0.0),
|
|
|
|
], ["id", "text", "label"])
|
|
|
|
|
|
|
|
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
|
|
|
|
tokenizer = Tokenizer(inputCol="text", outputCol="words")
|
|
|
|
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
ova = OneVsRest(classifier=LogisticRegressionCls())
|
|
|
|
lr1 = LogisticRegressionCls().setMaxIter(5)
|
|
|
|
lr2 = LogisticRegressionCls().setMaxIter(10)
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
TrainValidationSplit testing code are similar so I do not paste them.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
|
|
|
|
pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
|
|
|
|
|
|
|
|
paramGrid = ParamGridBuilder() \
|
|
|
|
.addGrid(hashingTF.numFeatures, [10, 100]) \
|
|
|
|
.addGrid(ova.classifier, [lr1, lr2]) \
|
|
|
|
.build()
|
|
|
|
|
|
|
|
tvs = TrainValidationSplit(estimator=pipeline,
|
|
|
|
estimatorParamMaps=paramGrid,
|
|
|
|
evaluator=MulticlassClassificationEvaluator())
|
[SPARK-33592] Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
### What changes were proposed in this pull request?
Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
When saving validator estimatorParamMaps, will check all nested stages in tuned estimator to get correct param parent.
Two typical cases to manually test:
~~~python
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(lr.maxIter, [100, 200]) \
.build()
tvs = TrainValidationSplit(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
~~~python
lr = LogisticRegression()
ova = OneVsRest(classifier=lr)
grid = ParamGridBuilder().addGrid(lr.maxIter, [100, 200]).build()
evaluator = MulticlassClassificationEvaluator()
tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
Closes #30539 from WeichenXu123/fix_tuning_param_maps_io.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
2020-11-30 20:36:42 -05:00
|
|
|
tvsPath = temp_path + "/tvs"
|
|
|
|
tvs.save(tvsPath)
|
|
|
|
loadedTvs = TrainValidationSplit.load(tvsPath)
|
|
|
|
self.assert_param_maps_equal(loadedTvs.getEstimatorParamMaps(), paramGrid)
|
|
|
|
self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
TrainValidationSplit testing code are similar so I do not paste them.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
|
|
|
|
# Run train validation split, and choose the best set of parameters.
|
|
|
|
tvsModel = tvs.fit(training)
|
|
|
|
|
|
|
|
# test save/load of CrossValidatorModel
|
|
|
|
tvsModelPath = temp_path + "/tvsModel"
|
|
|
|
tvsModel.save(tvsModelPath)
|
|
|
|
loadedModel = TrainValidationSplitModel.load(tvsModelPath)
|
|
|
|
self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
|
|
|
|
self.assertEqual(len(loadedModel.bestModel.stages), len(tvsModel.bestModel.stages))
|
|
|
|
for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
|
|
|
|
tvsModel.bestModel.stages):
|
|
|
|
self.assertEqual(loadedStage.uid, originalStage.uid)
|
|
|
|
|
|
|
|
# Test nested pipeline
|
|
|
|
nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
|
|
|
|
tvs2 = TrainValidationSplit(estimator=nested_pipeline,
|
|
|
|
estimatorParamMaps=paramGrid,
|
|
|
|
evaluator=MulticlassClassificationEvaluator())
|
[SPARK-33592] Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
### What changes were proposed in this pull request?
Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading
When saving validator estimatorParamMaps, will check all nested stages in tuned estimator to get correct param parent.
Two typical cases to manually test:
~~~python
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(lr.maxIter, [100, 200]) \
.build()
tvs = TrainValidationSplit(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
~~~python
lr = LogisticRegression()
ova = OneVsRest(classifier=lr)
grid = ParamGridBuilder().addGrid(lr.maxIter, [100, 200]).build()
evaluator = MulticlassClassificationEvaluator()
tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
# check `loadedTvs.getEstimatorParamMaps()` restored correctly.
~~~
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
Closes #30539 from WeichenXu123/fix_tuning_param_maps_io.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
2020-11-30 20:36:42 -05:00
|
|
|
tvs2Path = temp_path + "/tvs2"
|
|
|
|
tvs2.save(tvs2Path)
|
|
|
|
loadedTvs2 = TrainValidationSplit.load(tvs2Path)
|
|
|
|
self.assert_param_maps_equal(loadedTvs2.getEstimatorParamMaps(), paramGrid)
|
|
|
|
self.assertEqual(loadedTvs2.getEstimator().uid, tvs2.getEstimator().uid)
|
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
### What changes were proposed in this pull request?
Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model.
Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers):
* Pipeline
* OneVsRest
* CrossValidator
* TrainValidationSplit
But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer.
Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write).
OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug.
In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Manually test in pyspark shell:
1) CrossValidator with Simple Pipeline estimator
```
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
cvModel.save('/tmp/cv_model001')
CrossValidatorModel.load('/tmp/cv_model001')
```
2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator.
```
from pyspark.ml.linalg import Vectors
from pyspark.ml import Estimator, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
dataset = spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
pipeline = Pipeline(stages=[ova])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvModel.save('/tmp/model002')
cvModel2 = CrossValidatorModel.load('/tmp/model002')
```
The TrainValidationSplit testing code is similar, so I do not paste it.
Closes #28279 from WeichenXu123/fix_pipeline_tuning.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
|
|
|
|
|
|
|
# Run train validation split, and choose the best set of parameters.
|
|
|
|
tvsModel2 = tvs2.fit(training)
|
|
|
|
# test save/load of CrossValidatorModel
|
|
|
|
tvsModelPath2 = temp_path + "/tvsModel2"
|
|
|
|
tvsModel2.save(tvsModelPath2)
|
|
|
|
loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2)
|
|
|
|
self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid)
|
|
|
|
loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
|
|
|
|
original_nested_pipeline_model = tvsModel2.bestModel.stages[1]
|
|
|
|
self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
|
|
|
|
self.assertEqual(len(loaded_nested_pipeline_model.stages),
|
|
|
|
len(original_nested_pipeline_model.stages))
|
|
|
|
for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
|
|
|
|
original_nested_pipeline_model.stages):
|
|
|
|
self.assertEqual(loadedStage.uid, originalStage.uid)
|
|
|
|
|
2020-12-03 19:35:50 -05:00
|
|
|
def test_save_load_pipeline_estimator(self):
    """Run the pipeline-estimator save/load round-trip test for both the
    JVM-backed LogisticRegression and the pure-Python dummy variant."""
    for estimator_cls in (LogisticRegression, DummyLogisticRegression):
        self._run_test_save_load_pipeline_estimator(estimator_cls)
2018-11-18 03:02:15 -05:00
|
|
|
def test_copy(self):
    """Verify TrainValidationSplit.copy() and TrainValidationSplitModel.copy():
    shared params and uids carry over, while mutable state (validationMetrics,
    subModels) is deep-copied and independent of the original."""
    dataset = self.spark.createDataFrame(
        [(10, 10.0),
         (50, 50.0),
         (100, 100.0),
         (500, 500.0)] * 10,
        ["feature", "label"])

    iee = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="r2")

    grid = (ParamGridBuilder()
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
            .build())
    tvs = TrainValidationSplit(
        estimator=iee,
        estimatorParamMaps=grid,
        evaluator=evaluator,
        collectSubModels=True
    )
    tvsModel = tvs.fit(dataset)
    copied_tvs = tvs.copy()
    copied_model = tvsModel.copy()

    # Estimator-level params must survive the copy.
    for getter in (
        lambda x: x.getCollectSubModels(),
        lambda x: x.getParallelism(),
        lambda x: x.getSeed(),
        lambda x: x.getTrainRatio(),
    ):
        self.assertEqual(getter(tvs), getter(copied_tvs))

    # Model-level params must survive the copy as well.
    for getter in (
        lambda x: x.getSeed(),
        lambda x: x.getTrainRatio(),
    ):
        self.assertEqual(getter(tvsModel), getter(copied_model))

    self.assertEqual(tvs.getEstimator().uid, copied_tvs.getEstimator().uid,
                     "Copied TrainValidationSplit has the same uid of Estimator")

    self.assertEqual(tvsModel.bestModel.uid, copied_model.bestModel.uid)
    self.assertEqual(len(tvsModel.validationMetrics),
                     len(copied_model.validationMetrics),
                     "Copied validationMetrics has the same size of the original")
    for original_metric, copied_metric in zip(tvsModel.validationMetrics,
                                              copied_model.validationMetrics):
        self.assertEqual(original_metric, copied_metric)

    # Mutating the original model's metrics must not leak into the copy.
    tvsModel.validationMetrics[0] = 'foo'
    self.assertNotEqual(
        copied_model.validationMetrics[0],
        'foo',
        "Changing the original validationMetrics should not affect the copied model"
    )
    # Likewise for the collected sub-models.
    tvsModel.subModels[0].getInducedError = lambda: 'foo'
    self.assertNotEqual(
        copied_model.subModels[0].getInducedError(),
        'foo',
        "Changing the original subModels should not affect the copied model"
    )
2018-11-18 03:02:15 -05:00
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Re-import the test module so unittest discovers the test classes by name.
    from pyspark.ml.tests.test_tuning import *  # noqa: F401

    # Use xmlrunner for XML test reports (consumed by Spark's CI) when it is
    # installed; otherwise fall back to unittest's default text runner.
    try:
        import xmlrunner  # type: ignore[import]
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)
|