spark-instrumented-optimizer/python/pyspark/ml/tests.py
JeremyNixon 511d4929c8 [SPARK-12877][ML] Add train-validation-split to pyspark
## What changes were proposed in this pull request?
The changes proposed were to add train-validation-split to pyspark.ml.tuning.

## How was this patch tested?
This patch was tested through unit tests located in pyspark/ml/tests.py.

This is my original work and I license it to Spark.

Author: JeremyNixon <jnixon2@gmail.com>

Closes #11335 from JeremyNixon/tvs_pyspark.
2016-03-03 09:50:05 -08:00
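
For context, here is a minimal usage sketch of the TrainValidationSplit API this
patch adds (not part of the test file below). It assumes an existing
SparkContext `sc` and the SQLContext/mllib-linalg APIs of this era; the column
names, grid values and train ratio are illustrative.

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.linalg import Vectors
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(
    [(Vectors.dense([float(i)]), 2.0 * i) for i in range(100)],
    ["features", "label"])

lr = LinearRegression(maxIter=10)
grid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
evaluator = RegressionEvaluator(metricName="rmse")

# Unlike CrossValidator, TrainValidationSplit evaluates each parameter
# combination once, on a single random train/validation split.
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid,
                           evaluator=evaluator, trainRatio=0.75)
bestModel = tvs.fit(df).bestModel
print(bestModel.transform(df).head())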


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Unit tests for Spark ML Python APIs.
"""
import sys
try:
import xmlrunner
except ImportError:
xmlrunner = None
if sys.version_info[:2] <= (2, 6):
try:
import unittest2 as unittest
except ImportError:
sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
sys.exit(1)
else:
import unittest
from shutil import rmtree
import tempfile
from pyspark.ml import Estimator, Model, Pipeline, Transformer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import *
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import *
from pyspark.ml.util import keyword_only
from pyspark.mllib.linalg import DenseVector
from pyspark.sql import DataFrame, SQLContext, Row
from pyspark.sql.functions import rand
from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
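# Lightweight mocks used by PipelineTests below: MockDataset tracks how many
# times it has been transformed via `index`, and each mock stage records the
# `index` value it saw, so the tests can verify which stages ran, and in what
# order, during fit() and transform().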
class MockDataset(DataFrame):
def __init__(self):
self.index = 0
class HasFake(Params):
def __init__(self):
super(HasFake, self).__init__()
self.fake = Param(self, "fake", "fake param")
def getFake(self):
return self.getOrDefault(self.fake)
class MockTransformer(Transformer, HasFake):
def __init__(self):
super(MockTransformer, self).__init__()
self.dataset_index = None
def _transform(self, dataset):
self.dataset_index = dataset.index
dataset.index += 1
return dataset
class MockEstimator(Estimator, HasFake):
def __init__(self):
super(MockEstimator, self).__init__()
self.dataset_index = None
def _fit(self, dataset):
self.dataset_index = dataset.index
model = MockModel()
self._copyValues(model)
return model
class MockModel(MockTransformer, Model, HasFake):
pass
class ParamTypeConversionTests(PySparkTestCase):
"""
Test that param type conversion happens.
"""
def test_int_to_float(self):
from pyspark.mllib.linalg import Vectors
df = self.sc.parallelize([
Row(label=1.0, weight=2.0, features=Vectors.dense(1.0))]).toDF()
lr = LogisticRegression(elasticNetParam=0)
lr.fit(df)
lr.setElasticNetParam(0)
lr.fit(df)
def test_invalid_to_float(self):
from pyspark.mllib.linalg import Vectors
self.assertRaises(Exception, lambda: LogisticRegression(elasticNetParam="happy"))
lr = LogisticRegression(elasticNetParam=0)
self.assertRaises(Exception, lambda: lr.setElasticNetParam("panda"))
class PipelineTests(PySparkTestCase):
def test_pipeline(self):
dataset = MockDataset()
estimator0 = MockEstimator()
transformer1 = MockTransformer()
estimator2 = MockEstimator()
transformer3 = MockTransformer()
pipeline = Pipeline(stages=[estimator0, transformer1, estimator2, transformer3])
pipeline_model = pipeline.fit(dataset, {estimator0.fake: 0, transformer1.fake: 1})
model0, transformer1, model2, transformer3 = pipeline_model.stages
self.assertEqual(0, model0.dataset_index)
self.assertEqual(0, model0.getFake())
self.assertEqual(1, transformer1.dataset_index)
self.assertEqual(1, transformer1.getFake())
self.assertEqual(2, dataset.index)
self.assertIsNone(model2.dataset_index, "The last model shouldn't be called in fit.")
self.assertIsNone(transformer3.dataset_index,
"The last transformer shouldn't be called in fit.")
dataset = pipeline_model.transform(dataset)
self.assertEqual(2, model0.dataset_index)
self.assertEqual(3, transformer1.dataset_index)
self.assertEqual(4, model2.dataset_index)
self.assertEqual(5, transformer3.dataset_index)
self.assertEqual(6, dataset.index)
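# TestParams and OtherTestParams are intentionally identical mixins;
# ParamTests.test_hasseed relies on them being distinct classes to check that
# different classes derive different default seeds.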
class TestParams(HasMaxIter, HasInputCol, HasSeed):
"""
A subclass of Params mixed with HasMaxIter, HasInputCol and HasSeed.
"""
@keyword_only
def __init__(self, seed=None):
super(TestParams, self).__init__()
self._setDefault(maxIter=10)
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, seed=None):
"""
setParams(self, seed=None)
Sets params for this test.
"""
kwargs = self.setParams._input_kwargs
return self._set(**kwargs)
class OtherTestParams(HasMaxIter, HasInputCol, HasSeed):
"""
A subclass of Params mixed with HasMaxIter, HasInputCol and HasSeed.
"""
@keyword_only
def __init__(self, seed=None):
super(OtherTestParams, self).__init__()
self._setDefault(maxIter=10)
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, seed=None):
"""
setParams(self, seed=None)
Sets params for this test.
"""
kwargs = self.setParams._input_kwargs
return self._set(**kwargs)
class ParamTests(PySparkTestCase):
def test_copy_new_parent(self):
testParams = TestParams()
# Copying an instantiated param should fail
with self.assertRaises(ValueError):
testParams.maxIter._copy_new_parent(testParams)
# Copying a dummy param should succeed
TestParams.maxIter._copy_new_parent(testParams)
maxIter = testParams.maxIter
self.assertEqual(maxIter.name, "maxIter")
self.assertEqual(maxIter.doc, "max number of iterations (>= 0).")
self.assertTrue(maxIter.parent == testParams.uid)
def test_param(self):
testParams = TestParams()
maxIter = testParams.maxIter
self.assertEqual(maxIter.name, "maxIter")
self.assertEqual(maxIter.doc, "max number of iterations (>= 0).")
self.assertTrue(maxIter.parent == testParams.uid)
def test_hasparam(self):
testParams = TestParams()
self.assertTrue(all([testParams.hasParam(p.name) for p in testParams.params]))
self.assertFalse(testParams.hasParam("notAParameter"))
def test_params(self):
testParams = TestParams()
maxIter = testParams.maxIter
inputCol = testParams.inputCol
seed = testParams.seed
params = testParams.params
self.assertEqual(params, [inputCol, maxIter, seed])
self.assertTrue(testParams.hasParam(maxIter.name))
self.assertTrue(testParams.hasDefault(maxIter))
self.assertFalse(testParams.isSet(maxIter))
self.assertTrue(testParams.isDefined(maxIter))
self.assertEqual(testParams.getMaxIter(), 10)
testParams.setMaxIter(100)
self.assertTrue(testParams.isSet(maxIter))
self.assertEqual(testParams.getMaxIter(), 100)
self.assertTrue(testParams.hasParam(inputCol.name))
self.assertFalse(testParams.hasDefault(inputCol))
self.assertFalse(testParams.isSet(inputCol))
self.assertFalse(testParams.isDefined(inputCol))
with self.assertRaises(KeyError):
testParams.getInputCol()
# Since the default is normally random, set it to a known number for debug str
testParams._setDefault(seed=41)
testParams.setSeed(43)
self.assertEqual(
testParams.explainParams(),
"\n".join(["inputCol: input column name. (undefined)",
"maxIter: max number of iterations (>= 0). (default: 10, current: 100)",
"seed: random seed. (default: 41, current: 43)"]))
def test_kmeans_param(self):
algo = KMeans()
self.assertEqual(algo.getInitMode(), "k-means||")
algo.setK(10)
self.assertEqual(algo.getK(), 10)
algo.setInitSteps(10)
self.assertEqual(algo.getInitSteps(), 10)
def test_hasseed(self):
noSeedSpecd = TestParams()
withSeedSpecd = TestParams(seed=42)
other = OtherTestParams()
# Check that we no longer use 42 as the magic number
self.assertNotEqual(noSeedSpecd.getSeed(), 42)
origSeed = noSeedSpecd.getSeed()
# Check that we only compute the seed once
self.assertEqual(noSeedSpecd.getSeed(), origSeed)
# Check that a specified seed is honored
self.assertEqual(withSeedSpecd.getSeed(), 42)
# Check that a different class has a different seed
self.assertNotEqual(other.getSeed(), noSeedSpecd.getSeed())
class FeatureTests(PySparkTestCase):
def test_binarizer(self):
b0 = Binarizer()
self.assertListEqual(b0.params, [b0.inputCol, b0.outputCol, b0.threshold])
        self.assertTrue(all([not b0.isSet(p) for p in b0.params]))
self.assertTrue(b0.hasDefault(b0.threshold))
self.assertEqual(b0.getThreshold(), 0.0)
b0.setParams(inputCol="input", outputCol="output").setThreshold(1.0)
self.assertTrue(all([b0.isSet(p) for p in b0.params]))
self.assertEqual(b0.getThreshold(), 1.0)
self.assertEqual(b0.getInputCol(), "input")
self.assertEqual(b0.getOutputCol(), "output")
b0c = b0.copy({b0.threshold: 2.0})
self.assertEqual(b0c.uid, b0.uid)
self.assertListEqual(b0c.params, b0.params)
self.assertEqual(b0c.getThreshold(), 2.0)
b1 = Binarizer(threshold=2.0, inputCol="input", outputCol="output")
self.assertNotEqual(b1.uid, b0.uid)
self.assertEqual(b1.getThreshold(), 2.0)
self.assertEqual(b1.getInputCol(), "input")
self.assertEqual(b1.getOutputCol(), "output")
def test_idf(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
(DenseVector([1.0, 2.0]),),
(DenseVector([0.0, 1.0]),),
(DenseVector([3.0, 0.2]),)], ["tf"])
idf0 = IDF(inputCol="tf")
self.assertListEqual(idf0.params, [idf0.inputCol, idf0.minDocFreq, idf0.outputCol])
idf0m = idf0.fit(dataset, {idf0.outputCol: "idf"})
self.assertEqual(idf0m.uid, idf0.uid,
"Model should inherit the UID from its parent estimator.")
output = idf0m.transform(dataset)
self.assertIsNotNone(output.head().idf)
def test_ngram(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
Row(input=["a", "b", "c", "d", "e"])])
ngram0 = NGram(n=4, inputCol="input", outputCol="output")
self.assertEqual(ngram0.getN(), 4)
self.assertEqual(ngram0.getInputCol(), "input")
self.assertEqual(ngram0.getOutputCol(), "output")
transformedDF = ngram0.transform(dataset)
self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"])
def test_stopwordsremover(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([Row(input=["a", "panda"])])
stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output")
# Default
self.assertEqual(stopWordRemover.getInputCol(), "input")
transformedDF = stopWordRemover.transform(dataset)
self.assertEqual(transformedDF.head().output, ["panda"])
# Custom
stopwords = ["panda"]
stopWordRemover.setStopWords(stopwords)
self.assertEqual(stopWordRemover.getInputCol(), "input")
self.assertEqual(stopWordRemover.getStopWords(), stopwords)
transformedDF = stopWordRemover.transform(dataset)
self.assertEqual(transformedDF.head().output, ["a"])
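# Mock estimator/model pair for the tuning tests below: the fitted model's
# prediction is the feature plus uniform noise scaled by `inducedError`, so a
# parameter grid over inducedError has a known best value of 0.0.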
class HasInducedError(Params):
def __init__(self):
super(HasInducedError, self).__init__()
self.inducedError = Param(self, "inducedError",
"Uniformly-distributed error added to feature")
def getInducedError(self):
return self.getOrDefault(self.inducedError)
class InducedErrorModel(Model, HasInducedError):
def __init__(self):
super(InducedErrorModel, self).__init__()
def _transform(self, dataset):
return dataset.withColumn("prediction",
dataset.feature + (rand(0) * self.getInducedError()))
class InducedErrorEstimator(Estimator, HasInducedError):
def __init__(self, inducedError=1.0):
super(InducedErrorEstimator, self).__init__()
self._set(inducedError=inducedError)
def _fit(self, dataset):
model = InducedErrorModel()
self._copyValues(model)
return model
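# CrossValidator should pick the grid point that optimizes the evaluator's
# metric: minimizing RMSE and maximizing R^2 both select inducedError=0.0.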
class CrossValidatorTests(PySparkTestCase):
def test_fit_minimize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="rmse")
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
bestModel = cvModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
def test_fit_maximize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="r2")
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
bestModel = cvModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
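# Tests for the TrainValidationSplit API added by this patch. They mirror
# CrossValidatorTests above, but TrainValidationSplit evaluates each parameter
# combination on a single random train/validation split rather than k folds.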
class TrainValidationSplitTests(PySparkTestCase):
def test_fit_minimize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="rmse")
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
tvsModel = tvs.fit(dataset)
bestModel = tvsModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
def test_fit_maximize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="r2")
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
tvsModel = tvs.fit(dataset)
bestModel = tvsModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
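# Round-trips a LinearRegression estimator through save()/load() and checks
# that the loaded instance's uid is consistent and its default params match
# the original's.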
class PersistenceTest(PySparkTestCase):
def test_linear_regression(self):
lr = LinearRegression(maxIter=1)
path = tempfile.mkdtemp()
lr_path = path + "/lr"
lr.save(lr_path)
lr2 = LinearRegression.load(lr_path)
self.assertEqual(lr2.uid, lr2.maxIter.parent,
"Loaded LinearRegression instance uid (%s) did not match Param's uid (%s)"
% (lr2.uid, lr2.maxIter.parent))
self.assertEqual(lr._defaultParamMap[lr.maxIter], lr2._defaultParamMap[lr2.maxIter],
"Loaded LinearRegression instance default params did not match " +
"original defaults")
try:
rmtree(path)
except OSError:
pass
if __name__ == "__main__":
from pyspark.ml.tests import *
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
else:
unittest.main()