diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 09e0748ffb..53d07ec966 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -25,8 +25,8 @@ from pyspark import since, keyword_only, SparkContext
 from pyspark.ml.base import Estimator, Model, Transformer
 from pyspark.ml.param import Param, Params
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaParams
-from pyspark.ml.common import inherit_doc
+from pyspark.ml.wrapper import JavaParams, JavaWrapper
+from pyspark.ml.common import inherit_doc, _java2py, _py2java
 
 
 @inherit_doc
@@ -174,6 +174,55 @@ class Pipeline(Estimator, MLReadable, MLWritable):
 
         return _java_obj
 
+    def _make_java_param_pair(self, param, value):
+        """
+        Makes a Java param pair.
+        """
+        sc = SparkContext._active_spark_context
+        param = self._resolveParam(param)
+        java_param = sc._jvm.org.apache.spark.ml.param.Param(param.parent, param.name, param.doc)
+        if isinstance(value, Params) and hasattr(value, "_to_java"):
+            # Convert a JavaEstimator/JavaTransformer object, or an Estimator/Transformer
+            # object that implements `_to_java` (such as OneVsRest or Pipeline), to a
+            # Java object. This is used when an estimator has another estimator as a
+            # parameter. The conversion is not in _py2java in common.py because importing
+            # Estimator and Model there would create a circular import with inherit_doc.
+            java_value = value._to_java()
+        else:
+            java_value = _py2java(sc, value)
+        return java_param.w(java_value)
+
+    def _transfer_param_map_to_java(self, pyParamMap):
+        """
+        Transforms a Python ParamMap into a Java ParamMap.
+        """
+        paramMap = JavaWrapper._new_java_obj("org.apache.spark.ml.param.ParamMap")
+        for param in self.params:
+            if param in pyParamMap:
+                pair = self._make_java_param_pair(param, pyParamMap[param])
+                paramMap.put([pair])
+        return paramMap
+
+    def _transfer_param_map_from_java(self, javaParamMap):
+        """
+        Transforms a Java ParamMap into a Python ParamMap.
+        """
+        sc = SparkContext._active_spark_context
+        paramMap = dict()
+        for pair in javaParamMap.toList():
+            param = pair.param()
+            if self.hasParam(str(param.name())):
+                java_obj = pair.value()
+                if sc._jvm.Class.forName("org.apache.spark.ml.PipelineStage").isInstance(java_obj):
+                    # Note: JavaParams._from_java supports both JavaEstimator/JavaTransformer
+                    # classes and Estimator/Transformer classes that implement the
+                    # `_from_java` static method (such as OneVsRest or Pipeline).
+                    py_obj = JavaParams._from_java(java_obj)
+                else:
+                    py_obj = _java2py(sc, java_obj)
+                paramMap[self.getParam(param.name())] = py_obj
+        return paramMap
+
 
 @inherit_doc
 class PipelineWriter(MLWriter):
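These three helpers mirror the ones JavaParams already provides in
python/pyspark/ml/wrapper.py. The tuning persistence code transfers estimator
param maps by calling these methods on the estimator, so an estimator that is a
Python-side Pipeline needs its own copies; before this patch, saving a
CrossValidator or TrainValidationSplit whose estimator was a Pipeline failed
with an AttributeError. A minimal sketch of the round trip, not part of the
patch (assumes an active SparkSession; maxIter stands in for any simple param):

    from pyspark.sql import SparkSession
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    lr = LogisticRegression()
    pipeline = Pipeline(stages=[lr])

    # JavaParams-backed stages could already round-trip a param map:
    java_map = lr._transfer_param_map_to_java({lr.maxIter: 7})
    assert lr._transfer_param_map_from_java(java_map)[lr.maxIter] == 7

    # With this patch the same calls exist on Pipeline. Params the pipeline
    # does not own (here lr.maxIter) are skipped rather than transferred,
    # so the result is an empty Java ParamMap instead of an AttributeError.
    assert pipeline._transfer_param_map_to_java({lr.maxIter: 7}).size() == 0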
diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py
index 9d8ba37c60..6bcc3f93e1 100644
--- a/python/pyspark/ml/tests/test_tuning.py
+++ b/python/pyspark/ml/tests/test_tuning.py
@@ -18,7 +18,8 @@
 import tempfile
 import unittest
 
-from pyspark.ml import Estimator, Model
+from pyspark.ml.feature import HashingTF, Tokenizer
+from pyspark.ml import Estimator, Pipeline, Model
 from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
 from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
     MulticlassClassificationEvaluator, RegressionEvaluator
@@ -310,6 +311,75 @@ class CrossValidatorTests(SparkSessionTestCase):
         loadedModel = CrossValidatorModel.load(cvModelPath)
         self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
 
+    def test_save_load_pipeline_estimator(self):
+        temp_path = tempfile.mkdtemp()
+        training = self.spark.createDataFrame([
+            (0, "a b c d e spark", 1.0),
+            (1, "b d", 0.0),
+            (2, "spark f g h", 1.0),
+            (3, "hadoop mapreduce", 0.0),
+            (4, "b spark who", 1.0),
+            (5, "g d a y", 0.0),
+            (6, "spark fly", 1.0),
+            (7, "was mapreduce", 0.0),
+        ], ["id", "text", "label"])
+
+        # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF and ova.
+        tokenizer = Tokenizer(inputCol="text", outputCol="words")
+        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
+
+        ova = OneVsRest(classifier=LogisticRegression())
+        lr1 = LogisticRegression().setMaxIter(5)
+        lr2 = LogisticRegression().setMaxIter(10)
+
+        pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
+
+        paramGrid = ParamGridBuilder() \
+            .addGrid(hashingTF.numFeatures, [10, 100]) \
+            .addGrid(ova.classifier, [lr1, lr2]) \
+            .build()
+
+        crossval = CrossValidator(estimator=pipeline,
+                                  estimatorParamMaps=paramGrid,
+                                  evaluator=MulticlassClassificationEvaluator(),
+                                  numFolds=2)  # use 3+ folds in practice
+
+        # Run cross-validation, and choose the best set of parameters.
+        cvModel = crossval.fit(training)
+
+        # test save/load of CrossValidatorModel
+        cvModelPath = temp_path + "/cvModel"
+        cvModel.save(cvModelPath)
+        loadedModel = CrossValidatorModel.load(cvModelPath)
+        self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
+        self.assertEqual(len(loadedModel.bestModel.stages), len(cvModel.bestModel.stages))
+        for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
+                                              cvModel.bestModel.stages):
+            self.assertEqual(loadedStage.uid, originalStage.uid)
+
+        # Test nested pipeline
+        nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
+        crossval2 = CrossValidator(estimator=nested_pipeline,
+                                   estimatorParamMaps=paramGrid,
+                                   evaluator=MulticlassClassificationEvaluator(),
+                                   numFolds=2)  # use 3+ folds in practice
+        # Run cross-validation, and choose the best set of parameters.
+        cvModel2 = crossval2.fit(training)
+        # test save/load of CrossValidatorModel
+        cvModelPath2 = temp_path + "/cvModel2"
+        cvModel2.save(cvModelPath2)
+        loadedModel2 = CrossValidatorModel.load(cvModelPath2)
+        self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid)
+        loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
+        original_nested_pipeline_model = cvModel2.bestModel.stages[1]
+        self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
+        self.assertEqual(len(loaded_nested_pipeline_model.stages),
+                         len(original_nested_pipeline_model.stages))
+        for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
+                                              original_nested_pipeline_model.stages):
+            self.assertEqual(loadedStage.uid, originalStage.uid)
 
 
 class TrainValidationSplitTests(SparkSessionTestCase):
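The param grid is the part of this test that exercises both branches of
_make_java_param_pair: addGrid(hashingTF.numFeatures, [10, 100]) goes through
the plain _py2java branch, while addGrid(ova.classifier, [lr1, lr2]) puts a
whole estimator into each param map and therefore needs the _to_java branch.
A quick illustration (names as in the test above):

    grid = ParamGridBuilder() \
        .addGrid(hashingTF.numFeatures, [10, 100]) \
        .addGrid(ova.classifier, [lr1, lr2]) \
        .build()

    # Each grid entry is a dict mapping Param -> value; the classifier value
    # is a full LogisticRegression instance, not a plain Python literal.
    assert isinstance(grid[0][ova.classifier], LogisticRegression)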
@@ -511,6 +581,73 @@ class TrainValidationSplitTests(SparkSessionTestCase):
         loadedModel = TrainValidationSplitModel.load(tvsModelPath)
         self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
 
+    def test_save_load_pipeline_estimator(self):
+        temp_path = tempfile.mkdtemp()
+        training = self.spark.createDataFrame([
+            (0, "a b c d e spark", 1.0),
+            (1, "b d", 0.0),
+            (2, "spark f g h", 1.0),
+            (3, "hadoop mapreduce", 0.0),
+            (4, "b spark who", 1.0),
+            (5, "g d a y", 0.0),
+            (6, "spark fly", 1.0),
+            (7, "was mapreduce", 0.0),
+        ], ["id", "text", "label"])
+
+        # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF and ova.
+        tokenizer = Tokenizer(inputCol="text", outputCol="words")
+        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
+
+        ova = OneVsRest(classifier=LogisticRegression())
+        lr1 = LogisticRegression().setMaxIter(5)
+        lr2 = LogisticRegression().setMaxIter(10)
+
+        pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
+
+        paramGrid = ParamGridBuilder() \
+            .addGrid(hashingTF.numFeatures, [10, 100]) \
+            .addGrid(ova.classifier, [lr1, lr2]) \
+            .build()
+
+        tvs = TrainValidationSplit(estimator=pipeline,
+                                   estimatorParamMaps=paramGrid,
+                                   evaluator=MulticlassClassificationEvaluator())
+
+        # Run train validation split, and choose the best set of parameters.
+        tvsModel = tvs.fit(training)
+
+        # test save/load of TrainValidationSplitModel
+        tvsModelPath = temp_path + "/tvsModel"
+        tvsModel.save(tvsModelPath)
+        loadedModel = TrainValidationSplitModel.load(tvsModelPath)
+        self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
+        self.assertEqual(len(loadedModel.bestModel.stages), len(tvsModel.bestModel.stages))
+        for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
+                                              tvsModel.bestModel.stages):
+            self.assertEqual(loadedStage.uid, originalStage.uid)
+
+        # Test nested pipeline
+        nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
+        tvs2 = TrainValidationSplit(estimator=nested_pipeline,
+                                    estimatorParamMaps=paramGrid,
+                                    evaluator=MulticlassClassificationEvaluator())
+
+        # Run train validation split, and choose the best set of parameters.
+        tvsModel2 = tvs2.fit(training)
+        # test save/load of TrainValidationSplitModel
+        tvsModelPath2 = temp_path + "/tvsModel2"
+        tvsModel2.save(tvsModelPath2)
+        loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2)
+        self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid)
+        loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
+        original_nested_pipeline_model = tvsModel2.bestModel.stages[1]
+        self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
+        self.assertEqual(len(loaded_nested_pipeline_model.stages),
+                         len(original_nested_pipeline_model.stages))
+        for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
+                                              original_nested_pipeline_model.stages):
+            self.assertEqual(loadedStage.uid, originalStage.uid)
+
     def test_copy(self):
         dataset = self.spark.createDataFrame([
             (10, 10.0),
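Both new tests repeat the same stage-by-stage comparison four times. If a
follow-up wanted to tighten them, the block could be pulled into a helper along
these lines (a sketch only, not part of the patch):

    def assert_same_pipeline_model(test, loaded, original):
        # Compare a loaded PipelineModel against the original by uid,
        # stage count, and per-stage uid.
        test.assertEqual(loaded.uid, original.uid)
        test.assertEqual(len(loaded.stages), len(original.stages))
        for loadedStage, originalStage in zip(loaded.stages, original.stages):
            test.assertEqual(loadedStage.uid, originalStage.uid)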