spark-instrumented-optimizer/python/pyspark/mllib/tree.py
Davies Liu 04e44b37cc [SPARK-4897] [PySpark] Python 3 support
This PR updates PySpark to support Python 3 (tested with 3.4).

Known issue: unpickling arrays from Pyrolite is broken in Python 3, so those tests are skipped.

TODO: ec2/spark_ec2.py is not fully tested with Python 3.

Author: Davies Liu <davies@databricks.com>
Author: twneale <twneale@gmail.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #5173 from davies/python3 and squashes the following commits:

d7d6323 [Davies Liu] fix tests
6c52a98 [Davies Liu] fix mllib test
99e334f [Davies Liu] update timeout
b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
cafd5ec [Davies Liu] adddress comments from @mengxr
bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
179fc8d [Davies Liu] tuning flaky tests
8c8b957 [Davies Liu] fix ResourceWarning in Python 3
5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
4006829 [Davies Liu] fix test
2fc0066 [Davies Liu] add python3 path
71535e9 [Davies Liu] fix xrange and divide
5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ed498c8 [Davies Liu] fix compatibility with python 3
820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ad7c374 [Davies Liu] fix mllib test and warning
ef1fc2f [Davies Liu] fix tests
4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
59bb492 [Davies Liu] fix tests
1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ca0fdd3 [Davies Liu] fix code style
9563a15 [Davies Liu] add imap back for python 2
0b1ec04 [Davies Liu] make python examples work with Python 3
d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
a716d34 [Davies Liu] test with python 3.4
f1700e8 [Davies Liu] fix test in python3
671b1db [Davies Liu] fix test in python3
692ff47 [Davies Liu] fix flaky test
7b9699f [Davies Liu] invalidate import cache for Python 3.3+
9c58497 [Davies Liu] fix kill worker
309bfbf [Davies Liu] keep compatibility
5707476 [Davies Liu] cleanup, fix hash of string in 3.3+
8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
f53e1f0 [Davies Liu] fix tests
70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3
a39167e [Davies Liu] support customize class in __main__
814c77b [Davies Liu] run unittests with python 3
7f4476e [Davies Liu] mllib tests passed
d737924 [Davies Liu] pass ml tests
375ea17 [Davies Liu] SQL tests pass
6cc42a9 [Davies Liu] rename
431a8de [Davies Liu] streaming tests pass
78901a7 [Davies Liu] fix hash of serializer in Python 3
24b2f2e [Davies Liu] pass all RDD tests
35f48fe [Davies Liu] run future again
1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py
6e3c21d [Davies Liu] make cloudpickle work with Python3
2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run
1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out
7354371 [twneale] buffer --> memoryview  I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work.
b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?).
f40d925 [twneale] xrange --> range
e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206
79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper
2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3
854be27 [Josh Rosen] Run `futurize` on Python code:
7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
2015-04-16 16:20:57 -07:00


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import

import random

from pyspark import SparkContext, RDD
from pyspark.mllib.common import callMLlibFunc, inherit_doc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import JavaLoader, JavaSaveable

__all__ = ['DecisionTreeModel', 'DecisionTree', 'RandomForestModel',
           'RandomForest', 'GradientBoostedTreesModel', 'GradientBoostedTrees']


class TreeEnsembleModel(JavaModelWrapper, JavaSaveable):
    def predict(self, x):
        """
        Predict values for a single data point or an RDD of points using
        the model trained.

        Note: In Python, predict cannot currently be used within an RDD
              transformation or action.
              Call predict directly on the RDD instead.
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))

        else:
            return self.call("predict", _convert_to_vector(x))

    def numTrees(self):
        """
        Get number of trees in ensemble.
        """
        return self.call("numTrees")

    def totalNumNodes(self):
        """
        Get total number of nodes, summed over all trees in the
        ensemble.
        """
        return self.call("totalNumNodes")

    def __repr__(self):
        """ Summary of model """
        return self._java_model.toString()

    def toDebugString(self):
        """ Full model """
        return self._java_model.toDebugString()


class DecisionTreeModel(JavaModelWrapper, JavaSaveable, JavaLoader):
    """
    .. note:: Experimental

    A decision tree model for classification or regression.
    """
    def predict(self, x):
        """
        Predict the label of one or more examples.

        Note: In Python, predict cannot currently be used within an RDD
              transformation or action.
              Call predict directly on the RDD instead.

        :param x: Data point (feature vector),
                  or an RDD of data points (feature vectors).
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))

        else:
            return self.call("predict", _convert_to_vector(x))

    def numNodes(self):
        """ Get number of nodes in the tree, including leaf nodes. """
        return self._java_model.numNodes()

    def depth(self):
        """
        Get depth of the tree.
        E.g., depth 0 means 1 leaf node.
        Depth 1 means 1 internal node + 2 leaf nodes.
        """
        return self._java_model.depth()

    def __repr__(self):
        """ summary of model. """
        return self._java_model.toString()

    def toDebugString(self):
        """ full model. """
        return self._java_model.toDebugString()

    @classmethod
    def _java_loader_class(cls):
        return "org.apache.spark.mllib.tree.model.DecisionTreeModel"
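

# Illustrative sketch (not part of the original module). The docstrings in
# this file use the convention "depth 0 means 1 leaf node; depth 1 means
# 1 internal node + 2 leaf nodes". Under that convention a binary tree of
# depth d has at most 2 ** (d + 1) - 1 nodes. The helper name is
# hypothetical and is intentionally left out of __all__.
def _max_nodes_for_depth_sketch(depth):
    """Upper bound on the node count of a binary tree of the given depth."""
    if depth < 0:
        raise ValueError("depth must be non-negative: %s" % depth)
    # Level k of a full binary tree holds 2 ** k nodes, for k = 0..depth.
    return 2 ** (depth + 1) - 1

# Usage note: _max_nodes_for_depth_sketch(1) gives 3, which agrees with the
# "depth 1 with 3 nodes" model printed in the trainClassifier example.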


class DecisionTree(object):
    """
    .. note:: Experimental

    Learning algorithm for a decision tree model for classification or
    regression.
    """

    @classmethod
    def _train(cls, data, type, numClasses, features, impurity="gini", maxDepth=5, maxBins=32,
               minInstancesPerNode=1, minInfoGain=0.0):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
        model = callMLlibFunc("trainDecisionTreeModel", data, type, numClasses, features,
                              impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
        return DecisionTreeModel(model)

    @classmethod
    def trainClassifier(cls, data, numClasses, categoricalFeaturesInfo,
                        impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
                        minInfoGain=0.0):
        """
        Train a DecisionTreeModel for classification.

        :param data: Training data: RDD of LabeledPoint.
                     Labels are integers {0, 1, ..., numClasses-1}.
        :param numClasses: Number of classes for classification.
        :param categoricalFeaturesInfo: Map from categorical feature index
                                        to number of categories.
                                        Any feature not in this map
                                        is treated as continuous.
        :param impurity: Supported values: "entropy" or "gini"
        :param maxDepth: Max depth of tree.
                         E.g., depth 0 means 1 leaf node.
                         Depth 1 means 1 internal node + 2 leaf nodes.
        :param maxBins: Number of bins used for finding splits at each node.
        :param minInstancesPerNode: Min number of instances required at child
                                    nodes to create the parent split
        :param minInfoGain: Min info gain required to create a split
        :return: DecisionTreeModel

        Example usage:

        >>> from numpy import array
        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import DecisionTree
        >>>
        >>> data = [
        ...     LabeledPoint(0.0, [0.0]),
        ...     LabeledPoint(1.0, [1.0]),
        ...     LabeledPoint(1.0, [2.0]),
        ...     LabeledPoint(1.0, [3.0])
        ... ]
        >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
        >>> print(model)
        DecisionTreeModel classifier of depth 1 with 3 nodes
        >>> print(model.toDebugString())
        DecisionTreeModel classifier of depth 1 with 3 nodes
          If (feature 0 <= 0.0)
           Predict: 0.0
          Else (feature 0 > 0.0)
           Predict: 1.0
        <BLANKLINE>
        >>> model.predict(array([1.0]))
        1.0
        >>> model.predict(array([0.0]))
        0.0
        >>> rdd = sc.parallelize([[1.0], [0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "classification", numClasses, categoricalFeaturesInfo,
                          impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)

    @classmethod
    def trainRegressor(cls, data, categoricalFeaturesInfo,
                       impurity="variance", maxDepth=5, maxBins=32, minInstancesPerNode=1,
                       minInfoGain=0.0):
        """
        Train a DecisionTreeModel for regression.

        :param data: Training data: RDD of LabeledPoint.
                     Labels are real numbers.
        :param categoricalFeaturesInfo: Map from categorical feature
                 index to number of categories.
                 Any feature not in this map is treated as continuous.
        :param impurity: Supported values: "variance"
        :param maxDepth: Max depth of tree.
                 E.g., depth 0 means 1 leaf node.
                 Depth 1 means 1 internal node + 2 leaf nodes.
        :param maxBins: Number of bins used for finding splits at each
                 node.
        :param minInstancesPerNode: Min number of instances required at
                 child nodes to create the parent split
        :param minInfoGain: Min info gain required to create a split
        :return: DecisionTreeModel

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import DecisionTree
        >>> from pyspark.mllib.linalg import SparseVector
        >>>
        >>> sparse_data = [
        ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
        ... ]
        >>>
        >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
        >>> model.predict(SparseVector(2, {1: 1.0}))
        1.0
        >>> model.predict(SparseVector(2, {1: 0.0}))
        0.0
        >>> rdd = sc.parallelize([[0.0, 1.0], [0.0, 0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "regression", 0, categoricalFeaturesInfo,
                          impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
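

# Illustrative sketch (not part of the original module). These are textbook
# definitions of the impurity measures named by the `impurity` parameter
# ("gini", "entropy", "variance") and of the information gain that
# `minInfoGain` is compared against. The actual training runs on the JVM
# via callMLlibFunc; the helper names below are hypothetical and the code
# only approximates what happens there.
import math


def _gini_sketch(class_counts):
    """Gini impurity: 1 - sum(p_i ** 2) over the class proportions p_i."""
    total = float(sum(class_counts))
    if total == 0:
        return 0.0
    return 1.0 - sum((c / total) ** 2 for c in class_counts)


def _entropy_sketch(class_counts):
    """Entropy in bits: -sum(p_i * log2(p_i)), skipping empty classes."""
    total = float(sum(class_counts))
    if total == 0:
        return 0.0
    return -sum((c / total) * math.log(c / total, 2)
                for c in class_counts if c > 0)


def _variance_sketch(labels):
    """Population variance of real-valued labels (regression impurity)."""
    n = len(labels)
    if n == 0:
        return 0.0
    mean = sum(labels) / float(n)
    return sum((y - mean) ** 2 for y in labels) / n


def _info_gain_sketch(parent_counts, left_counts, right_counts):
    """Gini-based gain: parent impurity minus weighted child impurity."""
    n = float(sum(parent_counts))
    if n == 0:
        return 0.0
    weighted = (sum(left_counts) / n) * _gini_sketch(left_counts) \
        + (sum(right_counts) / n) * _gini_sketch(right_counts)
    return _gini_sketch(parent_counts) - weighted

# Usage note: for the classifier example above (one label 0.0, three labels
# 1.0), splitting at "feature 0 <= 0.0" separates the classes perfectly, so
# _info_gain_sketch([1, 3], [1, 0], [0, 3]) equals the parent impurity.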


@inherit_doc
class RandomForestModel(TreeEnsembleModel, JavaLoader):
    """
    .. note:: Experimental

    Represents a random forest model.
    """

    @classmethod
    def _java_loader_class(cls):
        return "org.apache.spark.mllib.tree.model.RandomForestModel"
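

# Illustrative sketch (not part of the original module). It shows one common
# way a forest combines per-tree predictions: majority vote for
# classification and the mean for regression. This is an assumption about
# the JVM-side model, though it is consistent with the RandomForest
# doctests in this file (e.g. the 0.5 regression prediction from two
# trees). The helper name is hypothetical.
from collections import Counter


def _combine_forest_predictions_sketch(tree_predictions, algo):
    """Combine a non-empty list of per-tree predictions into one value."""
    if not tree_predictions:
        raise ValueError("need at least one tree prediction")
    if algo == "classification":
        # Majority vote; how ties are broken is not specified here.
        return Counter(tree_predictions).most_common(1)[0][0]
    elif algo == "regression":
        return sum(tree_predictions) / float(len(tree_predictions))
    raise ValueError("unsupported algo: %s" % algo)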


class RandomForest(object):
    """
    .. note:: Experimental

    Learning algorithm for a random forest model for classification or
    regression.
    """

    supportedFeatureSubsetStrategies = ("auto", "all", "sqrt", "log2", "onethird")

    @classmethod
    def _train(cls, data, algo, numClasses, categoricalFeaturesInfo, numTrees,
               featureSubsetStrategy, impurity, maxDepth, maxBins, seed):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
        if featureSubsetStrategy not in cls.supportedFeatureSubsetStrategies:
            raise ValueError("unsupported featureSubsetStrategy: %s" % featureSubsetStrategy)
        if seed is None:
            seed = random.randint(0, 1 << 30)
        model = callMLlibFunc("trainRandomForestModel", data, algo, numClasses,
                              categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity,
                              maxDepth, maxBins, seed)
        return RandomForestModel(model)

    @classmethod
    def trainClassifier(cls, data, numClasses, categoricalFeaturesInfo, numTrees,
                        featureSubsetStrategy="auto", impurity="gini", maxDepth=4, maxBins=32,
                        seed=None):
        """
        Method to train a random forest model for binary or multiclass
        classification.

        :param data: Training dataset: RDD of LabeledPoint. Labels
               should take values {0, 1, ..., numClasses-1}.
        :param numClasses: number of classes for classification.
        :param categoricalFeaturesInfo: Map storing arity of categorical
               features. E.g., an entry (n -> k) indicates that
               feature n is categorical with k categories indexed
               from 0: {0, 1, ..., k-1}.
        :param numTrees: Number of trees in the random forest.
        :param featureSubsetStrategy: Number of features to consider for
               splits at each node.
               Supported: "auto" (default), "all", "sqrt", "log2", "onethird".
               If "auto" is set, this parameter is set based on numTrees:
               if numTrees == 1, set to "all";
               if numTrees > 1 (forest) set to "sqrt".
        :param impurity: Criterion used for information gain calculation.
               Supported values: "gini" (recommended) or "entropy".
        :param maxDepth: Maximum depth of the tree.
               E.g., depth 0 means 1 leaf node; depth 1 means
               1 internal node + 2 leaf nodes. (default: 4)
        :param maxBins: maximum number of bins used for splitting
               features
               (default: 32)
        :param seed: Random seed for bootstrapping and choosing feature
               subsets.
        :return: RandomForestModel that can be used for prediction

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import RandomForest
        >>>
        >>> data = [
        ...     LabeledPoint(0.0, [0.0]),
        ...     LabeledPoint(0.0, [1.0]),
        ...     LabeledPoint(1.0, [2.0]),
        ...     LabeledPoint(1.0, [3.0])
        ... ]
        >>> model = RandomForest.trainClassifier(sc.parallelize(data), 2, {}, 3, seed=42)
        >>> model.numTrees()
        3
        >>> model.totalNumNodes()
        7
        >>> print(model)
        TreeEnsembleModel classifier with 3 trees
        <BLANKLINE>
        >>> print(model.toDebugString())
        TreeEnsembleModel classifier with 3 trees
        <BLANKLINE>
          Tree 0:
            Predict: 1.0
          Tree 1:
            If (feature 0 <= 1.0)
             Predict: 0.0
            Else (feature 0 > 1.0)
             Predict: 1.0
          Tree 2:
            If (feature 0 <= 1.0)
             Predict: 0.0
            Else (feature 0 > 1.0)
             Predict: 1.0
        <BLANKLINE>
        >>> model.predict([2.0])
        1.0
        >>> model.predict([0.0])
        0.0
        >>> rdd = sc.parallelize([[3.0], [1.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "classification", numClasses,
                          categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity,
                          maxDepth, maxBins, seed)

    @classmethod
    def trainRegressor(cls, data, categoricalFeaturesInfo, numTrees, featureSubsetStrategy="auto",
                       impurity="variance", maxDepth=4, maxBins=32, seed=None):
        """
        Method to train a random forest model for regression.

        :param data: Training dataset: RDD of LabeledPoint. Labels are
               real numbers.
        :param categoricalFeaturesInfo: Map storing arity of categorical
               features. E.g., an entry (n -> k) indicates that feature
               n is categorical with k categories indexed from 0:
               {0, 1, ..., k-1}.
        :param numTrees: Number of trees in the random forest.
        :param featureSubsetStrategy: Number of features to consider for
               splits at each node.
               Supported: "auto" (default), "all", "sqrt", "log2", "onethird".
               If "auto" is set, this parameter is set based on numTrees:
               if numTrees == 1, set to "all";
               if numTrees > 1 (forest) set to "onethird" for regression.
        :param impurity: Criterion used for information gain
               calculation.
               Supported values: "variance".
        :param maxDepth: Maximum depth of the tree. E.g., depth 0 means
               1 leaf node; depth 1 means 1 internal node + 2 leaf
               nodes. (default: 4)
        :param maxBins: maximum number of bins used for splitting
               features (default: 32)
        :param seed: Random seed for bootstrapping and choosing feature
               subsets.
        :return: RandomForestModel that can be used for prediction

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import RandomForest
        >>> from pyspark.mllib.linalg import SparseVector
        >>>
        >>> sparse_data = [
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
        ... ]
        >>>
        >>> model = RandomForest.trainRegressor(sc.parallelize(sparse_data), {}, 2, seed=42)
        >>> model.numTrees()
        2
        >>> model.totalNumNodes()
        4
        >>> model.predict(SparseVector(2, {1: 1.0}))
        1.0
        >>> model.predict(SparseVector(2, {0: 1.0}))
        0.5
        >>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.5]
        """
        return cls._train(data, "regression", 0, categoricalFeaturesInfo, numTrees,
                          featureSubsetStrategy, impurity, maxDepth, maxBins, seed)
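

# Illustrative sketch (not part of the original module). It resolves a
# featureSubsetStrategy name to a number of features per split. The "auto"
# rule follows the docstrings above ("all" for a single tree, otherwise
# "sqrt" for classification and "onethird" for regression). The numeric
# mapping for each name, including rounding up, is an assumption made for
# illustration; the real resolution happens on the JVM side. The helper
# name is hypothetical.
import math


def _num_features_per_node_sketch(strategy, num_features, num_trees, algo):
    """Return how many features would be considered at each split."""
    if num_features < 1:
        raise ValueError("num_features must be positive: %s" % num_features)
    if strategy == "auto":
        if num_trees == 1:
            strategy = "all"
        elif algo == "classification":
            strategy = "sqrt"
        else:
            strategy = "onethird"
    if strategy == "all":
        return num_features
    elif strategy == "sqrt":
        return int(math.ceil(math.sqrt(num_features)))
    elif strategy == "log2":
        # (n - 1).bit_length() equals ceil(log2(n)) for n >= 1, which
        # avoids floating-point rounding; use at least one feature.
        return max(1, (num_features - 1).bit_length())
    elif strategy == "onethird":
        return int(math.ceil(num_features / 3.0))
    raise ValueError("unsupported featureSubsetStrategy: %s" % strategy)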


@inherit_doc
class GradientBoostedTreesModel(TreeEnsembleModel, JavaLoader):
    """
    .. note:: Experimental

    Represents a gradient-boosted tree model.
    """

    @classmethod
    def _java_loader_class(cls):
        return "org.apache.spark.mllib.tree.model.GradientBoostedTreesModel"


class GradientBoostedTrees(object):
    """
    .. note:: Experimental

    Learning algorithm for a gradient-boosted trees model for
    classification or regression.
    """

    @classmethod
    def _train(cls, data, algo, categoricalFeaturesInfo,
               loss, numIterations, learningRate, maxDepth):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
        model = callMLlibFunc("trainGradientBoostedTreesModel", data, algo, categoricalFeaturesInfo,
                              loss, numIterations, learningRate, maxDepth)
        return GradientBoostedTreesModel(model)

    @classmethod
    def trainClassifier(cls, data, categoricalFeaturesInfo,
                        loss="logLoss", numIterations=100, learningRate=0.1, maxDepth=3):
        """
        Method to train a gradient-boosted trees model for
        classification.

        :param data: Training dataset: RDD of LabeledPoint.
               Labels should take values {0, 1}.
        :param categoricalFeaturesInfo: Map storing arity of categorical
               features. E.g., an entry (n -> k) indicates that feature
               n is categorical with k categories indexed from 0:
               {0, 1, ..., k-1}.
        :param loss: Loss function used for minimization during gradient
               boosting. Supported: {"logLoss" (default),
               "leastSquaresError", "leastAbsoluteError"}.
        :param numIterations: Number of iterations of boosting.
               (default: 100)
        :param learningRate: Learning rate for shrinking the
               contribution of each estimator. The learning rate
               should be in the interval (0, 1].
               (default: 0.1)
        :param maxDepth: Maximum depth of the tree. E.g., depth 0 means
               1 leaf node; depth 1 means 1 internal node + 2 leaf
               nodes. (default: 3)
        :return: GradientBoostedTreesModel that can be used for
               prediction

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import GradientBoostedTrees
        >>>
        >>> data = [
        ...     LabeledPoint(0.0, [0.0]),
        ...     LabeledPoint(0.0, [1.0]),
        ...     LabeledPoint(1.0, [2.0]),
        ...     LabeledPoint(1.0, [3.0])
        ... ]
        >>>
        >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {})
        >>> model.numTrees()
        100
        >>> model.totalNumNodes()
        300
        >>> print(model)  # it already has newline
        TreeEnsembleModel classifier with 100 trees
        <BLANKLINE>
        >>> model.predict([2.0])
        1.0
        >>> model.predict([0.0])
        0.0
        >>> rdd = sc.parallelize([[2.0], [0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "classification", categoricalFeaturesInfo,
                          loss, numIterations, learningRate, maxDepth)

    @classmethod
    def trainRegressor(cls, data, categoricalFeaturesInfo,
                       loss="leastSquaresError", numIterations=100, learningRate=0.1, maxDepth=3):
        """
        Method to train a gradient-boosted trees model for regression.

        :param data: Training dataset: RDD of LabeledPoint. Labels are
               real numbers.
        :param categoricalFeaturesInfo: Map storing arity of categorical
               features. E.g., an entry (n -> k) indicates that feature
               n is categorical with k categories indexed from 0:
               {0, 1, ..., k-1}.
        :param loss: Loss function used for minimization during gradient
               boosting. Supported: {"logLoss",
               "leastSquaresError" (default), "leastAbsoluteError"}.
        :param numIterations: Number of iterations of boosting.
               (default: 100)
        :param learningRate: Learning rate for shrinking the
               contribution of each estimator. The learning rate
               should be in the interval (0, 1].
               (default: 0.1)
        :param maxDepth: Maximum depth of the tree. E.g., depth 0 means
               1 leaf node; depth 1 means 1 internal node + 2 leaf
               nodes. (default: 3)
        :return: GradientBoostedTreesModel that can be used for
               prediction

        Example usage:

        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from pyspark.mllib.tree import GradientBoostedTrees
        >>> from pyspark.mllib.linalg import SparseVector
        >>>
        >>> sparse_data = [
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
        ... ]
        >>>
        >>> model = GradientBoostedTrees.trainRegressor(sc.parallelize(sparse_data), {})
        >>> model.numTrees()
        100
        >>> model.totalNumNodes()
        102
        >>> model.predict(SparseVector(2, {1: 1.0}))
        1.0
        >>> model.predict(SparseVector(2, {0: 1.0}))
        0.0
        >>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]])
        >>> model.predict(rdd).collect()
        [1.0, 0.0]
        """
        return cls._train(data, "regression", categoricalFeaturesInfo,
                          loss, numIterations, learningRate, maxDepth)
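

# Illustrative sketch (not part of the original module). The learningRate
# docstrings above describe "shrinking the contribution of each estimator".
# One way to picture that: the boosted prediction is a weighted sum of the
# per-tree outputs. The weighting used here (first tree at weight 1.0,
# every later tree at weight learning_rate) is an assumption made for
# illustration, and the helper name is hypothetical.
def _boosted_sum_sketch(tree_predictions, learning_rate):
    """Weighted sum of per-tree predictions under the assumption above."""
    if not 0.0 < learning_rate <= 1.0:
        raise ValueError("learning_rate must be in (0, 1]: %s" % learning_rate)
    if not tree_predictions:
        return 0.0
    # Later trees are shrunk so that each one makes only a small correction.
    return tree_predictions[0] + learning_rate * sum(tree_predictions[1:])

# Design note: a smaller learning_rate makes each later tree count for less,
# which usually calls for more boosting iterations (numIterations).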
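

def _test():
    import doctest
    import sys
    globs = globals().copy()
    # Run the doctests against a local SparkContext exposed as `sc`.
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        # sys.exit does not depend on the interactive-only exit() helper.
        sys.exit(-1)

if __name__ == "__main__":
    _test()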