spark-instrumented-optimizer/python/pyspark/ml/tests/test_tuning.py

872 lines
36 KiB
Python
Raw Normal View History

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import tempfile
import unittest
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model ### What changes were proposed in this pull request? Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model. Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers): * Pipeline * OneVsRest * CrossValidator * TrainValidationSplit But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer. Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write). OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug. In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class. ### Why are the changes needed? Bug fix. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test. Manually test in pyspark shell: 1) CrossValidator with Simple Pipeline estimator ``` from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder training = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0), (4, "b spark who", 1.0), (5, "g d a y", 0.0), (6, "spark fly", 1.0), (7, "was mapreduce", 0.0), ], ["id", "text", "label"]) # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) paramGrid = ParamGridBuilder() \ .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \ .addGrid(lr.regParam, [0.1, 0.01]) \ .build() crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=2) # use 3+ folds in practice # Run cross-validation, and choose the best set of parameters. cvModel = crossval.fit(training) cvModel.save('/tmp/cv_model001') CrossValidatorModel.load('/tmp/cv_model001') ``` 2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator. ``` from pyspark.ml.linalg import Vectors from pyspark.ml import Estimator, Model from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest from pyspark.ml.evaluation import BinaryClassificationEvaluator, \ MulticlassClassificationEvaluator, RegressionEvaluator from pyspark.ml.linalg import Vectors from pyspark.ml.param import Param, Params from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \ TrainValidationSplit, TrainValidationSplitModel from pyspark.sql.functions import rand from pyspark.testing.mlutils import SparkSessionTestCase dataset = spark.createDataFrame( [(Vectors.dense([0.0]), 0.0), (Vectors.dense([0.4]), 1.0), (Vectors.dense([0.5]), 0.0), (Vectors.dense([0.6]), 1.0), (Vectors.dense([1.0]), 1.0)] * 10, ["features", "label"]) ova = OneVsRest(classifier=LogisticRegression()) lr1 = LogisticRegression().setMaxIter(100) lr2 = LogisticRegression().setMaxIter(150) grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build() evaluator = MulticlassClassificationEvaluator() pipeline = Pipeline(stages=[ova]) cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) cvModel.save('/tmp/model002') cvModel2 = CrossValidatorModel.load('/tmp/model002') ``` TrainValidationSplit testing code are similar so I do not paste them. Closes #28279 from WeichenXu123/fix_pipeline_tuning. Authored-by: Weichen Xu <weichen.xu@databricks.com> Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml import Estimator, Pipeline, Model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
TrainValidationSplit, TrainValidationSplitModel
from pyspark.sql.functions import rand
from pyspark.testing.mlutils import SparkSessionTestCase
class HasInducedError(Params):
def __init__(self):
super(HasInducedError, self).__init__()
self.inducedError = Param(self, "inducedError",
"Uniformly-distributed error added to feature")
def getInducedError(self):
return self.getOrDefault(self.inducedError)
class InducedErrorModel(Model, HasInducedError):
def __init__(self):
super(InducedErrorModel, self).__init__()
def _transform(self, dataset):
return dataset.withColumn("prediction",
dataset.feature + (rand(0) * self.getInducedError()))
class InducedErrorEstimator(Estimator, HasInducedError):
def __init__(self, inducedError=1.0):
super(InducedErrorEstimator, self).__init__()
self._set(inducedError=inducedError)
def _fit(self, dataset):
model = InducedErrorModel()
self._copyValues(model)
return model
2019-11-19 17:15:00 -05:00
class ParamGridBuilderTests(SparkSessionTestCase):
def test_addGrid(self):
with self.assertRaises(TypeError):
grid = (ParamGridBuilder()
.addGrid("must be an instance of Param", ["not", "string"])
.build())
class CrossValidatorTests(SparkSessionTestCase):
def test_copy(self):
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="rmse")
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
cv = CrossValidator(
estimator=iee,
estimatorParamMaps=grid,
evaluator=evaluator,
collectSubModels=True,
numFolds=2
)
cvCopied = cv.copy()
for param in [
lambda x: x.getEstimator().uid,
# SPARK-32092: CrossValidator.copy() needs to copy all existing params
lambda x: x.getNumFolds(),
lambda x: x.getFoldCol(),
lambda x: x.getCollectSubModels(),
lambda x: x.getParallelism(),
lambda x: x.getSeed()
]:
self.assertEqual(param(cv), param(cvCopied))
cvModel = cv.fit(dataset)
cvModelCopied = cvModel.copy()
for index in range(len(cvModel.avgMetrics)):
self.assertTrue(abs(cvModel.avgMetrics[index] - cvModelCopied.avgMetrics[index])
< 0.0001)
# SPARK-32092: CrossValidatorModel.copy() needs to copy all existing params
for param in [
lambda x: x.getNumFolds(),
lambda x: x.getFoldCol(),
lambda x: x.getSeed()
]:
self.assertEqual(param(cvModel), param(cvModelCopied))
cvModel.avgMetrics[0] = 'foo'
self.assertNotEqual(
cvModelCopied.avgMetrics[0],
'foo',
"Changing the original avgMetrics should not affect the copied model"
)
cvModel.subModels[0][0].getInducedError = lambda: 'foo'
self.assertNotEqual(
cvModelCopied.subModels[0][0].getInducedError(),
'foo',
"Changing the original subModels should not affect the copied model"
)
def test_fit_minimize_metric(self):
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="rmse")
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
bestModel = cvModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
def test_fit_maximize_metric(self):
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="r2")
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
bestModel = cvModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
def test_param_grid_type_coercion(self):
lr = LogisticRegression(maxIter=10)
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.5, 1]).build()
for param in paramGrid:
for v in param.values():
assert(type(v) == float)
def test_save_load_trained_model(self):
# This tests saving and loading the trained model only.
# Save/load for CrossValidator will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(
estimator=lr,
estimatorParamMaps=grid,
evaluator=evaluator,
collectSubModels=True,
numFolds=4,
seed=42
)
cvModel = cv.fit(dataset)
lrModel = cvModel.bestModel
lrModelPath = temp_path + "/lrModel"
lrModel.save(lrModelPath)
loadedLrModel = LogisticRegressionModel.load(lrModelPath)
self.assertEqual(loadedLrModel.uid, lrModel.uid)
self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
# SPARK-32092: Saving and then loading CrossValidatorModel should not change the params
cvModelPath = temp_path + "/cvModel"
cvModel.save(cvModelPath)
loadedCvModel = CrossValidatorModel.load(cvModelPath)
for param in [
lambda x: x.getNumFolds(),
lambda x: x.getFoldCol(),
lambda x: x.getSeed(),
lambda x: len(x.subModels)
]:
self.assertEqual(param(cvModel), param(loadedCvModel))
self.assertTrue(all(
loadedCvModel.isSet(param) for param in loadedCvModel.params
))
def test_save_load_simple_estimator(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
# test save/load of CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvPath = temp_path + "/cv"
cv.save(cvPath)
loadedCV = CrossValidator.load(cvPath)
self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)
self.assertEqual(loadedCV.getEstimatorParamMaps(), cv.getEstimatorParamMaps())
# test save/load of CrossValidatorModel
cvModelPath = temp_path + "/cvModel"
cvModel.save(cvModelPath)
loadedModel = CrossValidatorModel.load(cvModelPath)
self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
def test_parallel_evaluation(self):
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
evaluator = BinaryClassificationEvaluator()
# test save/load of CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
cv.setParallelism(1)
cvSerialModel = cv.fit(dataset)
cv.setParallelism(2)
cvParallelModel = cv.fit(dataset)
self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)
def test_expose_sub_models(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
numFolds = 3
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
numFolds=numFolds, collectSubModels=True)
def checkSubModels(subModels):
self.assertEqual(len(subModels), numFolds)
for i in range(numFolds):
self.assertEqual(len(subModels[i]), len(grid))
cvModel = cv.fit(dataset)
checkSubModels(cvModel.subModels)
# Test the default value for option "persistSubModel" to be "true"
testSubPath = temp_path + "/testCrossValidatorSubModels"
savingPathWithSubModels = testSubPath + "cvModel3"
cvModel.save(savingPathWithSubModels)
cvModel3 = CrossValidatorModel.load(savingPathWithSubModels)
checkSubModels(cvModel3.subModels)
cvModel4 = cvModel3.copy()
checkSubModels(cvModel4.subModels)
savingPathWithoutSubModels = testSubPath + "cvModel2"
cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels)
self.assertEqual(cvModel2.subModels, None)
for i in range(numFolds):
for j in range(len(grid)):
self.assertEqual(cvModel.subModels[i][j].uid, cvModel3.subModels[i][j].uid)
def test_save_load_nested_estimator(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
# test save/load of CrossValidator
cv = CrossValidator(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
cvPath = temp_path + "/cv"
cv.save(cvPath)
loadedCV = CrossValidator.load(cvPath)
self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)
originalParamMap = cv.getEstimatorParamMaps()
loadedParamMap = loadedCV.getEstimatorParamMaps()
for i, param in enumerate(loadedParamMap):
for p in param:
if p.name == "classifier":
self.assertEqual(param[p].uid, originalParamMap[i][p].uid)
else:
self.assertEqual(param[p], originalParamMap[i][p])
# test save/load of CrossValidatorModel
cvModelPath = temp_path + "/cvModel"
cvModel.save(cvModelPath)
loadedModel = CrossValidatorModel.load(cvModelPath)
self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model ### What changes were proposed in this pull request? Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model. Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers): * Pipeline * OneVsRest * CrossValidator * TrainValidationSplit But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer. Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write). OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug. In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class. ### Why are the changes needed? Bug fix. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test. Manually test in pyspark shell: 1) CrossValidator with Simple Pipeline estimator ``` from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder training = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0), (4, "b spark who", 1.0), (5, "g d a y", 0.0), (6, "spark fly", 1.0), (7, "was mapreduce", 0.0), ], ["id", "text", "label"]) # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) paramGrid = ParamGridBuilder() \ .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \ .addGrid(lr.regParam, [0.1, 0.01]) \ .build() crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=2) # use 3+ folds in practice # Run cross-validation, and choose the best set of parameters. cvModel = crossval.fit(training) cvModel.save('/tmp/cv_model001') CrossValidatorModel.load('/tmp/cv_model001') ``` 2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator. ``` from pyspark.ml.linalg import Vectors from pyspark.ml import Estimator, Model from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest from pyspark.ml.evaluation import BinaryClassificationEvaluator, \ MulticlassClassificationEvaluator, RegressionEvaluator from pyspark.ml.linalg import Vectors from pyspark.ml.param import Param, Params from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \ TrainValidationSplit, TrainValidationSplitModel from pyspark.sql.functions import rand from pyspark.testing.mlutils import SparkSessionTestCase dataset = spark.createDataFrame( [(Vectors.dense([0.0]), 0.0), (Vectors.dense([0.4]), 1.0), (Vectors.dense([0.5]), 0.0), (Vectors.dense([0.6]), 1.0), (Vectors.dense([1.0]), 1.0)] * 10, ["features", "label"]) ova = OneVsRest(classifier=LogisticRegression()) lr1 = LogisticRegression().setMaxIter(100) lr2 = LogisticRegression().setMaxIter(150) grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build() evaluator = MulticlassClassificationEvaluator() pipeline = Pipeline(stages=[ova]) cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) cvModel.save('/tmp/model002') cvModel2 = CrossValidatorModel.load('/tmp/model002') ``` TrainValidationSplit testing code are similar so I do not paste them. Closes #28279 from WeichenXu123/fix_pipeline_tuning. Authored-by: Weichen Xu <weichen.xu@databricks.com> Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
def test_save_load_pipeline_estimator(self):
temp_path = tempfile.mkdtemp()
training = self.spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(5)
lr2 = LogisticRegression().setMaxIter(10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(ova.classifier, [lr1, lr2]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
# test save/load of CrossValidatorModel
cvModelPath = temp_path + "/cvModel"
cvModel.save(cvModelPath)
loadedModel = CrossValidatorModel.load(cvModelPath)
self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
self.assertEqual(len(loadedModel.bestModel.stages), len(cvModel.bestModel.stages))
for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
cvModel.bestModel.stages):
self.assertEqual(loadedStage.uid, originalStage.uid)
# Test nested pipeline
nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
crossval2 = CrossValidator(estimator=nested_pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel2 = crossval2.fit(training)
# test save/load of CrossValidatorModel
cvModelPath2 = temp_path + "/cvModel2"
cvModel2.save(cvModelPath2)
loadedModel2 = CrossValidatorModel.load(cvModelPath2)
self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid)
loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
original_nested_pipeline_model = cvModel2.bestModel.stages[1]
self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
self.assertEqual(len(loaded_nested_pipeline_model.stages),
len(original_nested_pipeline_model.stages))
for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
original_nested_pipeline_model.stages):
self.assertEqual(loadedStage.uid, originalStage.uid)
def test_user_specified_folds(self):
from pyspark.sql import functions as F
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"]).repartition(2, "features")
dataset_with_folds = dataset.repartition(1).withColumn("random", rand(100)) \
.withColumn("fold", F.when(F.col("random") < 0.33, 0)
.when(F.col("random") < 0.66, 1)
.otherwise(2)).repartition(2, "features")
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [20]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3)
cv_with_user_folds = CrossValidator(estimator=lr,
estimatorParamMaps=grid,
evaluator=evaluator,
numFolds=3,
foldCol="fold")
self.assertEqual(cv.getEstimator().uid, cv_with_user_folds.getEstimator().uid)
cvModel1 = cv.fit(dataset)
cvModel2 = cv_with_user_folds.fit(dataset_with_folds)
for index in range(len(cvModel1.avgMetrics)):
print(abs(cvModel1.avgMetrics[index] - cvModel2.avgMetrics[index]))
self.assertTrue(abs(cvModel1.avgMetrics[index] - cvModel2.avgMetrics[index])
< 0.1)
# test save/load of CrossValidator
temp_path = tempfile.mkdtemp()
cvPath = temp_path + "/cv"
cv_with_user_folds.save(cvPath)
loadedCV = CrossValidator.load(cvPath)
self.assertEqual(loadedCV.getFoldCol(), cv_with_user_folds.getFoldCol())
def test_invalid_user_specified_folds(self):
dataset_with_folds = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0, 0),
(Vectors.dense([0.4]), 1.0, 1),
(Vectors.dense([0.5]), 0.0, 2),
(Vectors.dense([0.6]), 1.0, 0),
(Vectors.dense([1.0]), 1.0, 1)] * 10,
["features", "label", "fold"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [20]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr,
estimatorParamMaps=grid,
evaluator=evaluator,
numFolds=2,
foldCol="fold")
with self.assertRaisesRegexp(Exception, "Fold number must be in range"):
cv.fit(dataset_with_folds)
cv = CrossValidator(estimator=lr,
estimatorParamMaps=grid,
evaluator=evaluator,
numFolds=4,
foldCol="fold")
with self.assertRaisesRegexp(Exception, "The validation data at fold 3 is empty"):
cv.fit(dataset_with_folds)
class TrainValidationSplitTests(SparkSessionTestCase):
def test_fit_minimize_metric(self):
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="rmse")
grid = ParamGridBuilder() \
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
.build()
tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
tvsModel = tvs.fit(dataset)
bestModel = tvsModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
validationMetrics = tvsModel.validationMetrics
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
self.assertEqual(len(grid), len(validationMetrics),
"validationMetrics has the same size of grid parameter")
self.assertEqual(0.0, min(validationMetrics))
def test_fit_maximize_metric(self):
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="r2")
grid = ParamGridBuilder() \
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
.build()
tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
tvsModel = tvs.fit(dataset)
bestModel = tvsModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
validationMetrics = tvsModel.validationMetrics
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
self.assertEqual(len(grid), len(validationMetrics),
"validationMetrics has the same size of grid parameter")
self.assertEqual(1.0, max(validationMetrics))
def test_save_load_trained_model(self):
# This tests saving and loading the trained model only.
# Save/load for TrainValidationSplit will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
tvs = TrainValidationSplit(
estimator=lr,
estimatorParamMaps=grid,
evaluator=evaluator,
collectSubModels=True,
seed=42
)
tvsModel = tvs.fit(dataset)
lrModel = tvsModel.bestModel
lrModelPath = temp_path + "/lrModel"
lrModel.save(lrModelPath)
loadedLrModel = LogisticRegressionModel.load(lrModelPath)
self.assertEqual(loadedLrModel.uid, lrModel.uid)
self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
tvsModelPath = temp_path + "/tvsModel"
tvsModel.save(tvsModelPath)
loadedTvsModel = TrainValidationSplitModel.load(tvsModelPath)
for param in [
lambda x: x.getSeed(),
lambda x: x.getTrainRatio(),
]:
self.assertEqual(param(tvsModel), param(loadedTvsModel))
self.assertTrue(all(
loadedTvsModel.isSet(param) for param in loadedTvsModel.params
))
def test_save_load_simple_estimator(self):
# This tests saving and loading the trained model only.
# Save/load for TrainValidationSplit will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
tvsModel = tvs.fit(dataset)
tvsPath = temp_path + "/tvs"
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)
self.assertEqual(loadedTvs.getEstimatorParamMaps(), tvs.getEstimatorParamMaps())
tvsModelPath = temp_path + "/tvsModel"
tvsModel.save(tvsModelPath)
loadedModel = TrainValidationSplitModel.load(tvsModelPath)
self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
def test_parallel_evaluation(self):
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
evaluator = BinaryClassificationEvaluator()
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
tvs.setParallelism(1)
tvsSerialModel = tvs.fit(dataset)
tvs.setParallelism(2)
tvsParallelModel = tvs.fit(dataset)
self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics)
def test_expose_sub_models(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
collectSubModels=True)
tvsModel = tvs.fit(dataset)
self.assertEqual(len(tvsModel.subModels), len(grid))
# Test the default value for option "persistSubModel" to be "true"
testSubPath = temp_path + "/testTrainValidationSplitSubModels"
savingPathWithSubModels = testSubPath + "cvModel3"
tvsModel.save(savingPathWithSubModels)
tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels)
self.assertEqual(len(tvsModel3.subModels), len(grid))
tvsModel4 = tvsModel3.copy()
self.assertEqual(len(tvsModel4.subModels), len(grid))
savingPathWithoutSubModels = testSubPath + "cvModel2"
tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels)
self.assertEqual(tvsModel2.subModels, None)
for i in range(len(grid)):
self.assertEqual(tvsModel.subModels[i].uid, tvsModel3.subModels[i].uid)
def test_save_load_nested_estimator(self):
# This tests saving and loading the trained model only.
# Save/load for TrainValidationSplit will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(100)
lr2 = LogisticRegression().setMaxIter(150)
grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
evaluator = MulticlassClassificationEvaluator()
tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
tvsModel = tvs.fit(dataset)
tvsPath = temp_path + "/tvs"
tvs.save(tvsPath)
loadedTvs = TrainValidationSplit.load(tvsPath)
self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)
originalParamMap = tvs.getEstimatorParamMaps()
loadedParamMap = loadedTvs.getEstimatorParamMaps()
for i, param in enumerate(loadedParamMap):
for p in param:
if p.name == "classifier":
self.assertEqual(param[p].uid, originalParamMap[i][p].uid)
else:
self.assertEqual(param[p], originalParamMap[i][p])
tvsModelPath = temp_path + "/tvsModel"
tvsModel.save(tvsModelPath)
loadedModel = TrainValidationSplitModel.load(tvsModelPath)
self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
[SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model ### What changes were proposed in this pull request? Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model. Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers): * Pipeline * OneVsRest * CrossValidator * TrainValidationSplit But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer. Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write). OneVsRest class already defines the two methods, but Pipeline do not, so it lead to this bug. In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class. ### Why are the changes needed? Bug fix. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test. Manually test in pyspark shell: 1) CrossValidator with Simple Pipeline estimator ``` from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder training = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0), (4, "b spark who", 1.0), (5, "g d a y", 0.0), (6, "spark fly", 1.0), (7, "was mapreduce", 0.0), ], ["id", "text", "label"]) # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) paramGrid = ParamGridBuilder() \ .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \ .addGrid(lr.regParam, [0.1, 0.01]) \ .build() crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=2) # use 3+ folds in practice # Run cross-validation, and choose the best set of parameters. cvModel = crossval.fit(training) cvModel.save('/tmp/cv_model001') CrossValidatorModel.load('/tmp/cv_model001') ``` 2) CrossValidator with Pipeline estimator which include a OneVsRest estimator stage, and OneVsRest estimator nest a LogisticRegression estimator. ``` from pyspark.ml.linalg import Vectors from pyspark.ml import Estimator, Model from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest from pyspark.ml.evaluation import BinaryClassificationEvaluator, \ MulticlassClassificationEvaluator, RegressionEvaluator from pyspark.ml.linalg import Vectors from pyspark.ml.param import Param, Params from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \ TrainValidationSplit, TrainValidationSplitModel from pyspark.sql.functions import rand from pyspark.testing.mlutils import SparkSessionTestCase dataset = spark.createDataFrame( [(Vectors.dense([0.0]), 0.0), (Vectors.dense([0.4]), 1.0), (Vectors.dense([0.5]), 0.0), (Vectors.dense([0.6]), 1.0), (Vectors.dense([1.0]), 1.0)] * 10, ["features", "label"]) ova = OneVsRest(classifier=LogisticRegression()) lr1 = LogisticRegression().setMaxIter(100) lr2 = LogisticRegression().setMaxIter(150) grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build() evaluator = MulticlassClassificationEvaluator() pipeline = Pipeline(stages=[ova]) cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) cvModel.save('/tmp/model002') cvModel2 = CrossValidatorModel.load('/tmp/model002') ``` TrainValidationSplit testing code are similar so I do not paste them. Closes #28279 from WeichenXu123/fix_pipeline_tuning. Authored-by: Weichen Xu <weichen.xu@databricks.com> Signed-off-by: Xiangrui Meng <meng@databricks.com>
2020-04-27 00:04:14 -04:00
def test_save_load_pipeline_estimator(self):
temp_path = tempfile.mkdtemp()
training = self.spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
ova = OneVsRest(classifier=LogisticRegression())
lr1 = LogisticRegression().setMaxIter(5)
lr2 = LogisticRegression().setMaxIter(10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100]) \
.addGrid(ova.classifier, [lr1, lr2]) \
.build()
tvs = TrainValidationSplit(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
# Run train validation split, and choose the best set of parameters.
tvsModel = tvs.fit(training)
# test save/load of CrossValidatorModel
tvsModelPath = temp_path + "/tvsModel"
tvsModel.save(tvsModelPath)
loadedModel = TrainValidationSplitModel.load(tvsModelPath)
self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
self.assertEqual(len(loadedModel.bestModel.stages), len(tvsModel.bestModel.stages))
for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
tvsModel.bestModel.stages):
self.assertEqual(loadedStage.uid, originalStage.uid)
# Test nested pipeline
nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
tvs2 = TrainValidationSplit(estimator=nested_pipeline,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator())
# Run train validation split, and choose the best set of parameters.
tvsModel2 = tvs2.fit(training)
# test save/load of CrossValidatorModel
tvsModelPath2 = temp_path + "/tvsModel2"
tvsModel2.save(tvsModelPath2)
loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2)
self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid)
loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
original_nested_pipeline_model = tvsModel2.bestModel.stages[1]
self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
self.assertEqual(len(loaded_nested_pipeline_model.stages),
len(original_nested_pipeline_model.stages))
for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
original_nested_pipeline_model.stages):
self.assertEqual(loadedStage.uid, originalStage.uid)
def test_copy(self):
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="r2")
grid = ParamGridBuilder() \
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
.build()
tvs = TrainValidationSplit(
estimator=iee,
estimatorParamMaps=grid,
evaluator=evaluator,
collectSubModels=True
)
tvsModel = tvs.fit(dataset)
tvsCopied = tvs.copy()
tvsModelCopied = tvsModel.copy()
for param in [
lambda x: x.getCollectSubModels(),
lambda x: x.getParallelism(),
lambda x: x.getSeed(),
lambda x: x.getTrainRatio(),
]:
self.assertEqual(param(tvs), param(tvsCopied))
for param in [
lambda x: x.getSeed(),
lambda x: x.getTrainRatio(),
]:
self.assertEqual(param(tvsModel), param(tvsModelCopied))
self.assertEqual(tvs.getEstimator().uid, tvsCopied.getEstimator().uid,
"Copied TrainValidationSplit has the same uid of Estimator")
self.assertEqual(tvsModel.bestModel.uid, tvsModelCopied.bestModel.uid)
self.assertEqual(len(tvsModel.validationMetrics),
len(tvsModelCopied.validationMetrics),
"Copied validationMetrics has the same size of the original")
for index in range(len(tvsModel.validationMetrics)):
self.assertEqual(tvsModel.validationMetrics[index],
tvsModelCopied.validationMetrics[index])
tvsModel.validationMetrics[0] = 'foo'
self.assertNotEqual(
tvsModelCopied.validationMetrics[0],
'foo',
"Changing the original validationMetrics should not affect the copied model"
)
tvsModel.subModels[0].getInducedError = lambda: 'foo'
self.assertNotEqual(
tvsModelCopied.subModels[0].getInducedError(),
'foo',
"Changing the original subModels should not affect the copied model"
)
if __name__ == "__main__":
[SPARK-32319][PYSPARK] Disallow the use of unused imports Disallow the use of unused imports: - Unnecessary increases the memory footprint of the application - Removes the imports that are required for the examples in the docstring from the file-scope to the example itself. This keeps the files itself clean, and gives a more complete example as it also includes the imports :) ``` fokkodriesprongFan spark % flake8 python | grep -i "imported but unused" python/pyspark/cloudpickle.py:46:1: F401 'functools.partial' imported but unused python/pyspark/cloudpickle.py:55:1: F401 'traceback' imported but unused python/pyspark/heapq3.py:868:5: F401 '_heapq.*' imported but unused python/pyspark/__init__.py:61:1: F401 'pyspark.version.__version__' imported but unused python/pyspark/__init__.py:62:1: F401 'pyspark._globals._NoValue' imported but unused python/pyspark/__init__.py:115:1: F401 'pyspark.sql.SQLContext' imported but unused python/pyspark/__init__.py:115:1: F401 'pyspark.sql.HiveContext' imported but unused python/pyspark/__init__.py:115:1: F401 'pyspark.sql.Row' imported but unused python/pyspark/rdd.py:21:1: F401 're' imported but unused python/pyspark/rdd.py:29:1: F401 'tempfile.NamedTemporaryFile' imported but unused python/pyspark/mllib/regression.py:26:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/clustering.py:28:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/clustering.py:28:1: F401 'pyspark.mllib.linalg.DenseVector' imported but unused python/pyspark/mllib/classification.py:26:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/feature.py:28:1: F401 'pyspark.mllib.linalg.DenseVector' imported but unused python/pyspark/mllib/feature.py:28:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/feature.py:30:1: F401 'pyspark.mllib.regression.LabeledPoint' imported but unused python/pyspark/mllib/tests/test_linalg.py:18:1: F401 'sys' imported but unused python/pyspark/mllib/tests/test_linalg.py:642:5: F401 'pyspark.mllib.tests.test_linalg.*' imported but unused python/pyspark/mllib/tests/test_feature.py:21:1: F401 'numpy.random' imported but unused python/pyspark/mllib/tests/test_feature.py:21:1: F401 'numpy.exp' imported but unused python/pyspark/mllib/tests/test_feature.py:23:1: F401 'pyspark.mllib.linalg.Vector' imported but unused python/pyspark/mllib/tests/test_feature.py:23:1: F401 'pyspark.mllib.linalg.VectorUDT' imported but unused python/pyspark/mllib/tests/test_feature.py:185:5: F401 'pyspark.mllib.tests.test_feature.*' imported but unused python/pyspark/mllib/tests/test_util.py:97:5: F401 'pyspark.mllib.tests.test_util.*' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.Vector' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.DenseVector' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.VectorUDT' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg._convert_to_vector' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.DenseMatrix' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.SparseMatrix' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.MatrixUDT' imported but unused python/pyspark/mllib/tests/test_stat.py:181:5: F401 'pyspark.mllib.tests.test_stat.*' imported but unused python/pyspark/mllib/tests/test_streaming_algorithms.py:18:1: F401 'time.time' imported but unused python/pyspark/mllib/tests/test_streaming_algorithms.py:18:1: F401 'time.sleep' imported but unused python/pyspark/mllib/tests/test_streaming_algorithms.py:470:5: F401 'pyspark.mllib.tests.test_streaming_algorithms.*' imported but unused python/pyspark/mllib/tests/test_algorithms.py:295:5: F401 'pyspark.mllib.tests.test_algorithms.*' imported but unused python/pyspark/tests/test_serializers.py:90:13: F401 'xmlrunner' imported but unused python/pyspark/tests/test_rdd.py:21:1: F401 'sys' imported but unused python/pyspark/tests/test_rdd.py:29:1: F401 'pyspark.resource.ResourceProfile' imported but unused python/pyspark/tests/test_rdd.py:885:5: F401 'pyspark.tests.test_rdd.*' imported but unused python/pyspark/tests/test_readwrite.py:19:1: F401 'sys' imported but unused python/pyspark/tests/test_readwrite.py:22:1: F401 'array.array' imported but unused python/pyspark/tests/test_readwrite.py:309:5: F401 'pyspark.tests.test_readwrite.*' imported but unused python/pyspark/tests/test_join.py:62:5: F401 'pyspark.tests.test_join.*' imported but unused python/pyspark/tests/test_taskcontext.py:19:1: F401 'shutil' imported but unused python/pyspark/tests/test_taskcontext.py:325:5: F401 'pyspark.tests.test_taskcontext.*' imported but unused python/pyspark/tests/test_conf.py:36:5: F401 'pyspark.tests.test_conf.*' imported but unused python/pyspark/tests/test_broadcast.py:148:5: F401 'pyspark.tests.test_broadcast.*' imported but unused python/pyspark/tests/test_daemon.py:76:5: F401 'pyspark.tests.test_daemon.*' imported but unused python/pyspark/tests/test_util.py:77:5: F401 'pyspark.tests.test_util.*' imported but unused python/pyspark/tests/test_pin_thread.py:19:1: F401 'random' imported but unused python/pyspark/tests/test_pin_thread.py:149:5: F401 'pyspark.tests.test_pin_thread.*' imported but unused python/pyspark/tests/test_worker.py:19:1: F401 'sys' imported but unused python/pyspark/tests/test_worker.py:26:5: F401 'resource' imported but unused python/pyspark/tests/test_worker.py:203:5: F401 'pyspark.tests.test_worker.*' imported but unused python/pyspark/tests/test_profiler.py:101:5: F401 'pyspark.tests.test_profiler.*' imported but unused python/pyspark/tests/test_shuffle.py:18:1: F401 'sys' imported but unused python/pyspark/tests/test_shuffle.py:171:5: F401 'pyspark.tests.test_shuffle.*' imported but unused python/pyspark/tests/test_rddbarrier.py:43:5: F401 'pyspark.tests.test_rddbarrier.*' imported but unused python/pyspark/tests/test_context.py:129:13: F401 'userlibrary.UserClass' imported but unused python/pyspark/tests/test_context.py:140:13: F401 'userlib.UserClass' imported but unused python/pyspark/tests/test_context.py:310:5: F401 'pyspark.tests.test_context.*' imported but unused python/pyspark/tests/test_appsubmit.py:241:5: F401 'pyspark.tests.test_appsubmit.*' imported but unused python/pyspark/streaming/dstream.py:18:1: F401 'sys' imported but unused python/pyspark/streaming/tests/test_dstream.py:27:1: F401 'pyspark.RDD' imported but unused python/pyspark/streaming/tests/test_dstream.py:647:5: F401 'pyspark.streaming.tests.test_dstream.*' imported but unused python/pyspark/streaming/tests/test_kinesis.py:83:5: F401 'pyspark.streaming.tests.test_kinesis.*' imported but unused python/pyspark/streaming/tests/test_listener.py:152:5: F401 'pyspark.streaming.tests.test_listener.*' imported but unused python/pyspark/streaming/tests/test_context.py:178:5: F401 'pyspark.streaming.tests.test_context.*' imported but unused python/pyspark/testing/utils.py:30:5: F401 'scipy.sparse' imported but unused python/pyspark/testing/utils.py:36:5: F401 'numpy as np' imported but unused python/pyspark/ml/regression.py:25:1: F401 'pyspark.ml.tree._TreeEnsembleParams' imported but unused python/pyspark/ml/regression.py:25:1: F401 'pyspark.ml.tree._HasVarianceImpurity' imported but unused python/pyspark/ml/regression.py:29:1: F401 'pyspark.ml.wrapper.JavaParams' imported but unused python/pyspark/ml/util.py:19:1: F401 'sys' imported but unused python/pyspark/ml/__init__.py:25:1: F401 'pyspark.ml.pipeline' imported but unused python/pyspark/ml/pipeline.py:18:1: F401 'sys' imported but unused python/pyspark/ml/stat.py:22:1: F401 'pyspark.ml.linalg.DenseMatrix' imported but unused python/pyspark/ml/stat.py:22:1: F401 'pyspark.ml.linalg.Vectors' imported but unused python/pyspark/ml/tests/test_training_summary.py:18:1: F401 'sys' imported but unused python/pyspark/ml/tests/test_training_summary.py:364:5: F401 'pyspark.ml.tests.test_training_summary.*' imported but unused python/pyspark/ml/tests/test_linalg.py:381:5: F401 'pyspark.ml.tests.test_linalg.*' imported but unused python/pyspark/ml/tests/test_tuning.py:427:9: F401 'pyspark.sql.functions as F' imported but unused python/pyspark/ml/tests/test_tuning.py:757:5: F401 'pyspark.ml.tests.test_tuning.*' imported but unused python/pyspark/ml/tests/test_wrapper.py:120:5: F401 'pyspark.ml.tests.test_wrapper.*' imported but unused python/pyspark/ml/tests/test_feature.py:19:1: F401 'sys' imported but unused python/pyspark/ml/tests/test_feature.py:304:5: F401 'pyspark.ml.tests.test_feature.*' imported but unused python/pyspark/ml/tests/test_image.py:19:1: F401 'py4j' imported but unused python/pyspark/ml/tests/test_image.py:22:1: F401 'pyspark.testing.mlutils.PySparkTestCase' imported but unused python/pyspark/ml/tests/test_image.py:71:5: F401 'pyspark.ml.tests.test_image.*' imported but unused python/pyspark/ml/tests/test_persistence.py:456:5: F401 'pyspark.ml.tests.test_persistence.*' imported but unused python/pyspark/ml/tests/test_evaluation.py:56:5: F401 'pyspark.ml.tests.test_evaluation.*' imported but unused python/pyspark/ml/tests/test_stat.py:43:5: F401 'pyspark.ml.tests.test_stat.*' imported but unused python/pyspark/ml/tests/test_base.py:70:5: F401 'pyspark.ml.tests.test_base.*' imported but unused python/pyspark/ml/tests/test_param.py:20:1: F401 'sys' imported but unused python/pyspark/ml/tests/test_param.py:375:5: F401 'pyspark.ml.tests.test_param.*' imported but unused python/pyspark/ml/tests/test_pipeline.py:62:5: F401 'pyspark.ml.tests.test_pipeline.*' imported but unused python/pyspark/ml/tests/test_algorithms.py:333:5: F401 'pyspark.ml.tests.test_algorithms.*' imported but unused python/pyspark/ml/param/__init__.py:18:1: F401 'sys' imported but unused python/pyspark/resource/tests/test_resources.py:17:1: F401 'random' imported but unused python/pyspark/resource/tests/test_resources.py:20:1: F401 'pyspark.resource.ResourceProfile' imported but unused python/pyspark/resource/tests/test_resources.py:75:5: F401 'pyspark.resource.tests.test_resources.*' imported but unused python/pyspark/sql/functions.py:32:1: F401 'pyspark.sql.udf.UserDefinedFunction' imported but unused python/pyspark/sql/functions.py:34:1: F401 'pyspark.sql.pandas.functions.pandas_udf' imported but unused python/pyspark/sql/session.py:30:1: F401 'pyspark.sql.types.Row' imported but unused python/pyspark/sql/session.py:30:1: F401 'pyspark.sql.types.StringType' imported but unused python/pyspark/sql/readwriter.py:1084:5: F401 'pyspark.sql.Row' imported but unused python/pyspark/sql/context.py:26:1: F401 'pyspark.sql.types.IntegerType' imported but unused python/pyspark/sql/context.py:26:1: F401 'pyspark.sql.types.Row' imported but unused python/pyspark/sql/context.py:26:1: F401 'pyspark.sql.types.StringType' imported but unused python/pyspark/sql/context.py:27:1: F401 'pyspark.sql.udf.UDFRegistration' imported but unused python/pyspark/sql/streaming.py:1212:5: F401 'pyspark.sql.Row' imported but unused python/pyspark/sql/tests/test_utils.py:55:5: F401 'pyspark.sql.tests.test_utils.*' imported but unused python/pyspark/sql/tests/test_pandas_map.py:18:1: F401 'sys' imported but unused python/pyspark/sql/tests/test_pandas_map.py:22:1: F401 'pyspark.sql.functions.pandas_udf' imported but unused python/pyspark/sql/tests/test_pandas_map.py:22:1: F401 'pyspark.sql.functions.PandasUDFType' imported but unused python/pyspark/sql/tests/test_pandas_map.py:119:5: F401 'pyspark.sql.tests.test_pandas_map.*' imported but unused python/pyspark/sql/tests/test_catalog.py:193:5: F401 'pyspark.sql.tests.test_catalog.*' imported but unused python/pyspark/sql/tests/test_group.py:39:5: F401 'pyspark.sql.tests.test_group.*' imported but unused python/pyspark/sql/tests/test_session.py:361:5: F401 'pyspark.sql.tests.test_session.*' imported but unused python/pyspark/sql/tests/test_conf.py:49:5: F401 'pyspark.sql.tests.test_conf.*' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:19:1: F401 'sys' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:21:1: F401 'pyspark.sql.functions.sum' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:21:1: F401 'pyspark.sql.functions.PandasUDFType' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:29:5: F401 'pandas.util.testing.assert_series_equal' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:32:5: F401 'pyarrow as pa' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:248:5: F401 'pyspark.sql.tests.test_pandas_cogrouped_map.*' imported but unused python/pyspark/sql/tests/test_udf.py:24:1: F401 'py4j' imported but unused python/pyspark/sql/tests/test_pandas_udf_typehints.py:246:5: F401 'pyspark.sql.tests.test_pandas_udf_typehints.*' imported but unused python/pyspark/sql/tests/test_functions.py:19:1: F401 'sys' imported but unused python/pyspark/sql/tests/test_functions.py:362:9: F401 'pyspark.sql.functions.exists' imported but unused python/pyspark/sql/tests/test_functions.py:387:5: F401 'pyspark.sql.tests.test_functions.*' imported but unused python/pyspark/sql/tests/test_pandas_udf_scalar.py:21:1: F401 'sys' imported but unused python/pyspark/sql/tests/test_pandas_udf_scalar.py:45:5: F401 'pyarrow as pa' imported but unused python/pyspark/sql/tests/test_pandas_udf_window.py:355:5: F401 'pyspark.sql.tests.test_pandas_udf_window.*' imported but unused python/pyspark/sql/tests/test_arrow.py:38:5: F401 'pyarrow as pa' imported but unused python/pyspark/sql/tests/test_pandas_grouped_map.py:20:1: F401 'sys' imported but unused python/pyspark/sql/tests/test_pandas_grouped_map.py:38:5: F401 'pyarrow as pa' imported but unused python/pyspark/sql/tests/test_dataframe.py:382:9: F401 'pyspark.sql.DataFrame' imported but unused python/pyspark/sql/avro/functions.py:125:5: F401 'pyspark.sql.Row' imported but unused python/pyspark/sql/pandas/functions.py:19:1: F401 'sys' imported but unused ``` After: ``` fokkodriesprongFan spark % flake8 python | grep -i "imported but unused" fokkodriesprongFan spark % ``` ### What changes were proposed in this pull request? Removing unused imports from the Python files to keep everything nice and tidy. ### Why are the changes needed? Cleaning up of the imports that aren't used, and suppressing the imports that are used as references to other modules, preserving backward compatibility. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Adding the rule to the existing Flake8 checks. Closes #29121 from Fokko/SPARK-32319. Authored-by: Fokko Driesprong <fokko@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-08 11:51:57 -04:00
from pyspark.ml.tests.test_tuning import * # noqa: F401
try:
import xmlrunner # type: ignore[import]
[SPARK-28130][PYTHON] Print pretty messages for skipped tests when xmlrunner is available in PySpark ## What changes were proposed in this pull request? Currently, pretty skipped message added by https://github.com/apache/spark/commit/f7435bec6a9348cfbbe26b13c230c08545d16067 mechanism seems not working when xmlrunner is installed apparently. This PR fixes two things: 1. When `xmlrunner` is installed, seems `xmlrunner` does not respect `vervosity` level in unittests (default is level 1). So the output looks as below ``` Running tests... ---------------------------------------------------------------------- SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS ---------------------------------------------------------------------- ``` So it is not caught by our message detection mechanism. 2. If we manually set the `vervocity` level to `xmlrunner`, it prints messages as below: ``` test_mixed_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s) test_mixed_udf_and_sql (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s) ... ``` This is different in our Jenkins machine: ``` test_createDataFrame_column_name_encoding (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.' test_createDataFrame_does_not_modify_input (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.' ... ``` Note that last `SKIP` is different. This PR fixes the regular expression to catch `SKIP` case as well. ## How was this patch tested? Manually tested. **Before:** ``` Starting test(python2.7): pyspark.... Finished test(python2.7): pyspark.... (0s) ... Tests passed in 562 seconds ======================================================================== ... ``` **After:** ``` Starting test(python2.7): pyspark.... Finished test(python2.7): pyspark.... (48s) ... 93 tests were skipped ... Tests passed in 560 seconds Skipped tests pyspark.... with python2.7: pyspark...(...) ... SKIP (0.000s) ... ======================================================================== ... ``` Closes #24927 from HyukjinKwon/SPARK-28130. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-23 20:58:17 -04:00
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)