Revert "[SPARK-22797][PYSPARK] Bucketizer support multi-column"
This reverts commit c22eaa94e8.
This commit is contained in:
parent
dd8e257d1c
commit
a8a3e9b7cf
|
@ -317,33 +317,26 @@ class BucketedRandomProjectionLSHModel(LSHModel, JavaMLReadable, JavaMLWritable)
|
|||
|
||||
|
||||
@inherit_doc
|
||||
class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasInputCols, HasOutputCols,
|
||||
HasHandleInvalid, JavaMLReadable, JavaMLWritable):
|
||||
class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
|
||||
JavaMLReadable, JavaMLWritable):
|
||||
"""
|
||||
Maps a column of continuous features to a column of feature buckets. Since 2.3.0,
|
||||
:py:class:`Bucketizer` can map multiple columns at once by setting the :py:attr:`inputCols`
|
||||
parameter. Note that when both the :py:attr:`inputCol` and :py:attr:`inputCols` parameters
|
||||
are set, an Exception will be thrown. The :py:attr:`splits` parameter is only used for single
|
||||
column usage, and :py:attr:`splitsArray` is for multiple columns.
|
||||
Maps a column of continuous features to a column of feature buckets.
|
||||
|
||||
>>> values = [(0.1, 0.0), (0.4, 1.0), (1.2, 1.3), (1.5, float("nan")),
|
||||
... (float("nan"), 1.0), (float("nan"), 0.0)]
|
||||
>>> df = spark.createDataFrame(values, ["values1", "values2"])
|
||||
>>> values = [(0.1,), (0.4,), (1.2,), (1.5,), (float("nan"),), (float("nan"),)]
|
||||
>>> df = spark.createDataFrame(values, ["values"])
|
||||
>>> bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4, float("inf")],
|
||||
... inputCol="values1", outputCol="buckets")
|
||||
>>> bucketed = bucketizer.setHandleInvalid("keep").transform(df.select("values1"))
|
||||
>>> bucketed.show(truncate=False)
|
||||
+-------+-------+
|
||||
|values1|buckets|
|
||||
+-------+-------+
|
||||
|0.1 |0.0 |
|
||||
|0.4 |0.0 |
|
||||
|1.2 |1.0 |
|
||||
|1.5 |2.0 |
|
||||
|NaN |3.0 |
|
||||
|NaN |3.0 |
|
||||
+-------+-------+
|
||||
...
|
||||
... inputCol="values", outputCol="buckets")
|
||||
>>> bucketed = bucketizer.setHandleInvalid("keep").transform(df).collect()
|
||||
>>> len(bucketed)
|
||||
6
|
||||
>>> bucketed[0].buckets
|
||||
0.0
|
||||
>>> bucketed[1].buckets
|
||||
0.0
|
||||
>>> bucketed[2].buckets
|
||||
1.0
|
||||
>>> bucketed[3].buckets
|
||||
2.0
|
||||
>>> bucketizer.setParams(outputCol="b").transform(df).head().b
|
||||
0.0
|
||||
>>> bucketizerPath = temp_path + "/bucketizer"
|
||||
|
@ -354,22 +347,6 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasInputCols, HasOu
|
|||
>>> bucketed = bucketizer.setHandleInvalid("skip").transform(df).collect()
|
||||
>>> len(bucketed)
|
||||
4
|
||||
>>> bucketizer2 = Bucketizer(splitsArray=
|
||||
... [[-float("inf"), 0.5, 1.4, float("inf")], [-float("inf"), 0.5, float("inf")]],
|
||||
... inputCols=["values1", "values2"], outputCols=["buckets1", "buckets2"])
|
||||
>>> bucketed2 = bucketizer2.setHandleInvalid("keep").transform(df)
|
||||
>>> bucketed2.show(truncate=False)
|
||||
+-------+-------+--------+--------+
|
||||
|values1|values2|buckets1|buckets2|
|
||||
+-------+-------+--------+--------+
|
||||
|0.1 |0.0 |0.0 |0.0 |
|
||||
|0.4 |1.0 |0.0 |1.0 |
|
||||
|1.2 |1.3 |1.0 |1.0 |
|
||||
|1.5 |NaN |2.0 |2.0 |
|
||||
|NaN |1.0 |3.0 |1.0 |
|
||||
|NaN |0.0 |3.0 |0.0 |
|
||||
+-------+-------+--------+--------+
|
||||
...
|
||||
|
||||
.. versionadded:: 1.4.0
|
||||
"""
|
||||
|
@ -386,30 +363,14 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasInputCols, HasOu
|
|||
|
||||
handleInvalid = Param(Params._dummy(), "handleInvalid", "how to handle invalid entries. " +
|
||||
"Options are 'skip' (filter out rows with invalid values), " +
|
||||
"'error' (throw an error), or 'keep' (keep invalid values in a " +
|
||||
"special additional bucket). Note that in the multiple column " +
|
||||
"case, the invalid handling is applied to all columns. That said " +
|
||||
"for 'error' it will throw an error if any invalids are found in " +
|
||||
"any column, for 'skip' it will skip rows with any invalids in " +
|
||||
"any columns, etc.",
|
||||
"'error' (throw an error), or 'keep' (keep invalid values in a special " +
|
||||
"additional bucket).",
|
||||
typeConverter=TypeConverters.toString)
|
||||
|
||||
splitsArray = Param(Params._dummy(), "splitsArray", "The array of split points for mapping " +
|
||||
"continuous features into buckets for multiple columns. For each input " +
|
||||
"column, with n+1 splits, there are n buckets. A bucket defined by " +
|
||||
"splits x,y holds values in the range [x,y) except the last bucket, " +
|
||||
"which also includes y. The splits should be of length >= 3 and " +
|
||||
"strictly increasing. Values at -inf, inf must be explicitly provided " +
|
||||
"to cover all Double values; otherwise, values outside the splits " +
|
||||
"specified will be treated as errors.",
|
||||
typeConverter=TypeConverters.toListListFloat)
|
||||
|
||||
@keyword_only
|
||||
def __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error",
|
||||
splitsArray=None, inputCols=None, outputCols=None):
|
||||
def __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error"):
|
||||
"""
|
||||
__init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error", \
|
||||
splitsArray=None, inputCols=None, outputCols=None)
|
||||
__init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error")
|
||||
"""
|
||||
super(Bucketizer, self).__init__()
|
||||
self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.Bucketizer", self.uid)
|
||||
|
@ -419,11 +380,9 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasInputCols, HasOu
|
|||
|
||||
@keyword_only
|
||||
@since("1.4.0")
|
||||
def setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error",
|
||||
splitsArray=None, inputCols=None, outputCols=None):
|
||||
def setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error"):
|
||||
"""
|
||||
setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error", \
|
||||
splitsArray=None, inputCols=None, outputCols=None)
|
||||
setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error")
|
||||
Sets params for this Bucketizer.
|
||||
"""
|
||||
kwargs = self._input_kwargs
|
||||
|
@ -443,20 +402,6 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasInputCols, HasOu
|
|||
"""
|
||||
return self.getOrDefault(self.splits)
|
||||
|
||||
@since("2.3.0")
|
||||
def setSplitsArray(self, value):
|
||||
"""
|
||||
Sets the value of :py:attr:`splitsArray`.
|
||||
"""
|
||||
return self._set(splitsArray=value)
|
||||
|
||||
@since("2.3.0")
|
||||
def getSplitsArray(self):
|
||||
"""
|
||||
Gets the array of split points or its default value.
|
||||
"""
|
||||
return self.getOrDefault(self.splitsArray)
|
||||
|
||||
|
||||
@inherit_doc
|
||||
class CountVectorizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
|
||||
|
|
|
@ -134,16 +134,6 @@ class TypeConverters(object):
|
|||
return [float(v) for v in value]
|
||||
raise TypeError("Could not convert %s to list of floats" % value)
|
||||
|
||||
@staticmethod
|
||||
def toListListFloat(value):
|
||||
"""
|
||||
Convert a value to list of list of floats, if possible.
|
||||
"""
|
||||
if TypeConverters._can_convert_to_list(value):
|
||||
value = TypeConverters.toList(value)
|
||||
return [TypeConverters.toListFloat(v) for v in value]
|
||||
raise TypeError("Could not convert %s to list of list of floats" % value)
|
||||
|
||||
@staticmethod
|
||||
def toListInt(value):
|
||||
"""
|
||||
|
|
|
@ -238,15 +238,6 @@ class ParamTypeConversionTests(PySparkTestCase):
|
|||
self.assertRaises(TypeError, lambda: LogisticRegression(fitIntercept=1))
|
||||
self.assertRaises(TypeError, lambda: LogisticRegression(fitIntercept="false"))
|
||||
|
||||
def test_list_list_float(self):
|
||||
b = Bucketizer(splitsArray=[[-0.1, 0.5, 3], [-5, 1.5]])
|
||||
self.assertEqual(b.getSplitsArray(), [[-0.1, 0.5, 3.0], [-5.0, 1.5]])
|
||||
self.assertTrue(all([type(v) == list for v in b.getSplitsArray()]))
|
||||
self.assertTrue(all([type(v) == float for v in b.getSplitsArray()[0]]))
|
||||
self.assertTrue(all([type(v) == float for v in b.getSplitsArray()[1]]))
|
||||
self.assertRaises(TypeError, lambda: Bucketizer(splitsArray=["a", 1.0]))
|
||||
self.assertRaises(TypeError, lambda: Bucketizer(splitsArray=[[-5, 1.5], ["a", 1.0]]))
|
||||
|
||||
|
||||
class PipelineTests(PySparkTestCase):
|
||||
|
||||
|
|
Loading…
Reference in a new issue