[SPARK-26133][ML] Remove deprecated OneHotEncoder and rename OneHotEncoderEstimator to OneHotEncoder

## What changes were proposed in this pull request?

We deprecated `OneHotEncoder` in Spark 2.3.0 and introduced `OneHotEncoderEstimator`. In 3.0.0, we remove the deprecated `OneHotEncoder` and rename `OneHotEncoderEstimator` to `OneHotEncoder`.

TODO: According to the ML migration guide, we need to keep `OneHotEncoderEstimator` as an alias after the renaming. This is not done in this patch, in order to keep it easier to review.
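
For review purposes, a minimal sketch of the renamed API as it is expected to look in 3.0.0 (the data and column names below are illustrative, not taken from this patch):

```scala
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("OneHotEncoderRenameSketch").getOrCreate()
import spark.implicits._

val df = Seq(0.0, 1.0, 2.0, 1.0).toDF("categoryIndex")

// In 3.0.0 this class is the former OneHotEncoderEstimator: an Estimator
// that must be fit to record category sizes before transforming.
val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))
val model = encoder.fit(df)    // returns a OneHotEncoderModel
model.transform(df).show()
```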

## How was this patch tested?

Existing tests.

Closes #23100 from viirya/remove_one_hot_encoder.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
12 changed files with 847 additions and 1228 deletions


@@ -779,43 +779,37 @@ for more details on the API.
</div>
</div>
## OneHotEncoder (Deprecated since 2.3.0)
Because this existing `OneHotEncoder` is a stateless transformer, it is not usable on new data where the number of categories may differ from the training data. In order to fix this, a new `OneHotEncoderEstimator` was created that produces a `OneHotEncoderModel` when fitting. For more detail, please see [SPARK-13030](https://issues.apache.org/jira/browse/SPARK-13030).
`OneHotEncoder` has been deprecated in 2.3.0 and will be removed in 3.0.0. Please use [OneHotEncoderEstimator](ml-features.html#onehotencoderestimator) instead.
## OneHotEncoderEstimator
## OneHotEncoder
[One-hot encoding](http://en.wikipedia.org/wiki/One-hot) maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using [StringIndexer](ml-features.html#stringindexer) first.
`OneHotEncoderEstimator` can transform multiple columns, returning a one-hot-encoded output vector column for each input column. It is common to merge these vectors into a single feature vector using [VectorAssembler](ml-features.html#vectorassembler).
`OneHotEncoder` can transform multiple columns, returning a one-hot-encoded output vector column for each input column. It is common to merge these vectors into a single feature vector using [VectorAssembler](ml-features.html#vectorassembler).
`OneHotEncoderEstimator` supports the `handleInvalid` parameter to choose how to handle invalid input when transforming data. Available options include 'keep' (any invalid inputs are assigned to an extra categorical index) and 'error' (throw an error).
`OneHotEncoder` supports the `handleInvalid` parameter to choose how to handle invalid input when transforming data. Available options include 'keep' (any invalid inputs are assigned to an extra categorical index) and 'error' (throw an error).
**Examples**
<div class="codetabs">
<div data-lang="scala" markdown="1">
Refer to the [OneHotEncoderEstimator Scala docs](api/scala/index.html#org.apache.spark.ml.feature.OneHotEncoderEstimator) for more details on the API.
Refer to the [OneHotEncoder Scala docs](api/scala/index.html#org.apache.spark.ml.feature.OneHotEncoder) for more details on the API.
{% include_example scala/org/apache/spark/examples/ml/OneHotEncoderEstimatorExample.scala %}
{% include_example scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala %}
</div>
<div data-lang="java" markdown="1">
Refer to the [OneHotEncoderEstimator Java docs](api/java/org/apache/spark/ml/feature/OneHotEncoderEstimator.html)
Refer to the [OneHotEncoder Java docs](api/java/org/apache/spark/ml/feature/OneHotEncoder.html)
for more details on the API.
{% include_example java/org/apache/spark/examples/ml/JavaOneHotEncoderEstimatorExample.java %}
{% include_example java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java %}
</div>
<div data-lang="python" markdown="1">
Refer to the [OneHotEncoderEstimator Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoderEstimator) for more details on the API.
Refer to the [OneHotEncoder Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoder) for more details on the API.
{% include_example python/ml/onehot_encoder_estimator_example.py %}
{% include_example python/ml/onehot_encoder_example.py %}
</div>
</div>
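
A hedged sketch of the `handleInvalid` and `dropLast` semantics described above, assuming Spark 3.0.0 and illustrative column names:

```scala
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("HandleInvalidSketch").getOrCreate()
import spark.implicits._

// Fit on three categories: 0.0, 1.0, 2.0.
val train = Seq(0.0, 1.0, 2.0).toDF("categoryIndex")
val model = new OneHotEncoder()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))
  .setHandleInvalid("keep") // unseen values go to an extra category
  .setDropLast(false)       // keep all categories in the vector
  .fit(train)

// 3.0 was never seen while fitting; with 'keep' it maps to the extra
// (fourth) slot instead of throwing. With 'error' it would throw.
val test = Seq(0.0, 3.0).toDF("categoryIndex")
model.transform(test).show(truncate = false)
```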


@@ -104,6 +104,12 @@ MLlib is under active development.
The APIs marked `Experimental`/`DeveloperApi` may change in future releases,
and the migration guide below will explain all changes between releases.
## From 2.4 to 3.0
### Breaking changes
* `OneHotEncoder` which is deprecated in 2.3, is removed in 3.0 and `OneHotEncoderEstimator` is now renamed to `OneHotEncoder`.
## From 2.2 to 2.3
### Breaking changes


@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.OneHotEncoderEstimator;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -34,11 +34,11 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// $example off$
public class JavaOneHotEncoderEstimatorExample {
public class JavaOneHotEncoderExample {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("JavaOneHotEncoderEstimatorExample")
.appName("JavaOneHotEncoderExample")
.getOrCreate();
// Note: categorical features are usually first encoded with StringIndexer
@@ -59,7 +59,7 @@ public class JavaOneHotEncoderEstimatorExample {
Dataset<Row> df = spark.createDataFrame(data, schema);
OneHotEncoderEstimator encoder = new OneHotEncoderEstimator()
OneHotEncoder encoder = new OneHotEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryVec1", "categoryVec2"});


@@ -18,14 +18,14 @@
from __future__ import print_function
# $example on$
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import OneHotEncoder
# $example off$
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("OneHotEncoderEstimatorExample")\
.appName("OneHotEncoderExample")\
.getOrCreate()
# Note: categorical features are usually first encoded with StringIndexer
@@ -39,8 +39,8 @@ if __name__ == "__main__":
(2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])
encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryVec1", "categoryVec2"])
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()


@@ -19,15 +19,15 @@
package org.apache.spark.examples.ml
// $example on$
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.OneHotEncoder
// $example off$
import org.apache.spark.sql.SparkSession
object OneHotEncoderEstimatorExample {
object OneHotEncoderExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("OneHotEncoderEstimatorExample")
.appName("OneHotEncoderExample")
.getOrCreate()
// Note: categorical features are usually first encoded with StringIndexer
@@ -41,7 +41,7 @@ object OneHotEncoderEstimatorExample {
(2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")
val encoder = new OneHotEncoderEstimator()
val encoder = new OneHotEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)


@@ -17,126 +17,512 @@
package org.apache.spark.ml.feature
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCols, HasOutputCols}
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{DoubleType, NumericType, StructType}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, lit, udf}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
/** Private trait for params and common methods for OneHotEncoder and OneHotEncoderModel */
private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid
with HasInputCols with HasOutputCols {
/**
* Param for how to handle invalid data during transform().
* Options are 'keep' (invalid data presented as an extra categorical feature) or
* 'error' (throw an error).
* Note that this Param is only used during transform; during fitting, invalid data
* will result in an error.
* Default: "error"
* @group param
*/
@Since("2.3.0")
override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
"How to handle invalid data during transform(). " +
"Options are 'keep' (invalid data presented as an extra categorical feature) " +
"or error (throw an error). Note that this Param is only used during transform; " +
"during fitting, invalid data will result in an error.",
ParamValidators.inArray(OneHotEncoder.supportedHandleInvalids))
setDefault(handleInvalid, OneHotEncoder.ERROR_INVALID)
/**
* Whether to drop the last category in the encoded vector (default: true)
* @group param
*/
@Since("2.3.0")
final val dropLast: BooleanParam =
new BooleanParam(this, "dropLast", "whether to drop the last category")
setDefault(dropLast -> true)
/** @group getParam */
@Since("2.3.0")
def getDropLast: Boolean = $(dropLast)
protected def validateAndTransformSchema(
schema: StructType,
dropLast: Boolean,
keepInvalid: Boolean): StructType = {
val inputColNames = $(inputCols)
val outputColNames = $(outputCols)
require(inputColNames.length == outputColNames.length,
s"The number of input columns ${inputColNames.length} must be the same as the number of " +
s"output columns ${outputColNames.length}.")
// Input columns must be NumericType.
inputColNames.foreach(SchemaUtils.checkNumericType(schema, _))
// Prepares output columns with proper attributes by examining input columns.
val inputFields = $(inputCols).map(schema(_))
val outputFields = inputFields.zip(outputColNames).map { case (inputField, outputColName) =>
OneHotEncoderCommon.transformOutputColumnSchema(
inputField, outputColName, dropLast, keepInvalid)
}
outputFields.foldLeft(schema) { case (newSchema, outputField) =>
SchemaUtils.appendColumn(newSchema, outputField)
}
}
}
/**
* A one-hot encoder that maps a column of category indices to a column of binary vectors, with
* at most a single one-value per row that indicates the input category index.
* For example with 5 categories, an input value of 2.0 would map to an output vector of
* `[0.0, 0.0, 1.0, 0.0]`.
* The last category is not included by default (configurable via `OneHotEncoder!.dropLast`
* The last category is not included by default (configurable via `dropLast`),
* because it makes the vector entries sum up to one, and hence linearly dependent.
* So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
*
* @note This is different from scikit-learn's OneHotEncoder, which keeps all categories.
* The output vectors are sparse.
*
* When `handleInvalid` is configured to 'keep', an extra "category" indicating invalid values is
* added as last category. So when `dropLast` is true, invalid values are encoded as all-zeros
* vector.
*
* @note When encoding multi-column by using `inputCols` and `outputCols` params, input/output cols
* come in pairs, specified by the order in the arrays, and each pair is treated independently.
*
* @see `StringIndexer` for converting categorical values into category indices
* @deprecated `OneHotEncoderEstimator` will be renamed `OneHotEncoder` and this `OneHotEncoder`
* will be removed in 3.0.0.
*/
@Since("1.4.0")
@deprecated("`OneHotEncoderEstimator` will be renamed `OneHotEncoder` and this `OneHotEncoder`" +
" will be removed in 3.0.0.", "2.3.0")
class OneHotEncoder @Since("1.4.0") (@Since("1.4.0") override val uid: String) extends Transformer
with HasInputCol with HasOutputCol with DefaultParamsWritable {
@Since("3.0.0")
class OneHotEncoder @Since("3.0.0") (@Since("3.0.0") override val uid: String)
extends Estimator[OneHotEncoderModel] with OneHotEncoderBase with DefaultParamsWritable {
@Since("1.4.0")
def this() = this(Identifiable.randomUID("oneHot"))
/**
* Whether to drop the last category in the encoded vector (default: true)
* @group param
*/
@Since("1.4.0")
final val dropLast: BooleanParam =
new BooleanParam(this, "dropLast", "whether to drop the last category")
setDefault(dropLast -> true)
/** @group getParam */
@Since("2.0.0")
def getDropLast: Boolean = $(dropLast)
@Since("3.0.0")
def this() = this(Identifiable.randomUID("oneHotEncoder"))
/** @group setParam */
@Since("1.4.0")
@Since("3.0.0")
def setInputCols(values: Array[String]): this.type = set(inputCols, values)
/** @group setParam */
@Since("3.0.0")
def setOutputCols(values: Array[String]): this.type = set(outputCols, values)
/** @group setParam */
@Since("3.0.0")
def setDropLast(value: Boolean): this.type = set(dropLast, value)
/** @group setParam */
@Since("1.4.0")
def setInputCol(value: String): this.type = set(inputCol, value)
@Since("3.0.0")
def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
/** @group setParam */
@Since("1.4.0")
def setOutputCol(value: String): this.type = set(outputCol, value)
@Since("1.4.0")
@Since("3.0.0")
override def transformSchema(schema: StructType): StructType = {
val inputColName = $(inputCol)
val outputColName = $(outputCol)
val inputFields = schema.fields
require(schema(inputColName).dataType.isInstanceOf[NumericType],
s"Input column must be of type ${NumericType.simpleString} but got " +
schema(inputColName).dataType.catalogString)
require(!inputFields.exists(_.name == outputColName),
s"Output column $outputColName already exists.")
val outputField = OneHotEncoderCommon.transformOutputColumnSchema(
schema(inputColName), outputColName, $(dropLast))
val outputFields = inputFields :+ outputField
StructType(outputFields)
val keepInvalid = $(handleInvalid) == OneHotEncoder.KEEP_INVALID
validateAndTransformSchema(schema, dropLast = $(dropLast),
keepInvalid = keepInvalid)
}
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
// schema transformation
val inputColName = $(inputCol)
val outputColName = $(outputCol)
@Since("3.0.0")
override def fit(dataset: Dataset[_]): OneHotEncoderModel = {
transformSchema(dataset.schema)
val outputAttrGroupFromSchema = AttributeGroup.fromStructField(
transformSchema(dataset.schema)(outputColName))
// Compute the plain number of categories without `handleInvalid` and
// `dropLast` taken into account.
val transformedSchema = validateAndTransformSchema(dataset.schema, dropLast = false,
keepInvalid = false)
val categorySizes = new Array[Int]($(outputCols).length)
val outputAttrGroup = if (outputAttrGroupFromSchema.size < 0) {
OneHotEncoderCommon.getOutputAttrGroupFromData(
dataset, Seq(inputColName), Seq(outputColName), $(dropLast))(0)
} else {
outputAttrGroupFromSchema
}
val metadata = outputAttrGroup.toMetadata()
// data transformation
val size = outputAttrGroup.size
val oneValue = Array(1.0)
val emptyValues = Array.empty[Double]
val emptyIndices = Array.empty[Int]
val encode = udf { label: Double =>
if (label < size) {
Vectors.sparse(size, Array(label.toInt), oneValue)
val columnToScanIndices = $(outputCols).zipWithIndex.flatMap { case (outputColName, idx) =>
val numOfAttrs = AttributeGroup.fromStructField(
transformedSchema(outputColName)).size
if (numOfAttrs < 0) {
Some(idx)
} else {
Vectors.sparse(size, emptyIndices, emptyValues)
categorySizes(idx) = numOfAttrs
None
}
}
dataset.select(col("*"), encode(col(inputColName).cast(DoubleType)).as(outputColName, metadata))
// Some input columns don't have attributes or their attributes don't have necessary info.
// We need to scan the data to get the number of values for each column.
if (columnToScanIndices.length > 0) {
val inputColNames = columnToScanIndices.map($(inputCols)(_))
val outputColNames = columnToScanIndices.map($(outputCols)(_))
// When fitting data, we want the plain number of categories without `handleInvalid` and
// `dropLast` taken into account.
val attrGroups = OneHotEncoderCommon.getOutputAttrGroupFromData(
dataset, inputColNames, outputColNames, dropLast = false)
attrGroups.zip(columnToScanIndices).foreach { case (attrGroup, idx) =>
categorySizes(idx) = attrGroup.size
}
}
val model = new OneHotEncoderModel(uid, categorySizes).setParent(this)
copyValues(model)
}
@Since("1.4.1")
@Since("3.0.0")
override def copy(extra: ParamMap): OneHotEncoder = defaultCopy(extra)
}
@Since("1.6.0")
@Since("3.0.0")
object OneHotEncoder extends DefaultParamsReadable[OneHotEncoder] {
@Since("1.6.0")
private[feature] val KEEP_INVALID: String = "keep"
private[feature] val ERROR_INVALID: String = "error"
private[feature] val supportedHandleInvalids: Array[String] = Array(KEEP_INVALID, ERROR_INVALID)
@Since("3.0.0")
override def load(path: String): OneHotEncoder = super.load(path)
}
/**
* @param categorySizes Original number of categories for each feature being encoded.
* The array contains one value for each input column, in order.
*/
@Since("3.0.0")
class OneHotEncoderModel private[ml] (
@Since("3.0.0") override val uid: String,
@Since("3.0.0") val categorySizes: Array[Int])
extends Model[OneHotEncoderModel] with OneHotEncoderBase with MLWritable {
import OneHotEncoderModel._
// Returns the category size for each index with `dropLast` and `handleInvalid`
// taken into account.
private def getConfigedCategorySizes: Array[Int] = {
val dropLast = getDropLast
val keepInvalid = getHandleInvalid == OneHotEncoder.KEEP_INVALID
if (!dropLast && keepInvalid) {
// When `handleInvalid` is "keep", an extra category is added as last category
// for invalid data.
categorySizes.map(_ + 1)
} else if (dropLast && !keepInvalid) {
// When `dropLast` is true, the last category is removed.
categorySizes.map(_ - 1)
} else {
// When `dropLast` is true and `handleInvalid` is "keep", the extra category for invalid
// data is removed. Thus, it is the same as the plain number of categories.
categorySizes
}
}
private def encoder: UserDefinedFunction = {
val keepInvalid = getHandleInvalid == OneHotEncoder.KEEP_INVALID
val configedSizes = getConfigedCategorySizes
val localCategorySizes = categorySizes
// The udf performed on input data. The first parameter is the input value. The second
// parameter is the index in inputCols of the column being encoded.
udf { (label: Double, colIdx: Int) =>
val origCategorySize = localCategorySizes(colIdx)
// idx: index in vector of the single 1-valued element
val idx = if (label >= 0 && label < origCategorySize) {
label
} else {
if (keepInvalid) {
origCategorySize
} else {
if (label < 0) {
throw new SparkException(s"Negative value: $label. Input can't be negative. " +
s"To handle invalid values, set Param handleInvalid to " +
s"${OneHotEncoder.KEEP_INVALID}")
} else {
throw new SparkException(s"Unseen value: $label. To handle unseen values, " +
s"set Param handleInvalid to ${OneHotEncoder.KEEP_INVALID}.")
}
}
}
val size = configedSizes(colIdx)
if (idx < size) {
Vectors.sparse(size, Array(idx.toInt), Array(1.0))
} else {
Vectors.sparse(size, Array.empty[Int], Array.empty[Double])
}
}
}
/** @group setParam */
@Since("3.0.0")
def setInputCols(values: Array[String]): this.type = set(inputCols, values)
/** @group setParam */
@Since("3.0.0")
def setOutputCols(values: Array[String]): this.type = set(outputCols, values)
/** @group setParam */
@Since("3.0.0")
def setDropLast(value: Boolean): this.type = set(dropLast, value)
/** @group setParam */
@Since("3.0.0")
def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
@Since("3.0.0")
override def transformSchema(schema: StructType): StructType = {
val inputColNames = $(inputCols)
require(inputColNames.length == categorySizes.length,
s"The number of input columns ${inputColNames.length} must be the same as the number of " +
s"features ${categorySizes.length} during fitting.")
val keepInvalid = $(handleInvalid) == OneHotEncoder.KEEP_INVALID
val transformedSchema = validateAndTransformSchema(schema, dropLast = $(dropLast),
keepInvalid = keepInvalid)
verifyNumOfValues(transformedSchema)
}
/**
* If the metadata of input columns also specifies the number of categories, we need to
* compare with expected category number with `handleInvalid` and `dropLast` taken into
* account. Mismatched numbers will cause exception.
*/
private def verifyNumOfValues(schema: StructType): StructType = {
val configedSizes = getConfigedCategorySizes
$(outputCols).zipWithIndex.foreach { case (outputColName, idx) =>
val inputColName = $(inputCols)(idx)
val attrGroup = AttributeGroup.fromStructField(schema(outputColName))
// If the input metadata specifies number of category for output column,
// comparing with expected category number with `handleInvalid` and
// `dropLast` taken into account.
if (attrGroup.attributes.nonEmpty) {
val numCategories = configedSizes(idx)
require(attrGroup.size == numCategories, "OneHotEncoderModel expected " +
s"$numCategories categorical values for input column $inputColName, " +
s"but the input column had metadata specifying ${attrGroup.size} values.")
}
}
schema
}
@Since("3.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val transformedSchema = transformSchema(dataset.schema, logging = true)
val keepInvalid = $(handleInvalid) == OneHotEncoder.KEEP_INVALID
val encodedColumns = $(inputCols).indices.map { idx =>
val inputColName = $(inputCols)(idx)
val outputColName = $(outputCols)(idx)
val outputAttrGroupFromSchema =
AttributeGroup.fromStructField(transformedSchema(outputColName))
val metadata = if (outputAttrGroupFromSchema.size < 0) {
OneHotEncoderCommon.createAttrGroupForAttrNames(outputColName,
categorySizes(idx), $(dropLast), keepInvalid).toMetadata()
} else {
outputAttrGroupFromSchema.toMetadata()
}
encoder(col(inputColName).cast(DoubleType), lit(idx))
.as(outputColName, metadata)
}
dataset.withColumns($(outputCols), encodedColumns)
}
@Since("3.0.0")
override def copy(extra: ParamMap): OneHotEncoderModel = {
val copied = new OneHotEncoderModel(uid, categorySizes)
copyValues(copied, extra).setParent(parent)
}
@Since("3.0.0")
override def write: MLWriter = new OneHotEncoderModelWriter(this)
}
@Since("3.0.0")
object OneHotEncoderModel extends MLReadable[OneHotEncoderModel] {
private[OneHotEncoderModel]
class OneHotEncoderModelWriter(instance: OneHotEncoderModel) extends MLWriter {
private case class Data(categorySizes: Array[Int])
override protected def saveImpl(path: String): Unit = {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.categorySizes)
val dataPath = new Path(path, "data").toString
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}
}
private class OneHotEncoderModelReader extends MLReader[OneHotEncoderModel] {
private val className = classOf[OneHotEncoderModel].getName
override def load(path: String): OneHotEncoderModel = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
val dataPath = new Path(path, "data").toString
val data = sparkSession.read.parquet(dataPath)
.select("categorySizes")
.head()
val categorySizes = data.getAs[Seq[Int]](0).toArray
val model = new OneHotEncoderModel(metadata.uid, categorySizes)
metadata.getAndSetParams(model)
model
}
}
@Since("3.0.0")
override def read: MLReader[OneHotEncoderModel] = new OneHotEncoderModelReader
@Since("3.0.0")
override def load(path: String): OneHotEncoderModel = super.load(path)
}
/**
* Provides some helper methods used by `OneHotEncoder`.
*/
private[feature] object OneHotEncoderCommon {
private def genOutputAttrNames(inputCol: StructField): Option[Array[String]] = {
val inputAttr = Attribute.fromStructField(inputCol)
inputAttr match {
case nominal: NominalAttribute =>
if (nominal.values.isDefined) {
nominal.values
} else if (nominal.numValues.isDefined) {
nominal.numValues.map(n => Array.tabulate(n)(_.toString))
} else {
None
}
case binary: BinaryAttribute =>
if (binary.values.isDefined) {
binary.values
} else {
Some(Array.tabulate(2)(_.toString))
}
case _: NumericAttribute =>
throw new RuntimeException(
s"The input column ${inputCol.name} cannot be continuous-value.")
case _ =>
None // optimistic about unknown attributes
}
}
/** Creates an `AttributeGroup` filled by the `BinaryAttribute` named as required. */
private def genOutputAttrGroup(
outputAttrNames: Option[Array[String]],
outputColName: String): AttributeGroup = {
outputAttrNames.map { attrNames =>
val attrs: Array[Attribute] = attrNames.map { name =>
BinaryAttribute.defaultAttr.withName(name)
}
new AttributeGroup(outputColName, attrs)
}.getOrElse{
new AttributeGroup(outputColName)
}
}
/**
* Prepares the `StructField` with proper metadata for `OneHotEncoder`'s output column.
*/
def transformOutputColumnSchema(
inputCol: StructField,
outputColName: String,
dropLast: Boolean,
keepInvalid: Boolean = false): StructField = {
val outputAttrNames = genOutputAttrNames(inputCol)
val filteredOutputAttrNames = outputAttrNames.map { names =>
if (dropLast && !keepInvalid) {
require(names.length > 1,
s"The input column ${inputCol.name} should have at least two distinct values.")
names.dropRight(1)
} else if (!dropLast && keepInvalid) {
names ++ Seq("invalidValues")
} else {
names
}
}
genOutputAttrGroup(filteredOutputAttrNames, outputColName).toStructField()
}
/**
* This method is called when we want to generate `AttributeGroup` from actual data for
* one-hot encoder.
*/
def getOutputAttrGroupFromData(
dataset: Dataset[_],
inputColNames: Seq[String],
outputColNames: Seq[String],
dropLast: Boolean): Seq[AttributeGroup] = {
// The RDD approach has advantage of early-stop if any values are invalid. It seems that
// DataFrame ops don't have equivalent functions.
val columns = inputColNames.map { inputColName =>
col(inputColName).cast(DoubleType)
}
val numOfColumns = columns.length
val numAttrsArray = dataset.select(columns: _*).rdd.map { row =>
(0 until numOfColumns).map(idx => row.getDouble(idx)).toArray
}.treeAggregate(new Array[Double](numOfColumns))(
(maxValues, curValues) => {
(0 until numOfColumns).foreach { idx =>
val x = curValues(idx)
assert(x <= Int.MaxValue,
s"OneHotEncoder only supports up to ${Int.MaxValue} indices, but got $x.")
assert(x >= 0.0 && x == x.toInt,
s"Values from column ${inputColNames(idx)} must be indices, but got $x.")
maxValues(idx) = math.max(maxValues(idx), x)
}
maxValues
},
(m0, m1) => {
(0 until numOfColumns).foreach { idx =>
m0(idx) = math.max(m0(idx), m1(idx))
}
m0
}
).map(_.toInt + 1)
outputColNames.zip(numAttrsArray).map { case (outputColName, numAttrs) =>
createAttrGroupForAttrNames(outputColName, numAttrs, dropLast, keepInvalid = false)
}
}
/** Creates an `AttributeGroup` with the required number of `BinaryAttribute`. */
def createAttrGroupForAttrNames(
outputColName: String,
numAttrs: Int,
dropLast: Boolean,
keepInvalid: Boolean): AttributeGroup = {
val outputAttrNames = Array.tabulate(numAttrs)(_.toString)
val filtered = if (dropLast && !keepInvalid) {
outputAttrNames.dropRight(1)
} else if (!dropLast && keepInvalid) {
outputAttrNames ++ Seq("invalidValues")
} else {
outputAttrNames
}
genOutputAttrGroup(Some(filtered), outputColName)
}
}
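
As a sanity check on the `getConfigedCategorySizes` bookkeeping above, a standalone sketch of the size arithmetic for a single column fitted with 3 categories (plain Scala, not part of the patch):

```scala
// Mirrors the dropLast/handleInvalid interaction in getConfigedCategorySizes.
def configedSize(categorySize: Int, dropLast: Boolean, keepInvalid: Boolean): Int =
  if (!dropLast && keepInvalid) categorySize + 1      // extra slot for invalid values
  else if (dropLast && !keepInvalid) categorySize - 1 // last category dropped
  else categorySize // the two options cancel out, or neither is set

assert(configedSize(3, dropLast = false, keepInvalid = false) == 3)
assert(configedSize(3, dropLast = true,  keepInvalid = false) == 2)
assert(configedSize(3, dropLast = false, keepInvalid = true)  == 4)
// dropLast drops the invalid-value category, so invalid inputs become all-zeros:
assert(configedSize(3, dropLast = true,  keepInvalid = true)  == 3)
```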


@@ -1,528 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.feature
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCols, HasOutputCols}
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, lit, udf}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
/** Private trait for params and common methods for OneHotEncoderEstimator and OneHotEncoderModel */
private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid
with HasInputCols with HasOutputCols {
/**
* Param for how to handle invalid data during transform().
* Options are 'keep' (invalid data presented as an extra categorical feature) or
* 'error' (throw an error).
* Note that this Param is only used during transform; during fitting, invalid data
* will result in an error.
* Default: "error"
* @group param
*/
@Since("2.3.0")
override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
"How to handle invalid data during transform(). " +
"Options are 'keep' (invalid data presented as an extra categorical feature) " +
"or error (throw an error). Note that this Param is only used during transform; " +
"during fitting, invalid data will result in an error.",
ParamValidators.inArray(OneHotEncoderEstimator.supportedHandleInvalids))
setDefault(handleInvalid, OneHotEncoderEstimator.ERROR_INVALID)
/**
* Whether to drop the last category in the encoded vector (default: true)
* @group param
*/
@Since("2.3.0")
final val dropLast: BooleanParam =
new BooleanParam(this, "dropLast", "whether to drop the last category")
setDefault(dropLast -> true)
/** @group getParam */
@Since("2.3.0")
def getDropLast: Boolean = $(dropLast)
protected def validateAndTransformSchema(
schema: StructType,
dropLast: Boolean,
keepInvalid: Boolean): StructType = {
val inputColNames = $(inputCols)
val outputColNames = $(outputCols)
require(inputColNames.length == outputColNames.length,
s"The number of input columns ${inputColNames.length} must be the same as the number of " +
s"output columns ${outputColNames.length}.")
// Input columns must be NumericType.
inputColNames.foreach(SchemaUtils.checkNumericType(schema, _))
// Prepares output columns with proper attributes by examining input columns.
val inputFields = $(inputCols).map(schema(_))
val outputFields = inputFields.zip(outputColNames).map { case (inputField, outputColName) =>
OneHotEncoderCommon.transformOutputColumnSchema(
inputField, outputColName, dropLast, keepInvalid)
}
outputFields.foldLeft(schema) { case (newSchema, outputField) =>
SchemaUtils.appendColumn(newSchema, outputField)
}
}
}
/**
* A one-hot encoder that maps a column of category indices to a column of binary vectors, with
* at most a single one-value per row that indicates the input category index.
* For example with 5 categories, an input value of 2.0 would map to an output vector of
* `[0.0, 0.0, 1.0, 0.0]`.
* The last category is not included by default (configurable via `dropLast`),
* because it makes the vector entries sum up to one, and hence linearly dependent.
* So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
*
* @note This is different from scikit-learn's OneHotEncoder, which keeps all categories.
* The output vectors are sparse.
*
* When `handleInvalid` is configured to 'keep', an extra "category" indicating invalid values is
* added as last category. So when `dropLast` is true, invalid values are encoded as all-zeros
* vector.
*
* @note When encoding multi-column by using `inputCols` and `outputCols` params, input/output cols
* come in pairs, specified by the order in the arrays, and each pair is treated independently.
*
* @see `StringIndexer` for converting categorical values into category indices
*/
@Since("2.3.0")
class OneHotEncoderEstimator @Since("2.3.0") (@Since("2.3.0") override val uid: String)
extends Estimator[OneHotEncoderModel] with OneHotEncoderBase with DefaultParamsWritable {
@Since("2.3.0")
def this() = this(Identifiable.randomUID("oneHotEncoder"))
/** @group setParam */
@Since("2.3.0")
def setInputCols(values: Array[String]): this.type = set(inputCols, values)
/** @group setParam */
@Since("2.3.0")
def setOutputCols(values: Array[String]): this.type = set(outputCols, values)
/** @group setParam */
@Since("2.3.0")
def setDropLast(value: Boolean): this.type = set(dropLast, value)
/** @group setParam */
@Since("2.3.0")
def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
@Since("2.3.0")
override def transformSchema(schema: StructType): StructType = {
val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID
validateAndTransformSchema(schema, dropLast = $(dropLast),
keepInvalid = keepInvalid)
}
@Since("2.3.0")
override def fit(dataset: Dataset[_]): OneHotEncoderModel = {
transformSchema(dataset.schema)
// Compute the plain number of categories without `handleInvalid` and
// `dropLast` taken into account.
val transformedSchema = validateAndTransformSchema(dataset.schema, dropLast = false,
keepInvalid = false)
val categorySizes = new Array[Int]($(outputCols).length)
val columnToScanIndices = $(outputCols).zipWithIndex.flatMap { case (outputColName, idx) =>
val numOfAttrs = AttributeGroup.fromStructField(
transformedSchema(outputColName)).size
if (numOfAttrs < 0) {
Some(idx)
} else {
categorySizes(idx) = numOfAttrs
None
}
}
// Some input columns don't have attributes or their attributes don't have necessary info.
// We need to scan the data to get the number of values for each column.
if (columnToScanIndices.length > 0) {
val inputColNames = columnToScanIndices.map($(inputCols)(_))
val outputColNames = columnToScanIndices.map($(outputCols)(_))
// When fitting data, we want the plain number of categories without `handleInvalid` and
// `dropLast` taken into account.
val attrGroups = OneHotEncoderCommon.getOutputAttrGroupFromData(
dataset, inputColNames, outputColNames, dropLast = false)
attrGroups.zip(columnToScanIndices).foreach { case (attrGroup, idx) =>
categorySizes(idx) = attrGroup.size
}
}
val model = new OneHotEncoderModel(uid, categorySizes).setParent(this)
copyValues(model)
}
@Since("2.3.0")
override def copy(extra: ParamMap): OneHotEncoderEstimator = defaultCopy(extra)
}
@Since("2.3.0")
object OneHotEncoderEstimator extends DefaultParamsReadable[OneHotEncoderEstimator] {
private[feature] val KEEP_INVALID: String = "keep"
private[feature] val ERROR_INVALID: String = "error"
private[feature] val supportedHandleInvalids: Array[String] = Array(KEEP_INVALID, ERROR_INVALID)
@Since("2.3.0")
override def load(path: String): OneHotEncoderEstimator = super.load(path)
}
/**
* @param categorySizes Original number of categories for each feature being encoded.
* The array contains one value for each input column, in order.
*/
@Since("2.3.0")
class OneHotEncoderModel private[ml] (
@Since("2.3.0") override val uid: String,
@Since("2.3.0") val categorySizes: Array[Int])
extends Model[OneHotEncoderModel] with OneHotEncoderBase with MLWritable {
import OneHotEncoderModel._
// Returns the category size for each index with `dropLast` and `handleInvalid`
// taken into account.
private def getConfigedCategorySizes: Array[Int] = {
val dropLast = getDropLast
val keepInvalid = getHandleInvalid == OneHotEncoderEstimator.KEEP_INVALID
if (!dropLast && keepInvalid) {
// When `handleInvalid` is "keep", an extra category is added as last category
// for invalid data.
categorySizes.map(_ + 1)
} else if (dropLast && !keepInvalid) {
// When `dropLast` is true, the last category is removed.
categorySizes.map(_ - 1)
} else {
// When `dropLast` is true and `handleInvalid` is "keep", the extra category for invalid
// data is removed. Thus, it is the same as the plain number of categories.
categorySizes
}
}
private def encoder: UserDefinedFunction = {
val keepInvalid = getHandleInvalid == OneHotEncoderEstimator.KEEP_INVALID
val configedSizes = getConfigedCategorySizes
val localCategorySizes = categorySizes
// The udf performed on input data. The first parameter is the input value. The second
// parameter is the index in inputCols of the column being encoded.
udf { (label: Double, colIdx: Int) =>
val origCategorySize = localCategorySizes(colIdx)
// idx: index in vector of the single 1-valued element
val idx = if (label >= 0 && label < origCategorySize) {
label
} else {
if (keepInvalid) {
origCategorySize
} else {
if (label < 0) {
throw new SparkException(s"Negative value: $label. Input can't be negative. " +
s"To handle invalid values, set Param handleInvalid to " +
s"${OneHotEncoderEstimator.KEEP_INVALID}")
} else {
throw new SparkException(s"Unseen value: $label. To handle unseen values, " +
s"set Param handleInvalid to ${OneHotEncoderEstimator.KEEP_INVALID}.")
}
}
}
val size = configedSizes(colIdx)
if (idx < size) {
Vectors.sparse(size, Array(idx.toInt), Array(1.0))
} else {
Vectors.sparse(size, Array.empty[Int], Array.empty[Double])
}
}
}
/** @group setParam */
@Since("2.3.0")
def setInputCols(values: Array[String]): this.type = set(inputCols, values)
/** @group setParam */
@Since("2.3.0")
def setOutputCols(values: Array[String]): this.type = set(outputCols, values)
/** @group setParam */
@Since("2.3.0")
def setDropLast(value: Boolean): this.type = set(dropLast, value)
/** @group setParam */
@Since("2.3.0")
def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
@Since("2.3.0")
override def transformSchema(schema: StructType): StructType = {
val inputColNames = $(inputCols)
require(inputColNames.length == categorySizes.length,
s"The number of input columns ${inputColNames.length} must be the same as the number of " +
s"features ${categorySizes.length} during fitting.")
val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID
val transformedSchema = validateAndTransformSchema(schema, dropLast = $(dropLast),
keepInvalid = keepInvalid)
verifyNumOfValues(transformedSchema)
}
/**
* If the metadata of input columns also specifies the number of categories, we need to
* compare with expected category number with `handleInvalid` and `dropLast` taken into
* account. Mismatched numbers will cause exception.
*/
private def verifyNumOfValues(schema: StructType): StructType = {
val configedSizes = getConfigedCategorySizes
$(outputCols).zipWithIndex.foreach { case (outputColName, idx) =>
val inputColName = $(inputCols)(idx)
val attrGroup = AttributeGroup.fromStructField(schema(outputColName))
// If the input metadata specifies number of category for output column,
// comparing with expected category number with `handleInvalid` and
// `dropLast` taken into account.
if (attrGroup.attributes.nonEmpty) {
val numCategories = configedSizes(idx)
require(attrGroup.size == numCategories, "OneHotEncoderModel expected " +
s"$numCategories categorical values for input column $inputColName, " +
s"but the input column had metadata specifying ${attrGroup.size} values.")
}
}
schema
}
@Since("2.3.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val transformedSchema = transformSchema(dataset.schema, logging = true)
val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID
val encodedColumns = $(inputCols).indices.map { idx =>
val inputColName = $(inputCols)(idx)
val outputColName = $(outputCols)(idx)
val outputAttrGroupFromSchema =
AttributeGroup.fromStructField(transformedSchema(outputColName))
val metadata = if (outputAttrGroupFromSchema.size < 0) {
OneHotEncoderCommon.createAttrGroupForAttrNames(outputColName,
categorySizes(idx), $(dropLast), keepInvalid).toMetadata()
} else {
outputAttrGroupFromSchema.toMetadata()
}
encoder(col(inputColName).cast(DoubleType), lit(idx))
.as(outputColName, metadata)
}
dataset.withColumns($(outputCols), encodedColumns)
}
@Since("2.3.0")
override def copy(extra: ParamMap): OneHotEncoderModel = {
val copied = new OneHotEncoderModel(uid, categorySizes)
copyValues(copied, extra).setParent(parent)
}
@Since("2.3.0")
override def write: MLWriter = new OneHotEncoderModelWriter(this)
}
@Since("2.3.0")
object OneHotEncoderModel extends MLReadable[OneHotEncoderModel] {
private[OneHotEncoderModel]
class OneHotEncoderModelWriter(instance: OneHotEncoderModel) extends MLWriter {
private case class Data(categorySizes: Array[Int])
override protected def saveImpl(path: String): Unit = {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.categorySizes)
val dataPath = new Path(path, "data").toString
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}
}
private class OneHotEncoderModelReader extends MLReader[OneHotEncoderModel] {
private val className = classOf[OneHotEncoderModel].getName
override def load(path: String): OneHotEncoderModel = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
val dataPath = new Path(path, "data").toString
val data = sparkSession.read.parquet(dataPath)
.select("categorySizes")
.head()
val categorySizes = data.getAs[Seq[Int]](0).toArray
val model = new OneHotEncoderModel(metadata.uid, categorySizes)
metadata.getAndSetParams(model)
model
}
}
@Since("2.3.0")
override def read: MLReader[OneHotEncoderModel] = new OneHotEncoderModelReader
@Since("2.3.0")
override def load(path: String): OneHotEncoderModel = super.load(path)
}
/**
* Provides some helper methods used by both `OneHotEncoder` and `OneHotEncoderEstimator`.
*/
private[feature] object OneHotEncoderCommon {
private def genOutputAttrNames(inputCol: StructField): Option[Array[String]] = {
val inputAttr = Attribute.fromStructField(inputCol)
inputAttr match {
case nominal: NominalAttribute =>
if (nominal.values.isDefined) {
nominal.values
} else if (nominal.numValues.isDefined) {
nominal.numValues.map(n => Array.tabulate(n)(_.toString))
} else {
None
}
case binary: BinaryAttribute =>
if (binary.values.isDefined) {
binary.values
} else {
Some(Array.tabulate(2)(_.toString))
}
case _: NumericAttribute =>
throw new RuntimeException(
s"The input column ${inputCol.name} cannot be continuous-value.")
case _ =>
None // optimistic about unknown attributes
}
}
/** Creates an `AttributeGroup` filled by the `BinaryAttribute` named as required. */
private def genOutputAttrGroup(
outputAttrNames: Option[Array[String]],
outputColName: String): AttributeGroup = {
outputAttrNames.map { attrNames =>
val attrs: Array[Attribute] = attrNames.map { name =>
BinaryAttribute.defaultAttr.withName(name)
}
new AttributeGroup(outputColName, attrs)
}.getOrElse{
new AttributeGroup(outputColName)
}
}
/**
* Prepares the `StructField` with proper metadata for `OneHotEncoder`'s output column.
*/
def transformOutputColumnSchema(
inputCol: StructField,
outputColName: String,
dropLast: Boolean,
keepInvalid: Boolean = false): StructField = {
val outputAttrNames = genOutputAttrNames(inputCol)
val filteredOutputAttrNames = outputAttrNames.map { names =>
if (dropLast && !keepInvalid) {
require(names.length > 1,
s"The input column ${inputCol.name} should have at least two distinct values.")
names.dropRight(1)
} else if (!dropLast && keepInvalid) {
names ++ Seq("invalidValues")
} else {
names
}
}
genOutputAttrGroup(filteredOutputAttrNames, outputColName).toStructField()
}
/**
* This method is called when we want to generate `AttributeGroup` from actual data for
* one-hot encoder.
*/
def getOutputAttrGroupFromData(
dataset: Dataset[_],
inputColNames: Seq[String],
outputColNames: Seq[String],
dropLast: Boolean): Seq[AttributeGroup] = {
// The RDD approach has advantage of early-stop if any values are invalid. It seems that
// DataFrame ops don't have equivalent functions.
val columns = inputColNames.map { inputColName =>
col(inputColName).cast(DoubleType)
}
val numOfColumns = columns.length
val numAttrsArray = dataset.select(columns: _*).rdd.map { row =>
(0 until numOfColumns).map(idx => row.getDouble(idx)).toArray
}.treeAggregate(new Array[Double](numOfColumns))(
(maxValues, curValues) => {
(0 until numOfColumns).foreach { idx =>
val x = curValues(idx)
assert(x <= Int.MaxValue,
s"OneHotEncoder only supports up to ${Int.MaxValue} indices, but got $x.")
assert(x >= 0.0 && x == x.toInt,
s"Values from column ${inputColNames(idx)} must be indices, but got $x.")
maxValues(idx) = math.max(maxValues(idx), x)
}
maxValues
},
(m0, m1) => {
(0 until numOfColumns).foreach { idx =>
m0(idx) = math.max(m0(idx), m1(idx))
}
m0
}
).map(_.toInt + 1)
outputColNames.zip(numAttrsArray).map { case (outputColName, numAttrs) =>
createAttrGroupForAttrNames(outputColName, numAttrs, dropLast, keepInvalid = false)
}
}
/** Creates an `AttributeGroup` with the required number of `BinaryAttribute`. */
def createAttrGroupForAttrNames(
outputColName: String,
numAttrs: Int,
dropLast: Boolean,
keepInvalid: Boolean): AttributeGroup = {
val outputAttrNames = Array.tabulate(numAttrs)(_.toString)
val filtered = if (dropLast && !keepInvalid) {
outputAttrNames.dropRight(1)
} else if (!dropLast && keepInvalid) {
outputAttrNames ++ Seq("invalidValues")
} else {
outputAttrNames
}
genOutputAttrGroup(Some(filtered), outputColName)
}
}


@@ -246,7 +246,7 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String)
// Formula w/o intercept, one of the categories in the first category feature is
// being used as reference category, we will not drop any category for that feature.
if (!hasIntercept && !keepReferenceCategory) {
encoderStages += new OneHotEncoderEstimator(uid)
encoderStages += new OneHotEncoder(uid)
.setInputCols(Array(indexed(term)))
.setOutputCols(Array(encodedCol))
.setDropLast(false)
@@ -269,7 +269,7 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String)
if (oneHotEncodeColumns.nonEmpty) {
val (inputCols, outputCols) = oneHotEncodeColumns.toArray.unzip
encoderStages += new OneHotEncoderEstimator(uid)
encoderStages += new OneHotEncoder(uid)
.setInputCols(inputCols)
.setOutputCols(outputCols)
.setDropLast(true)
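
A hedged usage sketch of the `RFormula` path touched above, which now routes categorical terms through the renamed `OneHotEncoder` stage internally (data is illustrative):

```scala
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("RFormulaEncodingSketch").getOrCreate()
import spark.implicits._

val df = Seq((1.0, "a"), (0.0, "b"), (1.0, "c")).toDF("label", "category")

// RFormula string-indexes "category", then one-hot encodes it via the
// internal OneHotEncoder stage shown in the diff above.
val formula = new RFormula().setFormula("label ~ category")
formula.fit(df).transform(df).select("features", "label").show(truncate = false)
```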


@@ -1,422 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.feature
import org.apache.spark.ml.attribute.{AttributeGroup, BinaryAttribute, NominalAttribute}
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
class OneHotEncoderEstimatorSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
test("params") {
ParamsSuite.checkParams(new OneHotEncoderEstimator)
}
test("OneHotEncoderEstimator dropLast = false") {
val data = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
assert(encoder.getDropLast === true)
encoder.setDropLast(false)
assert(encoder.getDropLast === false)
val model = encoder.fit(df)
testTransformer[(Double, Vector)](df, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}
}
test("OneHotEncoderEstimator dropLast = true") {
val data = Seq(
Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(2, Seq((1, 1.0)))),
Row(2.0, Vectors.sparse(2, Seq())),
Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(2, Seq())))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
val model = encoder.fit(df)
testTransformer[(Double, Vector)](df, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}
}
test("input column with ML attribute") {
val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("size")
.select(col("size").as("size", attr.toMetadata()))
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("size"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(df)
testTransformerByGlobalCheckFunc[(Double)](df, model, "encoded") { rows =>
val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
assert(group.size === 2)
assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0))
assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
}
}
test("input column without ML attribute") {
val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("index")
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("index"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(df)
testTransformerByGlobalCheckFunc[(Double)](df, model, "encoded") { rows =>
val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
assert(group.size === 2)
assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("0").withIndex(0))
assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("1").withIndex(1))
}
}
test("read/write") {
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("index"))
.setOutputCols(Array("encoded"))
testDefaultReadWrite(encoder)
}
test("OneHotEncoderModel read/write") {
val instance = new OneHotEncoderModel("myOneHotEncoderModel", Array(1, 2, 3))
val newInstance = testDefaultReadWrite(instance)
assert(newInstance.categorySizes === instance.categorySizes)
}
test("OneHotEncoderEstimator with varying types") {
val data = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
class NumericTypeWithEncoder[A](val numericType: NumericType)
(implicit val encoder: Encoder[(A, Vector)])
val types = Seq(
new NumericTypeWithEncoder[Short](ShortType),
new NumericTypeWithEncoder[Long](LongType),
new NumericTypeWithEncoder[Int](IntegerType),
new NumericTypeWithEncoder[Float](FloatType),
new NumericTypeWithEncoder[Byte](ByteType),
new NumericTypeWithEncoder[Double](DoubleType),
new NumericTypeWithEncoder[Decimal](DecimalType(10, 0))(ExpressionEncoder()))
for (t <- types) {
val dfWithTypes = df.select(col("input").cast(t.numericType), col("expected"))
val estimator = new OneHotEncoderEstimator()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
.setDropLast(false)
val model = estimator.fit(dfWithTypes)
testTransformer(dfWithTypes, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}(t.encoder)
}
}
test("OneHotEncoderEstimator: encoding multiple columns and dropLast = false") {
val data = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), 3.0, Vectors.sparse(4, Seq((3, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 1.0, Vectors.sparse(4, Seq((1, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0)))))
val schema = StructType(Array(
StructField("input1", DoubleType),
StructField("expected1", new VectorUDT),
StructField("input2", DoubleType),
StructField("expected2", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("input1", "input2"))
.setOutputCols(Array("output1", "output2"))
assert(encoder.getDropLast === true)
encoder.setDropLast(false)
assert(encoder.getDropLast === false)
val model = encoder.fit(df)
testTransformer[(Double, Vector, Double, Vector)](
df,
model,
"output1",
"output2",
"expected1",
"expected2") {
case Row(output1: Vector, output2: Vector, expected1: Vector, expected2: Vector) =>
assert(output1 === expected1)
assert(output2 === expected2)
}
}
test("OneHotEncoderEstimator: encoding multiple columns and dropLast = true") {
val data = Seq(
Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 2.0, Vectors.sparse(3, Seq((2, 1.0)))),
Row(1.0, Vectors.sparse(2, Seq((1, 1.0))), 3.0, Vectors.sparse(3, Seq())),
Row(2.0, Vectors.sparse(2, Seq()), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 1.0, Vectors.sparse(3, Seq((1, 1.0)))),
Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(2, Seq()), 2.0, Vectors.sparse(3, Seq((2, 1.0)))))
val schema = StructType(Array(
StructField("input1", DoubleType),
StructField("expected1", new VectorUDT),
StructField("input2", DoubleType),
StructField("expected2", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("input1", "input2"))
.setOutputCols(Array("output1", "output2"))
val model = encoder.fit(df)
testTransformer[(Double, Vector, Double, Vector)](
df,
model,
"output1",
"output2",
"expected1",
"expected2") {
case Row(output1: Vector, output2: Vector, expected1: Vector, expected2: Vector) =>
assert(output1 === expected1)
assert(output2 === expected2)
}
}
test("Throw error on invalid values") {
val trainingData = Seq((0, 0), (1, 1), (2, 2))
val trainingDF = trainingData.toDF("id", "a")
val testData = Seq((0, 0), (1, 2), (1, 3))
val testDF = testData.toDF("id", "a")
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("a"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(trainingDF)
testTransformerByInterceptingException[(Int, Int)](
testDF,
model,
expectedMessagePart = "Unseen value: 3.0. To handle unseen values",
firstResultCol = "encoded")
}
test("Can't transform on negative input") {
val trainingDF = Seq((0, 0), (1, 1), (2, 2)).toDF("a", "b")
val testDF = Seq((0, 0), (-1, 2), (1, 3)).toDF("a", "b")
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("a"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(trainingDF)
testTransformerByInterceptingException[(Int, Int)](
testDF,
model,
expectedMessagePart = "Negative value: -1.0. Input can't be negative",
firstResultCol = "encoded")
}
test("Keep on invalid values: dropLast = false") {
val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
val testData = Seq(
Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
.setHandleInvalid("keep")
.setDropLast(false)
val model = encoder.fit(trainingDF)
testTransformer[(Double, Vector)](testDF, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}
}
test("Keep on invalid values: dropLast = true") {
val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
val testData = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
Row(3.0, Vectors.sparse(3, Seq())))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
.setHandleInvalid("keep")
.setDropLast(true)
val model = encoder.fit(trainingDF)
testTransformer[(Double, Vector)](testDF, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}
}
test("OneHotEncoderModel changes dropLast") {
val data = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), Vectors.sparse(2, Seq((1, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected1", new VectorUDT),
StructField("expected2", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
val model = encoder.fit(df)
model.setDropLast(false)
testTransformer[(Double, Vector, Vector)](df, model, "output", "expected1") {
case Row(output: Vector, expected1: Vector) =>
assert(output === expected1)
}
model.setDropLast(true)
testTransformer[(Double, Vector, Vector)](df, model, "output", "expected2") {
case Row(output: Vector, expected2: Vector) =>
assert(output === expected2)
}
}
test("OneHotEncoderModel changes handleInvalid") {
val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
val testData = Seq(
Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
val model = encoder.fit(trainingDF)
model.setHandleInvalid("error")
testTransformerByInterceptingException[(Double, Vector)](
testDF,
model,
expectedMessagePart = "Unseen value: 3.0. To handle unseen values",
firstResultCol = "output")
model.setHandleInvalid("keep")
testTransformerByGlobalCheckFunc[(Double, Vector)](testDF, model, "output") { _ => }
}
test("Transforming on mismatched attributes") {
val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("size")
.select(col("size").as("size", attr.toMetadata()))
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("size"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(df)
val testAttr = NominalAttribute.defaultAttr.withValues("tiny", "small", "medium", "large")
val testDF = Seq(0.0, 1.0, 2.0, 3.0).map(Tuple1.apply).toDF("size")
.select(col("size").as("size", testAttr.toMetadata()))
testTransformerByInterceptingException[(Double)](
testDF,
model,
expectedMessagePart = "OneHotEncoderModel expected 2 categorical values",
firstResultCol = "encoded")
}
}
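
A quick key for reading the expected vectors in the suite above: with dropLast = false, a category index i out of n categories becomes an n-slot sparse vector with a single 1.0 at slot i; with dropLast = true the vector has n - 1 slots and the last category encodes as all zeros. A minimal sketch of the two shapes (values lifted from the test cases above, not a definitive reference):

import org.apache.spark.ml.linalg.Vectors

// 3 categories, dropLast = false: index 2.0 -> [0.0, 0.0, 1.0]
val kept = Vectors.sparse(3, Seq((2, 1.0)))
// 3 categories, dropLast = true: index 2.0 -> [0.0, 0.0] (all zeros)
val dropped = Vectors.sparse(2, Seq.empty[(Int, Double)])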

View file

@@ -18,72 +18,71 @@
package org.apache.spark.ml.feature
import org.apache.spark.ml.attribute.{AttributeGroup, BinaryAttribute, NominalAttribute}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.{DataFrame, Encoder, Row}
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
class OneHotEncoderSuite
extends MLTest with DefaultReadWriteTest {
class OneHotEncoderSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
def stringIndexed(): DataFrame = {
val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
val df = data.toDF("id", "label")
val indexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("labelIndex")
.fit(df)
indexer.transform(df)
}
test("params") {
ParamsSuite.checkParams(new OneHotEncoder)
}
test("OneHotEncoder dropLast = false") {
val transformed = stringIndexed()
val data = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoder()
.setInputCol("labelIndex")
.setOutputCol("labelVec")
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
assert(encoder.getDropLast === true)
encoder.setDropLast(false)
assert(encoder.getDropLast === false)
val expected = Seq(
(0, Vectors.sparse(3, Seq((0, 1.0)))),
(1, Vectors.sparse(3, Seq((2, 1.0)))),
(2, Vectors.sparse(3, Seq((1, 1.0)))),
(3, Vectors.sparse(3, Seq((0, 1.0)))),
(4, Vectors.sparse(3, Seq((0, 1.0)))),
(5, Vectors.sparse(3, Seq((1, 1.0))))).toDF("id", "expected")
val withExpected = transformed.join(expected, "id")
testTransformer[(Int, String, Double, Vector)](withExpected, encoder, "labelVec", "expected") {
val model = encoder.fit(df)
testTransformer[(Double, Vector)](df, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}
}
test("OneHotEncoder dropLast = true") {
val transformed = stringIndexed()
val encoder = new OneHotEncoder()
.setInputCol("labelIndex")
.setOutputCol("labelVec")
val expected = Seq(
(0, Vectors.sparse(2, Seq((0, 1.0)))),
(1, Vectors.sparse(2, Seq())),
(2, Vectors.sparse(2, Seq((1, 1.0)))),
(3, Vectors.sparse(2, Seq((0, 1.0)))),
(4, Vectors.sparse(2, Seq((0, 1.0)))),
(5, Vectors.sparse(2, Seq((1, 1.0))))).toDF("id", "expected")
val data = Seq(
Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(2, Seq((1, 1.0)))),
Row(2.0, Vectors.sparse(2, Seq())),
Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(2, Seq())))
val withExpected = transformed.join(expected, "id")
testTransformer[(Int, String, Double, Vector)](withExpected, encoder, "labelVec", "expected") {
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoder()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
val model = encoder.fit(df)
testTransformer[(Double, Vector)](df, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}
@@ -94,52 +93,61 @@ class OneHotEncoderSuite
val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("size")
.select(col("size").as("size", attr.toMetadata()))
val encoder = new OneHotEncoder()
.setInputCol("size")
.setOutputCol("encoded")
testTransformerByGlobalCheckFunc[(Double)](df, encoder, "encoded") { rows =>
val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
assert(group.size === 2)
assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0))
assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
.setInputCols(Array("size"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(df)
testTransformerByGlobalCheckFunc[(Double)](df, model, "encoded") { rows =>
val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
assert(group.size === 2)
assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0))
assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
}
}
test("input column without ML attribute") {
val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("index")
val encoder = new OneHotEncoder()
.setInputCol("index")
.setOutputCol("encoded")
val rows = encoder.transform(df).select("encoded").collect()
val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
assert(group.size === 2)
assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("0").withIndex(0))
assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("1").withIndex(1))
.setInputCols(Array("index"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(df)
testTransformerByGlobalCheckFunc[(Double)](df, model, "encoded") { rows =>
val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
assert(group.size === 2)
assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("0").withIndex(0))
assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("1").withIndex(1))
}
}
test("read/write") {
val t = new OneHotEncoder()
.setInputCol("myInputCol")
.setOutputCol("myOutputCol")
.setDropLast(false)
testDefaultReadWrite(t)
val encoder = new OneHotEncoder()
.setInputCols(Array("index"))
.setOutputCols(Array("encoded"))
testDefaultReadWrite(encoder)
}
test("OneHotEncoderModel read/write") {
val instance = new OneHotEncoderModel("myOneHotEncoderModel", Array(1, 2, 3))
val newInstance = testDefaultReadWrite(instance)
assert(newInstance.categorySizes === instance.categorySizes)
}
test("OneHotEncoder with varying types") {
val df = stringIndexed()
val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
val expected = Seq(
(0, Vectors.sparse(3, Seq((0, 1.0)))),
(1, Vectors.sparse(3, Seq((2, 1.0)))),
(2, Vectors.sparse(3, Seq((1, 1.0)))),
(3, Vectors.sparse(3, Seq((0, 1.0)))),
(4, Vectors.sparse(3, Seq((0, 1.0)))),
(5, Vectors.sparse(3, Seq((1, 1.0))))).toDF("id", "expected")
val data = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))))
val withExpected = df.join(expected, "id")
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
class NumericTypeWithEncoder[A](val numericType: NumericType)
(implicit val encoder: Encoder[(A, Vector)])
val types = Seq(
new NumericTypeWithEncoder[Short](ShortType),
@@ -151,17 +159,264 @@ class OneHotEncoderSuite
new NumericTypeWithEncoder[Decimal](DecimalType(10, 0))(ExpressionEncoder()))
for (t <- types) {
val dfWithTypes = withExpected.select(col("labelIndex")
.cast(t.numericType).as("labelIndex", attr.toMetadata()), col("expected"))
val encoder = new OneHotEncoder()
.setInputCol("labelIndex")
.setOutputCol("labelVec")
val dfWithTypes = df.select(col("input").cast(t.numericType), col("expected"))
val estimator = new OneHotEncoder()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
.setDropLast(false)
testTransformer(dfWithTypes, encoder, "labelVec", "expected") {
val model = estimator.fit(dfWithTypes)
testTransformer(dfWithTypes, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}(t.encoder)
}
}
test("OneHotEncoder: encoding multiple columns and dropLast = false") {
val data = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), 3.0, Vectors.sparse(4, Seq((3, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 1.0, Vectors.sparse(4, Seq((1, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0)))))
val schema = StructType(Array(
StructField("input1", DoubleType),
StructField("expected1", new VectorUDT),
StructField("input2", DoubleType),
StructField("expected2", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoder()
.setInputCols(Array("input1", "input2"))
.setOutputCols(Array("output1", "output2"))
assert(encoder.getDropLast === true)
encoder.setDropLast(false)
assert(encoder.getDropLast === false)
val model = encoder.fit(df)
testTransformer[(Double, Vector, Double, Vector)](
df,
model,
"output1",
"output2",
"expected1",
"expected2") {
case Row(output1: Vector, output2: Vector, expected1: Vector, expected2: Vector) =>
assert(output1 === expected1)
assert(output2 === expected2)
}
}
test("OneHotEncoder: encoding multiple columns and dropLast = true") {
val data = Seq(
Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 2.0, Vectors.sparse(3, Seq((2, 1.0)))),
Row(1.0, Vectors.sparse(2, Seq((1, 1.0))), 3.0, Vectors.sparse(3, Seq())),
Row(2.0, Vectors.sparse(2, Seq()), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 1.0, Vectors.sparse(3, Seq((1, 1.0)))),
Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(2, Seq()), 2.0, Vectors.sparse(3, Seq((2, 1.0)))))
val schema = StructType(Array(
StructField("input1", DoubleType),
StructField("expected1", new VectorUDT),
StructField("input2", DoubleType),
StructField("expected2", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoder()
.setInputCols(Array("input1", "input2"))
.setOutputCols(Array("output1", "output2"))
val model = encoder.fit(df)
testTransformer[(Double, Vector, Double, Vector)](
df,
model,
"output1",
"output2",
"expected1",
"expected2") {
case Row(output1: Vector, output2: Vector, expected1: Vector, expected2: Vector) =>
assert(output1 === expected1)
assert(output2 === expected2)
}
}
test("Throw error on invalid values") {
val trainingData = Seq((0, 0), (1, 1), (2, 2))
val trainingDF = trainingData.toDF("id", "a")
val testData = Seq((0, 0), (1, 2), (1, 3))
val testDF = testData.toDF("id", "a")
val encoder = new OneHotEncoder()
.setInputCols(Array("a"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(trainingDF)
testTransformerByInterceptingException[(Int, Int)](
testDF,
model,
expectedMessagePart = "Unseen value: 3.0. To handle unseen values",
firstResultCol = "encoded")
}
test("Can't transform on negative input") {
val trainingDF = Seq((0, 0), (1, 1), (2, 2)).toDF("a", "b")
val testDF = Seq((0, 0), (-1, 2), (1, 3)).toDF("a", "b")
val encoder = new OneHotEncoder()
.setInputCols(Array("a"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(trainingDF)
testTransformerByInterceptingException[(Int, Int)](
testDF,
model,
expectedMessagePart = "Negative value: -1.0. Input can't be negative",
firstResultCol = "encoded")
}
test("Keep on invalid values: dropLast = false") {
val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
val testData = Seq(
Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
val encoder = new OneHotEncoder()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
.setHandleInvalid("keep")
.setDropLast(false)
val model = encoder.fit(trainingDF)
testTransformer[(Double, Vector)](testDF, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}
}
test("Keep on invalid values: dropLast = true") {
val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
val testData = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
Row(3.0, Vectors.sparse(3, Seq())))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
val encoder = new OneHotEncoder()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
.setHandleInvalid("keep")
.setDropLast(true)
val model = encoder.fit(trainingDF)
testTransformer[(Double, Vector)](testDF, model, "output", "expected") {
case Row(output: Vector, expected: Vector) =>
assert(output === expected)
}
}
test("OneHotEncoderModel changes dropLast") {
val data = Seq(
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), Vectors.sparse(2, Seq((1, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected1", new VectorUDT),
StructField("expected2", new VectorUDT)))
val df = spark.createDataFrame(sc.parallelize(data), schema)
val encoder = new OneHotEncoder()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
val model = encoder.fit(df)
model.setDropLast(false)
testTransformer[(Double, Vector, Vector)](df, model, "output", "expected1") {
case Row(output: Vector, expected1: Vector) =>
assert(output === expected1)
}
model.setDropLast(true)
testTransformer[(Double, Vector, Vector)](df, model, "output", "expected2") {
case Row(output: Vector, expected2: Vector) =>
assert(output === expected2)
}
}
test("OneHotEncoderModel changes handleInvalid") {
val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
val testData = Seq(
Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
val schema = StructType(Array(
StructField("input", DoubleType),
StructField("expected", new VectorUDT)))
val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
val encoder = new OneHotEncoder()
.setInputCols(Array("input"))
.setOutputCols(Array("output"))
val model = encoder.fit(trainingDF)
model.setHandleInvalid("error")
testTransformerByInterceptingException[(Double, Vector)](
testDF,
model,
expectedMessagePart = "Unseen value: 3.0. To handle unseen values",
firstResultCol = "output")
model.setHandleInvalid("keep")
testTransformerByGlobalCheckFunc[(Double, Vector)](testDF, model, "output") { _ => }
}
test("Transforming on mismatched attributes") {
val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("size")
.select(col("size").as("size", attr.toMetadata()))
val encoder = new OneHotEncoder()
.setInputCols(Array("size"))
.setOutputCols(Array("encoded"))
val model = encoder.fit(df)
val testAttr = NominalAttribute.defaultAttr.withValues("tiny", "small", "medium", "large")
val testDF = Seq(0.0, 1.0, 2.0, 3.0).map(Tuple1.apply).toDF("size")
.select(col("size").as("size", testAttr.toMetadata()))
testTransformerByInterceptingException[(Double)](
testDF,
model,
expectedMessagePart = "OneHotEncoderModel expected 2 categorical values",
firstResultCol = "encoded")
}
}
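
Taken together, the suite above pins down the surface of the renamed API. As a minimal standalone sketch, assuming an active SparkSession `spark` with `spark.implicits._` imported (column names are illustrative):

import org.apache.spark.ml.feature.OneHotEncoder

val df = Seq(0.0, 1.0, 2.0, 1.0).toDF("size")
// fit() learns one category count per input column; the fitted model
// then encodes each column into its own sparse vector column.
val encoder = new OneHotEncoder()
  .setInputCols(Array("size"))
  .setOutputCols(Array("sizeVec"))
  .setHandleInvalid("keep") // unseen values map to an extra index
  .setDropLast(false)
val model = encoder.fit(df)
model.transform(df).show()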

View file

@@ -228,6 +228,18 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.writer.DataWriterFactory.createWriter"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter"),
// [SPARK-26133][ML] Remove deprecated OneHotEncoder and rename OneHotEncoderEstimator to OneHotEncoder
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.OneHotEncoderEstimator"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.OneHotEncoder"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.getInputCol"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.getOutputCol"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.inputCol"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.setInputCol"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.setOutputCol"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.outputCol"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.OneHotEncoderEstimator$"),
// [SPARK-26141] Enable custom metrics implementation in shuffle write
// Following are Java private classes
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.shuffle.sort.UnsafeShuffleWriter.this"),

View file

@@ -44,8 +44,7 @@ __all__ = ['Binarizer',
'MinMaxScaler', 'MinMaxScalerModel',
'NGram',
'Normalizer',
'OneHotEncoder',
'OneHotEncoderEstimator', 'OneHotEncoderModel',
'OneHotEncoder', 'OneHotEncoderModel',
'PCA', 'PCAModel',
'PolynomialExpansion',
'QuantileDiscretizer',
@@ -1642,91 +1641,8 @@ class Normalizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, Jav
@inherit_doc
class OneHotEncoder(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
"""
A one-hot encoder that maps a column of category indices to a
column of binary vectors, with at most a single one-value per row
that indicates the input category index.
For example with 5 categories, an input value of 2.0 would map to
an output vector of `[0.0, 0.0, 1.0, 0.0]`.
The last category is not included by default (configurable via
:py:attr:`dropLast`) because it makes the vector entries sum up to
one, and hence linearly dependent.
So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
.. note:: This is different from scikit-learn's OneHotEncoder,
which keeps all categories. The output vectors are sparse.
.. note:: Deprecated in 2.3.0. :py:class:`OneHotEncoderEstimator` will be renamed to
:py:class:`OneHotEncoder` and this :py:class:`OneHotEncoder` will be removed in 3.0.0.
.. seealso::
:py:class:`StringIndexer` for converting categorical values into
category indices
>>> stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
>>> model = stringIndexer.fit(stringIndDf)
>>> td = model.transform(stringIndDf)
>>> encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
>>> encoder.transform(td).head().features
SparseVector(2, {0: 1.0})
>>> encoder.setParams(outputCol="freqs").transform(td).head().freqs
SparseVector(2, {0: 1.0})
>>> params = {encoder.dropLast: False, encoder.outputCol: "test"}
>>> encoder.transform(td, params).head().test
SparseVector(3, {0: 1.0})
>>> onehotEncoderPath = temp_path + "/onehot-encoder"
>>> encoder.save(onehotEncoderPath)
>>> loadedEncoder = OneHotEncoder.load(onehotEncoderPath)
>>> loadedEncoder.getDropLast() == encoder.getDropLast()
True
.. versionadded:: 1.4.0
"""
dropLast = Param(Params._dummy(), "dropLast", "whether to drop the last category",
typeConverter=TypeConverters.toBoolean)
@keyword_only
def __init__(self, dropLast=True, inputCol=None, outputCol=None):
"""
__init__(self, dropLast=True, inputCol=None, outputCol=None)
"""
super(OneHotEncoder, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.OneHotEncoder", self.uid)
self._setDefault(dropLast=True)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
@since("1.4.0")
def setParams(self, dropLast=True, inputCol=None, outputCol=None):
"""
setParams(self, dropLast=True, inputCol=None, outputCol=None)
Sets params for this OneHotEncoder.
"""
kwargs = self._input_kwargs
return self._set(**kwargs)
@since("1.4.0")
def setDropLast(self, value):
"""
Sets the value of :py:attr:`dropLast`.
"""
return self._set(dropLast=value)
@since("1.4.0")
def getDropLast(self):
"""
Gets the value of dropLast or its default value.
"""
return self.getOrDefault(self.dropLast)
@inherit_doc
class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHandleInvalid,
JavaMLReadable, JavaMLWritable):
class OneHotEncoder(JavaEstimator, HasInputCols, HasOutputCols, HasHandleInvalid,
JavaMLReadable, JavaMLWritable):
"""
A one-hot encoder that maps a column of category indices to a column of binary vectors, with
at most a single one-value per row that indicates the input category index.
@@ -1751,13 +1667,13 @@ class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHand
>>> from pyspark.ml.linalg import Vectors
>>> df = spark.createDataFrame([(0.0,), (1.0,), (2.0,)], ["input"])
>>> ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
>>> ohe = OneHotEncoder(inputCols=["input"], outputCols=["output"])
>>> model = ohe.fit(df)
>>> model.transform(df).head().output
SparseVector(2, {0: 1.0})
>>> ohePath = temp_path + "/oheEstimator"
>>> ohe.save(ohePath)
>>> loadedOHE = OneHotEncoderEstimator.load(ohePath)
>>> loadedOHE = OneHotEncoder.load(ohePath)
>>> loadedOHE.getInputCols() == ohe.getInputCols()
True
>>> modelPath = temp_path + "/ohe-model"
@@ -1784,9 +1700,9 @@ class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHand
"""
__init__(self, inputCols=None, outputCols=None, handleInvalid="error", dropLast=True)
"""
super(OneHotEncoderEstimator, self).__init__()
super(OneHotEncoder, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.feature.OneHotEncoderEstimator", self.uid)
"org.apache.spark.ml.feature.OneHotEncoder", self.uid)
self._setDefault(handleInvalid="error", dropLast=True)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@@ -1796,7 +1712,7 @@ class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHand
def setParams(self, inputCols=None, outputCols=None, handleInvalid="error", dropLast=True):
"""
setParams(self, inputCols=None, outputCols=None, handleInvalid="error", dropLast=True)
Sets params for this OneHotEncoderEstimator.
Sets params for this OneHotEncoder.
"""
kwargs = self._input_kwargs
return self._set(**kwargs)
@@ -1821,7 +1737,7 @@ class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHand
class OneHotEncoderModel(JavaModel, JavaMLReadable, JavaMLWritable):
"""
Model fitted by :py:class:`OneHotEncoderEstimator`.
Model fitted by :py:class:`OneHotEncoder`.
.. versionadded:: 2.3.0
"""