9434280cfd
Changes: pyspark.ml Estimators can take either a list of param maps or a dict of params. This change allows the CrossValidator and TrainValidationSplit Estimators to pass through lists of param maps to the underlying estimators so that those estimators can handle parallelization when appropriate (eg distributed hyper parameter tuning). Testing: Existing unit tests. Author: Bago Amirbekian <bago@databricks.com> Closes #18077 from MrBago/delegate_params.
479 lines
17 KiB
Python
479 lines
17 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import itertools
|
|
import numpy as np
|
|
|
|
from pyspark import since, keyword_only
|
|
from pyspark.ml import Estimator, Model
|
|
from pyspark.ml.param import Params, Param, TypeConverters
|
|
from pyspark.ml.param.shared import HasSeed
|
|
from pyspark.sql.functions import rand
|
|
|
|
__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel', 'TrainValidationSplit',
|
|
'TrainValidationSplitModel']
|
|
|
|
|
|
class ParamGridBuilder(object):
|
|
r"""
|
|
Builder for a param grid used in grid search-based model selection.
|
|
|
|
>>> from pyspark.ml.classification import LogisticRegression
|
|
>>> lr = LogisticRegression()
|
|
>>> output = ParamGridBuilder() \
|
|
... .baseOn({lr.labelCol: 'l'}) \
|
|
... .baseOn([lr.predictionCol, 'p']) \
|
|
... .addGrid(lr.regParam, [1.0, 2.0]) \
|
|
... .addGrid(lr.maxIter, [1, 5]) \
|
|
... .build()
|
|
>>> expected = [
|
|
... {lr.regParam: 1.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'},
|
|
... {lr.regParam: 2.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'},
|
|
... {lr.regParam: 1.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'},
|
|
... {lr.regParam: 2.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}]
|
|
>>> len(output) == len(expected)
|
|
True
|
|
>>> all([m in expected for m in output])
|
|
True
|
|
|
|
.. versionadded:: 1.4.0
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._param_grid = {}
|
|
|
|
@since("1.4.0")
|
|
def addGrid(self, param, values):
|
|
"""
|
|
Sets the given parameters in this grid to fixed values.
|
|
"""
|
|
self._param_grid[param] = values
|
|
|
|
return self
|
|
|
|
@since("1.4.0")
|
|
def baseOn(self, *args):
|
|
"""
|
|
Sets the given parameters in this grid to fixed values.
|
|
Accepts either a parameter dictionary or a list of (parameter, value) pairs.
|
|
"""
|
|
if isinstance(args[0], dict):
|
|
self.baseOn(*args[0].items())
|
|
else:
|
|
for (param, value) in args:
|
|
self.addGrid(param, [value])
|
|
|
|
return self
|
|
|
|
@since("1.4.0")
|
|
def build(self):
|
|
"""
|
|
Builds and returns all combinations of parameters specified
|
|
by the param grid.
|
|
"""
|
|
keys = self._param_grid.keys()
|
|
grid_values = self._param_grid.values()
|
|
return [dict(zip(keys, prod)) for prod in itertools.product(*grid_values)]
|
|
|
|
|
|
class ValidatorParams(HasSeed):
|
|
"""
|
|
Common params for TrainValidationSplit and CrossValidator.
|
|
"""
|
|
|
|
estimator = Param(Params._dummy(), "estimator", "estimator to be cross-validated")
|
|
estimatorParamMaps = Param(Params._dummy(), "estimatorParamMaps", "estimator param maps")
|
|
evaluator = Param(
|
|
Params._dummy(), "evaluator",
|
|
"evaluator used to select hyper-parameters that maximize the validator metric")
|
|
|
|
def setEstimator(self, value):
|
|
"""
|
|
Sets the value of :py:attr:`estimator`.
|
|
"""
|
|
return self._set(estimator=value)
|
|
|
|
def getEstimator(self):
|
|
"""
|
|
Gets the value of estimator or its default value.
|
|
"""
|
|
return self.getOrDefault(self.estimator)
|
|
|
|
def setEstimatorParamMaps(self, value):
|
|
"""
|
|
Sets the value of :py:attr:`estimatorParamMaps`.
|
|
"""
|
|
return self._set(estimatorParamMaps=value)
|
|
|
|
def getEstimatorParamMaps(self):
|
|
"""
|
|
Gets the value of estimatorParamMaps or its default value.
|
|
"""
|
|
return self.getOrDefault(self.estimatorParamMaps)
|
|
|
|
def setEvaluator(self, value):
|
|
"""
|
|
Sets the value of :py:attr:`evaluator`.
|
|
"""
|
|
return self._set(evaluator=value)
|
|
|
|
def getEvaluator(self):
|
|
"""
|
|
Gets the value of evaluator or its default value.
|
|
"""
|
|
return self.getOrDefault(self.evaluator)
|
|
|
|
|
|
class CrossValidator(Estimator, ValidatorParams):
|
|
"""
|
|
|
|
K-fold cross validation performs model selection by splitting the dataset into a set of
|
|
non-overlapping randomly partitioned folds which are used as separate training and test datasets
|
|
e.g., with k=3 folds, K-fold cross validation will generate 3 (training, test) dataset pairs,
|
|
each of which uses 2/3 of the data for training and 1/3 for testing. Each fold is used as the
|
|
test set exactly once.
|
|
|
|
|
|
>>> from pyspark.ml.classification import LogisticRegression
|
|
>>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
|
|
>>> from pyspark.ml.linalg import Vectors
|
|
>>> dataset = spark.createDataFrame(
|
|
... [(Vectors.dense([0.0]), 0.0),
|
|
... (Vectors.dense([0.4]), 1.0),
|
|
... (Vectors.dense([0.5]), 0.0),
|
|
... (Vectors.dense([0.6]), 1.0),
|
|
... (Vectors.dense([1.0]), 1.0)] * 10,
|
|
... ["features", "label"])
|
|
>>> lr = LogisticRegression()
|
|
>>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
|
|
>>> evaluator = BinaryClassificationEvaluator()
|
|
>>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
|
|
>>> cvModel = cv.fit(dataset)
|
|
>>> cvModel.avgMetrics[0]
|
|
0.5
|
|
>>> evaluator.evaluate(cvModel.transform(dataset))
|
|
0.8333...
|
|
|
|
.. versionadded:: 1.4.0
|
|
"""
|
|
|
|
numFolds = Param(Params._dummy(), "numFolds", "number of folds for cross validation",
|
|
typeConverter=TypeConverters.toInt)
|
|
|
|
@keyword_only
|
|
def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
|
|
seed=None):
|
|
"""
|
|
__init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
|
|
seed=None)
|
|
"""
|
|
super(CrossValidator, self).__init__()
|
|
self._setDefault(numFolds=3)
|
|
kwargs = self._input_kwargs
|
|
self._set(**kwargs)
|
|
|
|
@keyword_only
|
|
@since("1.4.0")
|
|
def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
|
|
seed=None):
|
|
"""
|
|
setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
|
|
seed=None):
|
|
Sets params for cross validator.
|
|
"""
|
|
kwargs = self._input_kwargs
|
|
return self._set(**kwargs)
|
|
|
|
@since("1.4.0")
|
|
def setNumFolds(self, value):
|
|
"""
|
|
Sets the value of :py:attr:`numFolds`.
|
|
"""
|
|
return self._set(numFolds=value)
|
|
|
|
@since("1.4.0")
|
|
def getNumFolds(self):
|
|
"""
|
|
Gets the value of numFolds or its default value.
|
|
"""
|
|
return self.getOrDefault(self.numFolds)
|
|
|
|
def _fit(self, dataset):
|
|
est = self.getOrDefault(self.estimator)
|
|
epm = self.getOrDefault(self.estimatorParamMaps)
|
|
numModels = len(epm)
|
|
eva = self.getOrDefault(self.evaluator)
|
|
nFolds = self.getOrDefault(self.numFolds)
|
|
seed = self.getOrDefault(self.seed)
|
|
h = 1.0 / nFolds
|
|
randCol = self.uid + "_rand"
|
|
df = dataset.select("*", rand(seed).alias(randCol))
|
|
metrics = [0.0] * numModels
|
|
for i in range(nFolds):
|
|
validateLB = i * h
|
|
validateUB = (i + 1) * h
|
|
condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
|
|
validation = df.filter(condition)
|
|
train = df.filter(~condition)
|
|
models = est.fit(train, epm)
|
|
for j in range(numModels):
|
|
model = models[j]
|
|
# TODO: duplicate evaluator to take extra params from input
|
|
metric = eva.evaluate(model.transform(validation, epm[j]))
|
|
metrics[j] += metric/nFolds
|
|
|
|
if eva.isLargerBetter():
|
|
bestIndex = np.argmax(metrics)
|
|
else:
|
|
bestIndex = np.argmin(metrics)
|
|
bestModel = est.fit(dataset, epm[bestIndex])
|
|
return self._copyValues(CrossValidatorModel(bestModel, metrics))
|
|
|
|
@since("1.4.0")
|
|
def copy(self, extra=None):
|
|
"""
|
|
Creates a copy of this instance with a randomly generated uid
|
|
and some extra params. This copies creates a deep copy of
|
|
the embedded paramMap, and copies the embedded and extra parameters over.
|
|
|
|
:param extra: Extra parameters to copy to the new instance
|
|
:return: Copy of this instance
|
|
"""
|
|
if extra is None:
|
|
extra = dict()
|
|
newCV = Params.copy(self, extra)
|
|
if self.isSet(self.estimator):
|
|
newCV.setEstimator(self.getEstimator().copy(extra))
|
|
# estimatorParamMaps remain the same
|
|
if self.isSet(self.evaluator):
|
|
newCV.setEvaluator(self.getEvaluator().copy(extra))
|
|
return newCV
|
|
|
|
|
|
class CrossValidatorModel(Model, ValidatorParams):
|
|
"""
|
|
|
|
CrossValidatorModel contains the model with the highest average cross-validation
|
|
metric across folds and uses this model to transform input data. CrossValidatorModel
|
|
also tracks the metrics for each param map evaluated.
|
|
|
|
.. versionadded:: 1.4.0
|
|
"""
|
|
|
|
def __init__(self, bestModel, avgMetrics=[]):
|
|
super(CrossValidatorModel, self).__init__()
|
|
#: best model from cross validation
|
|
self.bestModel = bestModel
|
|
#: Average cross-validation metrics for each paramMap in
|
|
#: CrossValidator.estimatorParamMaps, in the corresponding order.
|
|
self.avgMetrics = avgMetrics
|
|
|
|
def _transform(self, dataset):
|
|
return self.bestModel.transform(dataset)
|
|
|
|
@since("1.4.0")
|
|
def copy(self, extra=None):
|
|
"""
|
|
Creates a copy of this instance with a randomly generated uid
|
|
and some extra params. This copies the underlying bestModel,
|
|
creates a deep copy of the embedded paramMap, and
|
|
copies the embedded and extra parameters over.
|
|
|
|
:param extra: Extra parameters to copy to the new instance
|
|
:return: Copy of this instance
|
|
"""
|
|
if extra is None:
|
|
extra = dict()
|
|
bestModel = self.bestModel.copy(extra)
|
|
avgMetrics = self.avgMetrics
|
|
return CrossValidatorModel(bestModel, avgMetrics)
|
|
|
|
|
|
class TrainValidationSplit(Estimator, ValidatorParams):
|
|
"""
|
|
.. note:: Experimental
|
|
|
|
Validation for hyper-parameter tuning. Randomly splits the input dataset into train and
|
|
validation sets, and uses evaluation metric on the validation set to select the best model.
|
|
Similar to :class:`CrossValidator`, but only splits the set once.
|
|
|
|
>>> from pyspark.ml.classification import LogisticRegression
|
|
>>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
|
|
>>> from pyspark.ml.linalg import Vectors
|
|
>>> dataset = spark.createDataFrame(
|
|
... [(Vectors.dense([0.0]), 0.0),
|
|
... (Vectors.dense([0.4]), 1.0),
|
|
... (Vectors.dense([0.5]), 0.0),
|
|
... (Vectors.dense([0.6]), 1.0),
|
|
... (Vectors.dense([1.0]), 1.0)] * 10,
|
|
... ["features", "label"])
|
|
>>> lr = LogisticRegression()
|
|
>>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
|
|
>>> evaluator = BinaryClassificationEvaluator()
|
|
>>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
|
|
>>> tvsModel = tvs.fit(dataset)
|
|
>>> evaluator.evaluate(tvsModel.transform(dataset))
|
|
0.8333...
|
|
|
|
.. versionadded:: 2.0.0
|
|
"""
|
|
|
|
trainRatio = Param(Params._dummy(), "trainRatio", "Param for ratio between train and\
|
|
validation data. Must be between 0 and 1.", typeConverter=TypeConverters.toFloat)
|
|
|
|
@keyword_only
|
|
def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
|
|
seed=None):
|
|
"""
|
|
__init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
|
|
seed=None)
|
|
"""
|
|
super(TrainValidationSplit, self).__init__()
|
|
self._setDefault(trainRatio=0.75)
|
|
kwargs = self._input_kwargs
|
|
self._set(**kwargs)
|
|
|
|
@since("2.0.0")
|
|
@keyword_only
|
|
def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
|
|
seed=None):
|
|
"""
|
|
setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
|
|
seed=None):
|
|
Sets params for the train validation split.
|
|
"""
|
|
kwargs = self._input_kwargs
|
|
return self._set(**kwargs)
|
|
|
|
@since("2.0.0")
|
|
def setTrainRatio(self, value):
|
|
"""
|
|
Sets the value of :py:attr:`trainRatio`.
|
|
"""
|
|
return self._set(trainRatio=value)
|
|
|
|
@since("2.0.0")
|
|
def getTrainRatio(self):
|
|
"""
|
|
Gets the value of trainRatio or its default value.
|
|
"""
|
|
return self.getOrDefault(self.trainRatio)
|
|
|
|
def _fit(self, dataset):
|
|
est = self.getOrDefault(self.estimator)
|
|
epm = self.getOrDefault(self.estimatorParamMaps)
|
|
numModels = len(epm)
|
|
eva = self.getOrDefault(self.evaluator)
|
|
tRatio = self.getOrDefault(self.trainRatio)
|
|
seed = self.getOrDefault(self.seed)
|
|
randCol = self.uid + "_rand"
|
|
df = dataset.select("*", rand(seed).alias(randCol))
|
|
metrics = [0.0] * numModels
|
|
condition = (df[randCol] >= tRatio)
|
|
validation = df.filter(condition)
|
|
train = df.filter(~condition)
|
|
models = est.fit(train, epm)
|
|
for j in range(numModels):
|
|
model = models[j]
|
|
metric = eva.evaluate(model.transform(validation, epm[j]))
|
|
metrics[j] += metric
|
|
if eva.isLargerBetter():
|
|
bestIndex = np.argmax(metrics)
|
|
else:
|
|
bestIndex = np.argmin(metrics)
|
|
bestModel = est.fit(dataset, epm[bestIndex])
|
|
return self._copyValues(TrainValidationSplitModel(bestModel, metrics))
|
|
|
|
@since("2.0.0")
|
|
def copy(self, extra=None):
|
|
"""
|
|
Creates a copy of this instance with a randomly generated uid
|
|
and some extra params. This copies creates a deep copy of
|
|
the embedded paramMap, and copies the embedded and extra parameters over.
|
|
|
|
:param extra: Extra parameters to copy to the new instance
|
|
:return: Copy of this instance
|
|
"""
|
|
if extra is None:
|
|
extra = dict()
|
|
newTVS = Params.copy(self, extra)
|
|
if self.isSet(self.estimator):
|
|
newTVS.setEstimator(self.getEstimator().copy(extra))
|
|
# estimatorParamMaps remain the same
|
|
if self.isSet(self.evaluator):
|
|
newTVS.setEvaluator(self.getEvaluator().copy(extra))
|
|
return newTVS
|
|
|
|
|
|
class TrainValidationSplitModel(Model, ValidatorParams):
|
|
"""
|
|
.. note:: Experimental
|
|
|
|
Model from train validation split.
|
|
|
|
.. versionadded:: 2.0.0
|
|
"""
|
|
|
|
def __init__(self, bestModel, validationMetrics=[]):
|
|
super(TrainValidationSplitModel, self).__init__()
|
|
#: best model from cross validation
|
|
self.bestModel = bestModel
|
|
#: evaluated validation metrics
|
|
self.validationMetrics = validationMetrics
|
|
|
|
def _transform(self, dataset):
|
|
return self.bestModel.transform(dataset)
|
|
|
|
@since("2.0.0")
|
|
def copy(self, extra=None):
|
|
"""
|
|
Creates a copy of this instance with a randomly generated uid
|
|
and some extra params. This copies the underlying bestModel,
|
|
creates a deep copy of the embedded paramMap, and
|
|
copies the embedded and extra parameters over.
|
|
And, this creates a shallow copy of the validationMetrics.
|
|
|
|
:param extra: Extra parameters to copy to the new instance
|
|
:return: Copy of this instance
|
|
"""
|
|
if extra is None:
|
|
extra = dict()
|
|
bestModel = self.bestModel.copy(extra)
|
|
validationMetrics = list(self.validationMetrics)
|
|
return TrainValidationSplitModel(bestModel, validationMetrics)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import doctest
|
|
|
|
from pyspark.sql import SparkSession
|
|
globs = globals().copy()
|
|
|
|
# The small batch size here ensures that we see multiple batches,
|
|
# even in these small test examples:
|
|
spark = SparkSession.builder\
|
|
.master("local[2]")\
|
|
.appName("ml.tuning tests")\
|
|
.getOrCreate()
|
|
sc = spark.sparkContext
|
|
globs['sc'] = sc
|
|
globs['spark'] = spark
|
|
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
|
|
spark.stop()
|
|
if failure_count:
|
|
exit(-1)
|