diff --git a/docs/ml-features.md b/docs/ml-features.md
index 882b895a9d..83a211ce02 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -779,43 +779,37 @@ for more details on the API.
-## OneHotEncoder (Deprecated since 2.3.0)
-
-Because this existing `OneHotEncoder` is a stateless transformer, it is not usable on new data where the number of categories may differ from the training data. In order to fix this, a new `OneHotEncoderEstimator` was created that produces an `OneHotEncoderModel` when fitting. For more detail, please see [SPARK-13030](https://issues.apache.org/jira/browse/SPARK-13030).
-
-`OneHotEncoder` has been deprecated in 2.3.0 and will be removed in 3.0.0. Please use [OneHotEncoderEstimator](ml-features.html#onehotencoderestimator) instead.
-
-## OneHotEncoderEstimator
+## OneHotEncoder
 
 [One-hot encoding](http://en.wikipedia.org/wiki/One-hot) maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using [StringIndexer](ml-features.html#stringindexer) first.
 
-`OneHotEncoderEstimator` can transform multiple columns, returning an one-hot-encoded output vector column for each input column. It is common to merge these vectors into a single feature vector using [VectorAssembler](ml-features.html#vectorassembler).
+`OneHotEncoder` can transform multiple columns, returning a one-hot-encoded output vector column for each input column. It is common to merge these vectors into a single feature vector using [VectorAssembler](ml-features.html#vectorassembler).
 
-`OneHotEncoderEstimator` supports the `handleInvalid` parameter to choose how to handle invalid input during transforming data. Available options include 'keep' (any invalid inputs are assigned to an extra categorical index) and 'error' (throw an error).
+`OneHotEncoder` supports the `handleInvalid` parameter to choose how to handle invalid input when transforming data. Available options include 'keep' (any invalid inputs are assigned to an extra categorical index) and 'error' (throw an error).
 
 **Examples**
-Refer to the [OneHotEncoderEstimator Scala docs](api/scala/index.html#org.apache.spark.ml.feature.OneHotEncoderEstimator) for more details on the API.
+Refer to the [OneHotEncoder Scala docs](api/scala/index.html#org.apache.spark.ml.feature.OneHotEncoder) for more details on the API.
 
-{% include_example scala/org/apache/spark/examples/ml/OneHotEncoderEstimatorExample.scala %}
+{% include_example scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala %}
-Refer to the [OneHotEncoderEstimator Java docs](api/java/org/apache/spark/ml/feature/OneHotEncoderEstimator.html)
+Refer to the [OneHotEncoder Java docs](api/java/org/apache/spark/ml/feature/OneHotEncoder.html)
 for more details on the API.
 
-{% include_example java/org/apache/spark/examples/ml/JavaOneHotEncoderEstimatorExample.java %}
+{% include_example java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java %}
-Refer to the [OneHotEncoderEstimator Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoderEstimator) for more details on the API.
+Refer to the [OneHotEncoder Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoder) for more details on the API.
 
-{% include_example python/ml/onehot_encoder_estimator_example.py %}
+{% include_example python/ml/onehot_encoder_example.py %}
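The example tabs above all exercise the same pattern: configure the encoder, `fit` it to capture the category sizes, then `transform`. A minimal self-contained Scala sketch of that pattern against the renamed 3.0 API (the object name and the toy data are illustrative, not part of the Spark examples):

```scala
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.sql.SparkSession

object OneHotEncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("OneHotEncoderSketch").getOrCreate()

    // Category indices as produced by, e.g., StringIndexer.
    val df = spark.createDataFrame(Seq(
      (0.0, 1.0),
      (1.0, 0.0),
      (2.0, 1.0)
    )).toDF("categoryIndex1", "categoryIndex2")

    val encoder = new OneHotEncoder()
      .setInputCols(Array("categoryIndex1", "categoryIndex2"))
      .setOutputCols(Array("categoryVec1", "categoryVec2"))
      .setHandleInvalid("keep") // assign unseen indices to an extra category

    // fit() records the per-column category sizes, so the resulting model
    // can later be applied to data whose categories differ from training.
    val model = encoder.fit(df)
    model.transform(df).show()

    spark.stop()
  }
}
```

Persisting the fitted sizes in `OneHotEncoderModel` is what makes the encoding reusable on new data whose categories differ from the training data, which the old stateless transformer could not guarantee.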
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index aea07be34c..57d4e1fe9d 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -104,6 +104,12 @@ MLlib is under active development.
 The APIs marked `Experimental`/`DeveloperApi` may change in future releases,
 and the migration guide below will explain all changes between releases.
 
+## From 2.4 to 3.0
+
+### Breaking changes
+
+* `OneHotEncoder`, which was deprecated in 2.3, has been removed in 3.0, and `OneHotEncoderEstimator` is now renamed to `OneHotEncoder`.
+
 ## From 2.2 to 2.3
 
 ### Breaking changes
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderEstimatorExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java
similarity index 91%
rename from examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderEstimatorExample.java
rename to examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java
index 6f93cff94b..4b49bebf7c 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderEstimatorExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.spark.ml.feature.OneHotEncoderEstimator;
+import org.apache.spark.ml.feature.OneHotEncoder;
 import org.apache.spark.ml.feature.OneHotEncoderModel;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -34,11 +34,11 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 // $example off$
 
-public class JavaOneHotEncoderEstimatorExample {
+public class JavaOneHotEncoderExample {
   public static void main(String[] args) {
     SparkSession spark = SparkSession
       .builder()
-      .appName("JavaOneHotEncoderEstimatorExample")
+      .appName("JavaOneHotEncoderExample")
       .getOrCreate();
 
     // Note: categorical features are usually first encoded with StringIndexer
@@ -59,7 +59,7 @@ public class JavaOneHotEncoderEstimatorExample {
 
     Dataset<Row> df = spark.createDataFrame(data, schema);
 
-    OneHotEncoderEstimator encoder = new OneHotEncoderEstimator()
+    OneHotEncoder encoder = new OneHotEncoder()
       .setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
       .setOutputCols(new String[] {"categoryVec1", "categoryVec2"});
diff --git a/examples/src/main/python/ml/onehot_encoder_estimator_example.py b/examples/src/main/python/ml/onehot_encoder_example.py
similarity index 83%
rename from examples/src/main/python/ml/onehot_encoder_estimator_example.py
rename to examples/src/main/python/ml/onehot_encoder_example.py
index 2723e681ce..73775b79e3 100644
--- a/examples/src/main/python/ml/onehot_encoder_estimator_example.py
+++ b/examples/src/main/python/ml/onehot_encoder_example.py
@@ -18,14 +18,14 @@
 from __future__ import print_function
 
 # $example on$
-from pyspark.ml.feature import OneHotEncoderEstimator
+from pyspark.ml.feature import OneHotEncoder
 # $example off$
 from pyspark.sql import SparkSession
 
 if __name__ == "__main__":
     spark = SparkSession\
         .builder\
-        .appName("OneHotEncoderEstimatorExample")\
+        .appName("OneHotEncoderExample")\
         .getOrCreate()
 
     # Note: categorical features are usually first encoded with StringIndexer
@@ -39,8 +39,8 @@ if __name__ == "__main__":
         (2.0, 0.0)
     ], ["categoryIndex1", "categoryIndex2"])
 
-    encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
-                                     outputCols=["categoryVec1", "categoryVec2"])
+    encoder =
OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"], + outputCols=["categoryVec1", "categoryVec2"]) model = encoder.fit(df) encoded = model.transform(df) encoded.show() diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderEstimatorExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala similarity index 89% rename from examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderEstimatorExample.scala rename to examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala index 45d816808e..742f3cdeea 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderEstimatorExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala @@ -19,15 +19,15 @@ package org.apache.spark.examples.ml // $example on$ -import org.apache.spark.ml.feature.OneHotEncoderEstimator +import org.apache.spark.ml.feature.OneHotEncoder // $example off$ import org.apache.spark.sql.SparkSession -object OneHotEncoderEstimatorExample { +object OneHotEncoderExample { def main(args: Array[String]): Unit = { val spark = SparkSession .builder - .appName("OneHotEncoderEstimatorExample") + .appName("OneHotEncoderExample") .getOrCreate() // Note: categorical features are usually first encoded with StringIndexer @@ -41,7 +41,7 @@ object OneHotEncoderEstimatorExample { (2.0, 0.0) )).toDF("categoryIndex1", "categoryIndex2") - val encoder = new OneHotEncoderEstimator() + val encoder = new OneHotEncoder() .setInputCols(Array("categoryIndex1", "categoryIndex2")) .setOutputCols(Array("categoryVec1", "categoryVec2")) val model = encoder.fit(df) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala index 27e4869a02..ec9792cbbd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala @@ -17,126 +17,512 @@ package org.apache.spark.ml.feature +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.ml.Transformer +import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param._ -import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} +import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCols, HasOutputCols} import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset} -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{DoubleType, NumericType, StructType} +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions.{col, lit, udf} +import org.apache.spark.sql.types.{DoubleType, StructField, StructType} + +/** Private trait for params and common methods for OneHotEncoder and OneHotEncoderModel */ +private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid + with HasInputCols with HasOutputCols { + + /** + * Param for how to handle invalid data during transform(). + * Options are 'keep' (invalid data presented as an extra categorical feature) or + * 'error' (throw an error). + * Note that this Param is only used during transform; during fitting, invalid data + * will result in an error. 
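+   * For example, a model fitted on indices 0, 1 and 2 with handleInvalid = "keep" assigns an
+   * unseen index such as 3.0 to an extra fourth category (which becomes an all-zeros vector
+   * when `dropLast` is true).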
+ * Default: "error" + * @group param + */ + @Since("2.3.0") + override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", + "How to handle invalid data during transform(). " + + "Options are 'keep' (invalid data presented as an extra categorical feature) " + + "or error (throw an error). Note that this Param is only used during transform; " + + "during fitting, invalid data will result in an error.", + ParamValidators.inArray(OneHotEncoder.supportedHandleInvalids)) + + setDefault(handleInvalid, OneHotEncoder.ERROR_INVALID) + + /** + * Whether to drop the last category in the encoded vector (default: true) + * @group param + */ + @Since("2.3.0") + final val dropLast: BooleanParam = + new BooleanParam(this, "dropLast", "whether to drop the last category") + setDefault(dropLast -> true) + + /** @group getParam */ + @Since("2.3.0") + def getDropLast: Boolean = $(dropLast) + + protected def validateAndTransformSchema( + schema: StructType, + dropLast: Boolean, + keepInvalid: Boolean): StructType = { + val inputColNames = $(inputCols) + val outputColNames = $(outputCols) + + require(inputColNames.length == outputColNames.length, + s"The number of input columns ${inputColNames.length} must be the same as the number of " + + s"output columns ${outputColNames.length}.") + + // Input columns must be NumericType. + inputColNames.foreach(SchemaUtils.checkNumericType(schema, _)) + + // Prepares output columns with proper attributes by examining input columns. + val inputFields = $(inputCols).map(schema(_)) + + val outputFields = inputFields.zip(outputColNames).map { case (inputField, outputColName) => + OneHotEncoderCommon.transformOutputColumnSchema( + inputField, outputColName, dropLast, keepInvalid) + } + outputFields.foldLeft(schema) { case (newSchema, outputField) => + SchemaUtils.appendColumn(newSchema, outputField) + } + } +} /** * A one-hot encoder that maps a column of category indices to a column of binary vectors, with * at most a single one-value per row that indicates the input category index. * For example with 5 categories, an input value of 2.0 would map to an output vector of * `[0.0, 0.0, 1.0, 0.0]`. - * The last category is not included by default (configurable via `OneHotEncoder!.dropLast` + * The last category is not included by default (configurable via `dropLast`), * because it makes the vector entries sum up to one, and hence linearly dependent. * So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`. * * @note This is different from scikit-learn's OneHotEncoder, which keeps all categories. * The output vectors are sparse. * + * When `handleInvalid` is configured to 'keep', an extra "category" indicating invalid values is + * added as last category. So when `dropLast` is true, invalid values are encoded as all-zeros + * vector. + * + * @note When encoding multi-column by using `inputCols` and `outputCols` params, input/output cols + * come in pairs, specified by the order in the arrays, and each pair is treated independently. + * * @see `StringIndexer` for converting categorical values into category indices - * @deprecated `OneHotEncoderEstimator` will be renamed `OneHotEncoder` and this `OneHotEncoder` - * will be removed in 3.0.0. 
*/ -@Since("1.4.0") -@deprecated("`OneHotEncoderEstimator` will be renamed `OneHotEncoder` and this `OneHotEncoder`" + - " will be removed in 3.0.0.", "2.3.0") -class OneHotEncoder @Since("1.4.0") (@Since("1.4.0") override val uid: String) extends Transformer - with HasInputCol with HasOutputCol with DefaultParamsWritable { +@Since("3.0.0") +class OneHotEncoder @Since("3.0.0") (@Since("3.0.0") override val uid: String) + extends Estimator[OneHotEncoderModel] with OneHotEncoderBase with DefaultParamsWritable { - @Since("1.4.0") - def this() = this(Identifiable.randomUID("oneHot")) - - /** - * Whether to drop the last category in the encoded vector (default: true) - * @group param - */ - @Since("1.4.0") - final val dropLast: BooleanParam = - new BooleanParam(this, "dropLast", "whether to drop the last category") - setDefault(dropLast -> true) - - /** @group getParam */ - @Since("2.0.0") - def getDropLast: Boolean = $(dropLast) + @Since("3.0.0") + def this() = this(Identifiable.randomUID("oneHotEncoder")) /** @group setParam */ - @Since("1.4.0") + @Since("3.0.0") + def setInputCols(values: Array[String]): this.type = set(inputCols, values) + + /** @group setParam */ + @Since("3.0.0") + def setOutputCols(values: Array[String]): this.type = set(outputCols, values) + + /** @group setParam */ + @Since("3.0.0") def setDropLast(value: Boolean): this.type = set(dropLast, value) /** @group setParam */ - @Since("1.4.0") - def setInputCol(value: String): this.type = set(inputCol, value) + @Since("3.0.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) - /** @group setParam */ - @Since("1.4.0") - def setOutputCol(value: String): this.type = set(outputCol, value) - - @Since("1.4.0") + @Since("3.0.0") override def transformSchema(schema: StructType): StructType = { - val inputColName = $(inputCol) - val outputColName = $(outputCol) - val inputFields = schema.fields - - require(schema(inputColName).dataType.isInstanceOf[NumericType], - s"Input column must be of type ${NumericType.simpleString} but got " + - schema(inputColName).dataType.catalogString) - require(!inputFields.exists(_.name == outputColName), - s"Output column $outputColName already exists.") - - val outputField = OneHotEncoderCommon.transformOutputColumnSchema( - schema(inputColName), outputColName, $(dropLast)) - val outputFields = inputFields :+ outputField - StructType(outputFields) + val keepInvalid = $(handleInvalid) == OneHotEncoder.KEEP_INVALID + validateAndTransformSchema(schema, dropLast = $(dropLast), + keepInvalid = keepInvalid) } - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { - // schema transformation - val inputColName = $(inputCol) - val outputColName = $(outputCol) + @Since("3.0.0") + override def fit(dataset: Dataset[_]): OneHotEncoderModel = { + transformSchema(dataset.schema) - val outputAttrGroupFromSchema = AttributeGroup.fromStructField( - transformSchema(dataset.schema)(outputColName)) + // Compute the plain number of categories without `handleInvalid` and + // `dropLast` taken into account. 
+ val transformedSchema = validateAndTransformSchema(dataset.schema, dropLast = false, + keepInvalid = false) + val categorySizes = new Array[Int]($(outputCols).length) - val outputAttrGroup = if (outputAttrGroupFromSchema.size < 0) { - OneHotEncoderCommon.getOutputAttrGroupFromData( - dataset, Seq(inputColName), Seq(outputColName), $(dropLast))(0) - } else { - outputAttrGroupFromSchema - } - - val metadata = outputAttrGroup.toMetadata() - - // data transformation - val size = outputAttrGroup.size - val oneValue = Array(1.0) - val emptyValues = Array.empty[Double] - val emptyIndices = Array.empty[Int] - val encode = udf { label: Double => - if (label < size) { - Vectors.sparse(size, Array(label.toInt), oneValue) + val columnToScanIndices = $(outputCols).zipWithIndex.flatMap { case (outputColName, idx) => + val numOfAttrs = AttributeGroup.fromStructField( + transformedSchema(outputColName)).size + if (numOfAttrs < 0) { + Some(idx) } else { - Vectors.sparse(size, emptyIndices, emptyValues) + categorySizes(idx) = numOfAttrs + None } } - dataset.select(col("*"), encode(col(inputColName).cast(DoubleType)).as(outputColName, metadata)) + // Some input columns don't have attributes or their attributes don't have necessary info. + // We need to scan the data to get the number of values for each column. + if (columnToScanIndices.length > 0) { + val inputColNames = columnToScanIndices.map($(inputCols)(_)) + val outputColNames = columnToScanIndices.map($(outputCols)(_)) + + // When fitting data, we want the plain number of categories without `handleInvalid` and + // `dropLast` taken into account. + val attrGroups = OneHotEncoderCommon.getOutputAttrGroupFromData( + dataset, inputColNames, outputColNames, dropLast = false) + attrGroups.zip(columnToScanIndices).foreach { case (attrGroup, idx) => + categorySizes(idx) = attrGroup.size + } + } + + val model = new OneHotEncoderModel(uid, categorySizes).setParent(this) + copyValues(model) } - @Since("1.4.1") + @Since("3.0.0") override def copy(extra: ParamMap): OneHotEncoder = defaultCopy(extra) } -@Since("1.6.0") +@Since("3.0.0") object OneHotEncoder extends DefaultParamsReadable[OneHotEncoder] { - @Since("1.6.0") + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val supportedHandleInvalids: Array[String] = Array(KEEP_INVALID, ERROR_INVALID) + + @Since("3.0.0") override def load(path: String): OneHotEncoder = super.load(path) } + +/** + * @param categorySizes Original number of categories for each feature being encoded. + * The array contains one value for each input column, in order. + */ +@Since("3.0.0") +class OneHotEncoderModel private[ml] ( + @Since("3.0.0") override val uid: String, + @Since("3.0.0") val categorySizes: Array[Int]) + extends Model[OneHotEncoderModel] with OneHotEncoderBase with MLWritable { + + import OneHotEncoderModel._ + + // Returns the category size for each index with `dropLast` and `handleInvalid` + // taken into account. + private def getConfigedCategorySizes: Array[Int] = { + val dropLast = getDropLast + val keepInvalid = getHandleInvalid == OneHotEncoder.KEEP_INVALID + + if (!dropLast && keepInvalid) { + // When `handleInvalid` is "keep", an extra category is added as last category + // for invalid data. + categorySizes.map(_ + 1) + } else if (dropLast && !keepInvalid) { + // When `dropLast` is true, the last category is removed. 
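+      // e.g. a column fitted with 3 categories is encoded into vectors of size 2.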
+ categorySizes.map(_ - 1) + } else { + // When `dropLast` is true and `handleInvalid` is "keep", the extra category for invalid + // data is removed. Thus, it is the same as the plain number of categories. + categorySizes + } + } + + private def encoder: UserDefinedFunction = { + val keepInvalid = getHandleInvalid == OneHotEncoder.KEEP_INVALID + val configedSizes = getConfigedCategorySizes + val localCategorySizes = categorySizes + + // The udf performed on input data. The first parameter is the input value. The second + // parameter is the index in inputCols of the column being encoded. + udf { (label: Double, colIdx: Int) => + val origCategorySize = localCategorySizes(colIdx) + // idx: index in vector of the single 1-valued element + val idx = if (label >= 0 && label < origCategorySize) { + label + } else { + if (keepInvalid) { + origCategorySize + } else { + if (label < 0) { + throw new SparkException(s"Negative value: $label. Input can't be negative. " + + s"To handle invalid values, set Param handleInvalid to " + + s"${OneHotEncoder.KEEP_INVALID}") + } else { + throw new SparkException(s"Unseen value: $label. To handle unseen values, " + + s"set Param handleInvalid to ${OneHotEncoder.KEEP_INVALID}.") + } + } + } + + val size = configedSizes(colIdx) + if (idx < size) { + Vectors.sparse(size, Array(idx.toInt), Array(1.0)) + } else { + Vectors.sparse(size, Array.empty[Int], Array.empty[Double]) + } + } + } + + /** @group setParam */ + @Since("3.0.0") + def setInputCols(values: Array[String]): this.type = set(inputCols, values) + + /** @group setParam */ + @Since("3.0.0") + def setOutputCols(values: Array[String]): this.type = set(outputCols, values) + + /** @group setParam */ + @Since("3.0.0") + def setDropLast(value: Boolean): this.type = set(dropLast, value) + + /** @group setParam */ + @Since("3.0.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + @Since("3.0.0") + override def transformSchema(schema: StructType): StructType = { + val inputColNames = $(inputCols) + + require(inputColNames.length == categorySizes.length, + s"The number of input columns ${inputColNames.length} must be the same as the number of " + + s"features ${categorySizes.length} during fitting.") + + val keepInvalid = $(handleInvalid) == OneHotEncoder.KEEP_INVALID + val transformedSchema = validateAndTransformSchema(schema, dropLast = $(dropLast), + keepInvalid = keepInvalid) + verifyNumOfValues(transformedSchema) + } + + /** + * If the metadata of input columns also specifies the number of categories, we need to + * compare with expected category number with `handleInvalid` and `dropLast` taken into + * account. Mismatched numbers will cause exception. + */ + private def verifyNumOfValues(schema: StructType): StructType = { + val configedSizes = getConfigedCategorySizes + $(outputCols).zipWithIndex.foreach { case (outputColName, idx) => + val inputColName = $(inputCols)(idx) + val attrGroup = AttributeGroup.fromStructField(schema(outputColName)) + + // If the input metadata specifies number of category for output column, + // comparing with expected category number with `handleInvalid` and + // `dropLast` taken into account. 
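+    // e.g. with 3 fitted categories, dropLast = true and handleInvalid = "error",
+    // the output column metadata must list exactly 2 values.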
+ if (attrGroup.attributes.nonEmpty) { + val numCategories = configedSizes(idx) + require(attrGroup.size == numCategories, "OneHotEncoderModel expected " + + s"$numCategories categorical values for input column $inputColName, " + + s"but the input column had metadata specifying ${attrGroup.size} values.") + } + } + schema + } + + @Since("3.0.0") + override def transform(dataset: Dataset[_]): DataFrame = { + val transformedSchema = transformSchema(dataset.schema, logging = true) + val keepInvalid = $(handleInvalid) == OneHotEncoder.KEEP_INVALID + + val encodedColumns = $(inputCols).indices.map { idx => + val inputColName = $(inputCols)(idx) + val outputColName = $(outputCols)(idx) + + val outputAttrGroupFromSchema = + AttributeGroup.fromStructField(transformedSchema(outputColName)) + + val metadata = if (outputAttrGroupFromSchema.size < 0) { + OneHotEncoderCommon.createAttrGroupForAttrNames(outputColName, + categorySizes(idx), $(dropLast), keepInvalid).toMetadata() + } else { + outputAttrGroupFromSchema.toMetadata() + } + + encoder(col(inputColName).cast(DoubleType), lit(idx)) + .as(outputColName, metadata) + } + dataset.withColumns($(outputCols), encodedColumns) + } + + @Since("3.0.0") + override def copy(extra: ParamMap): OneHotEncoderModel = { + val copied = new OneHotEncoderModel(uid, categorySizes) + copyValues(copied, extra).setParent(parent) + } + + @Since("3.0.0") + override def write: MLWriter = new OneHotEncoderModelWriter(this) +} + +@Since("3.0.0") +object OneHotEncoderModel extends MLReadable[OneHotEncoderModel] { + + private[OneHotEncoderModel] + class OneHotEncoderModelWriter(instance: OneHotEncoderModel) extends MLWriter { + + private case class Data(categorySizes: Array[Int]) + + override protected def saveImpl(path: String): Unit = { + DefaultParamsWriter.saveMetadata(instance, path, sc) + val data = Data(instance.categorySizes) + val dataPath = new Path(path, "data").toString + sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + } + } + + private class OneHotEncoderModelReader extends MLReader[OneHotEncoderModel] { + + private val className = classOf[OneHotEncoderModel].getName + + override def load(path: String): OneHotEncoderModel = { + val metadata = DefaultParamsReader.loadMetadata(path, sc, className) + val dataPath = new Path(path, "data").toString + val data = sparkSession.read.parquet(dataPath) + .select("categorySizes") + .head() + val categorySizes = data.getAs[Seq[Int]](0).toArray + val model = new OneHotEncoderModel(metadata.uid, categorySizes) + metadata.getAndSetParams(model) + model + } + } + + @Since("3.0.0") + override def read: MLReader[OneHotEncoderModel] = new OneHotEncoderModelReader + + @Since("3.0.0") + override def load(path: String): OneHotEncoderModel = super.load(path) +} + +/** + * Provides some helper methods used by `OneHotEncoder`. 
+ */ +private[feature] object OneHotEncoderCommon { + + private def genOutputAttrNames(inputCol: StructField): Option[Array[String]] = { + val inputAttr = Attribute.fromStructField(inputCol) + inputAttr match { + case nominal: NominalAttribute => + if (nominal.values.isDefined) { + nominal.values + } else if (nominal.numValues.isDefined) { + nominal.numValues.map(n => Array.tabulate(n)(_.toString)) + } else { + None + } + case binary: BinaryAttribute => + if (binary.values.isDefined) { + binary.values + } else { + Some(Array.tabulate(2)(_.toString)) + } + case _: NumericAttribute => + throw new RuntimeException( + s"The input column ${inputCol.name} cannot be continuous-value.") + case _ => + None // optimistic about unknown attributes + } + } + + /** Creates an `AttributeGroup` filled by the `BinaryAttribute` named as required. */ + private def genOutputAttrGroup( + outputAttrNames: Option[Array[String]], + outputColName: String): AttributeGroup = { + outputAttrNames.map { attrNames => + val attrs: Array[Attribute] = attrNames.map { name => + BinaryAttribute.defaultAttr.withName(name) + } + new AttributeGroup(outputColName, attrs) + }.getOrElse{ + new AttributeGroup(outputColName) + } + } + + /** + * Prepares the `StructField` with proper metadata for `OneHotEncoder`'s output column. + */ + def transformOutputColumnSchema( + inputCol: StructField, + outputColName: String, + dropLast: Boolean, + keepInvalid: Boolean = false): StructField = { + val outputAttrNames = genOutputAttrNames(inputCol) + val filteredOutputAttrNames = outputAttrNames.map { names => + if (dropLast && !keepInvalid) { + require(names.length > 1, + s"The input column ${inputCol.name} should have at least two distinct values.") + names.dropRight(1) + } else if (!dropLast && keepInvalid) { + names ++ Seq("invalidValues") + } else { + names + } + } + + genOutputAttrGroup(filteredOutputAttrNames, outputColName).toStructField() + } + + /** + * This method is called when we want to generate `AttributeGroup` from actual data for + * one-hot encoder. + */ + def getOutputAttrGroupFromData( + dataset: Dataset[_], + inputColNames: Seq[String], + outputColNames: Seq[String], + dropLast: Boolean): Seq[AttributeGroup] = { + // The RDD approach has advantage of early-stop if any values are invalid. It seems that + // DataFrame ops don't have equivalent functions. + val columns = inputColNames.map { inputColName => + col(inputColName).cast(DoubleType) + } + val numOfColumns = columns.length + + val numAttrsArray = dataset.select(columns: _*).rdd.map { row => + (0 until numOfColumns).map(idx => row.getDouble(idx)).toArray + }.treeAggregate(new Array[Double](numOfColumns))( + (maxValues, curValues) => { + (0 until numOfColumns).foreach { idx => + val x = curValues(idx) + assert(x <= Int.MaxValue, + s"OneHotEncoder only supports up to ${Int.MaxValue} indices, but got $x.") + assert(x >= 0.0 && x == x.toInt, + s"Values from column ${inputColNames(idx)} must be indices, but got $x.") + maxValues(idx) = math.max(maxValues(idx), x) + } + maxValues + }, + (m0, m1) => { + (0 until numOfColumns).foreach { idx => + m0(idx) = math.max(m0(idx), m1(idx)) + } + m0 + } + ).map(_.toInt + 1) + + outputColNames.zip(numAttrsArray).map { case (outputColName, numAttrs) => + createAttrGroupForAttrNames(outputColName, numAttrs, dropLast, keepInvalid = false) + } + } + + /** Creates an `AttributeGroup` with the required number of `BinaryAttribute`. 
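+   * For example, numAttrs = 3 with dropLast = true and keepInvalid = false yields
+   * attributes named "0" and "1".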
*/ + def createAttrGroupForAttrNames( + outputColName: String, + numAttrs: Int, + dropLast: Boolean, + keepInvalid: Boolean): AttributeGroup = { + val outputAttrNames = Array.tabulate(numAttrs)(_.toString) + val filtered = if (dropLast && !keepInvalid) { + outputAttrNames.dropRight(1) + } else if (!dropLast && keepInvalid) { + outputAttrNames ++ Seq("invalidValues") + } else { + outputAttrNames + } + genOutputAttrGroup(Some(filtered), outputColName) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala deleted file mode 100644 index 4a44f31865..0000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala +++ /dev/null @@ -1,528 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml.feature - -import org.apache.hadoop.fs.Path - -import org.apache.spark.SparkException -import org.apache.spark.annotation.Since -import org.apache.spark.ml.{Estimator, Model} -import org.apache.spark.ml.attribute._ -import org.apache.spark.ml.linalg.Vectors -import org.apache.spark.ml.param._ -import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCols, HasOutputCols} -import org.apache.spark.ml.util._ -import org.apache.spark.sql.{DataFrame, Dataset} -import org.apache.spark.sql.expressions.UserDefinedFunction -import org.apache.spark.sql.functions.{col, lit, udf} -import org.apache.spark.sql.types.{DoubleType, StructField, StructType} - -/** Private trait for params and common methods for OneHotEncoderEstimator and OneHotEncoderModel */ -private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid - with HasInputCols with HasOutputCols { - - /** - * Param for how to handle invalid data during transform(). - * Options are 'keep' (invalid data presented as an extra categorical feature) or - * 'error' (throw an error). - * Note that this Param is only used during transform; during fitting, invalid data - * will result in an error. - * Default: "error" - * @group param - */ - @Since("2.3.0") - override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", - "How to handle invalid data during transform(). " + - "Options are 'keep' (invalid data presented as an extra categorical feature) " + - "or error (throw an error). 
Note that this Param is only used during transform; " + - "during fitting, invalid data will result in an error.", - ParamValidators.inArray(OneHotEncoderEstimator.supportedHandleInvalids)) - - setDefault(handleInvalid, OneHotEncoderEstimator.ERROR_INVALID) - - /** - * Whether to drop the last category in the encoded vector (default: true) - * @group param - */ - @Since("2.3.0") - final val dropLast: BooleanParam = - new BooleanParam(this, "dropLast", "whether to drop the last category") - setDefault(dropLast -> true) - - /** @group getParam */ - @Since("2.3.0") - def getDropLast: Boolean = $(dropLast) - - protected def validateAndTransformSchema( - schema: StructType, - dropLast: Boolean, - keepInvalid: Boolean): StructType = { - val inputColNames = $(inputCols) - val outputColNames = $(outputCols) - - require(inputColNames.length == outputColNames.length, - s"The number of input columns ${inputColNames.length} must be the same as the number of " + - s"output columns ${outputColNames.length}.") - - // Input columns must be NumericType. - inputColNames.foreach(SchemaUtils.checkNumericType(schema, _)) - - // Prepares output columns with proper attributes by examining input columns. - val inputFields = $(inputCols).map(schema(_)) - - val outputFields = inputFields.zip(outputColNames).map { case (inputField, outputColName) => - OneHotEncoderCommon.transformOutputColumnSchema( - inputField, outputColName, dropLast, keepInvalid) - } - outputFields.foldLeft(schema) { case (newSchema, outputField) => - SchemaUtils.appendColumn(newSchema, outputField) - } - } -} - -/** - * A one-hot encoder that maps a column of category indices to a column of binary vectors, with - * at most a single one-value per row that indicates the input category index. - * For example with 5 categories, an input value of 2.0 would map to an output vector of - * `[0.0, 0.0, 1.0, 0.0]`. - * The last category is not included by default (configurable via `dropLast`), - * because it makes the vector entries sum up to one, and hence linearly dependent. - * So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`. - * - * @note This is different from scikit-learn's OneHotEncoder, which keeps all categories. - * The output vectors are sparse. - * - * When `handleInvalid` is configured to 'keep', an extra "category" indicating invalid values is - * added as last category. So when `dropLast` is true, invalid values are encoded as all-zeros - * vector. - * - * @note When encoding multi-column by using `inputCols` and `outputCols` params, input/output cols - * come in pairs, specified by the order in the arrays, and each pair is treated independently. 
- * - * @see `StringIndexer` for converting categorical values into category indices - */ -@Since("2.3.0") -class OneHotEncoderEstimator @Since("2.3.0") (@Since("2.3.0") override val uid: String) - extends Estimator[OneHotEncoderModel] with OneHotEncoderBase with DefaultParamsWritable { - - @Since("2.3.0") - def this() = this(Identifiable.randomUID("oneHotEncoder")) - - /** @group setParam */ - @Since("2.3.0") - def setInputCols(values: Array[String]): this.type = set(inputCols, values) - - /** @group setParam */ - @Since("2.3.0") - def setOutputCols(values: Array[String]): this.type = set(outputCols, values) - - /** @group setParam */ - @Since("2.3.0") - def setDropLast(value: Boolean): this.type = set(dropLast, value) - - /** @group setParam */ - @Since("2.3.0") - def setHandleInvalid(value: String): this.type = set(handleInvalid, value) - - @Since("2.3.0") - override def transformSchema(schema: StructType): StructType = { - val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID - validateAndTransformSchema(schema, dropLast = $(dropLast), - keepInvalid = keepInvalid) - } - - @Since("2.3.0") - override def fit(dataset: Dataset[_]): OneHotEncoderModel = { - transformSchema(dataset.schema) - - // Compute the plain number of categories without `handleInvalid` and - // `dropLast` taken into account. - val transformedSchema = validateAndTransformSchema(dataset.schema, dropLast = false, - keepInvalid = false) - val categorySizes = new Array[Int]($(outputCols).length) - - val columnToScanIndices = $(outputCols).zipWithIndex.flatMap { case (outputColName, idx) => - val numOfAttrs = AttributeGroup.fromStructField( - transformedSchema(outputColName)).size - if (numOfAttrs < 0) { - Some(idx) - } else { - categorySizes(idx) = numOfAttrs - None - } - } - - // Some input columns don't have attributes or their attributes don't have necessary info. - // We need to scan the data to get the number of values for each column. - if (columnToScanIndices.length > 0) { - val inputColNames = columnToScanIndices.map($(inputCols)(_)) - val outputColNames = columnToScanIndices.map($(outputCols)(_)) - - // When fitting data, we want the plain number of categories without `handleInvalid` and - // `dropLast` taken into account. - val attrGroups = OneHotEncoderCommon.getOutputAttrGroupFromData( - dataset, inputColNames, outputColNames, dropLast = false) - attrGroups.zip(columnToScanIndices).foreach { case (attrGroup, idx) => - categorySizes(idx) = attrGroup.size - } - } - - val model = new OneHotEncoderModel(uid, categorySizes).setParent(this) - copyValues(model) - } - - @Since("2.3.0") - override def copy(extra: ParamMap): OneHotEncoderEstimator = defaultCopy(extra) -} - -@Since("2.3.0") -object OneHotEncoderEstimator extends DefaultParamsReadable[OneHotEncoderEstimator] { - - private[feature] val KEEP_INVALID: String = "keep" - private[feature] val ERROR_INVALID: String = "error" - private[feature] val supportedHandleInvalids: Array[String] = Array(KEEP_INVALID, ERROR_INVALID) - - @Since("2.3.0") - override def load(path: String): OneHotEncoderEstimator = super.load(path) -} - -/** - * @param categorySizes Original number of categories for each feature being encoded. - * The array contains one value for each input column, in order. 
- */ -@Since("2.3.0") -class OneHotEncoderModel private[ml] ( - @Since("2.3.0") override val uid: String, - @Since("2.3.0") val categorySizes: Array[Int]) - extends Model[OneHotEncoderModel] with OneHotEncoderBase with MLWritable { - - import OneHotEncoderModel._ - - // Returns the category size for each index with `dropLast` and `handleInvalid` - // taken into account. - private def getConfigedCategorySizes: Array[Int] = { - val dropLast = getDropLast - val keepInvalid = getHandleInvalid == OneHotEncoderEstimator.KEEP_INVALID - - if (!dropLast && keepInvalid) { - // When `handleInvalid` is "keep", an extra category is added as last category - // for invalid data. - categorySizes.map(_ + 1) - } else if (dropLast && !keepInvalid) { - // When `dropLast` is true, the last category is removed. - categorySizes.map(_ - 1) - } else { - // When `dropLast` is true and `handleInvalid` is "keep", the extra category for invalid - // data is removed. Thus, it is the same as the plain number of categories. - categorySizes - } - } - - private def encoder: UserDefinedFunction = { - val keepInvalid = getHandleInvalid == OneHotEncoderEstimator.KEEP_INVALID - val configedSizes = getConfigedCategorySizes - val localCategorySizes = categorySizes - - // The udf performed on input data. The first parameter is the input value. The second - // parameter is the index in inputCols of the column being encoded. - udf { (label: Double, colIdx: Int) => - val origCategorySize = localCategorySizes(colIdx) - // idx: index in vector of the single 1-valued element - val idx = if (label >= 0 && label < origCategorySize) { - label - } else { - if (keepInvalid) { - origCategorySize - } else { - if (label < 0) { - throw new SparkException(s"Negative value: $label. Input can't be negative. " + - s"To handle invalid values, set Param handleInvalid to " + - s"${OneHotEncoderEstimator.KEEP_INVALID}") - } else { - throw new SparkException(s"Unseen value: $label. To handle unseen values, " + - s"set Param handleInvalid to ${OneHotEncoderEstimator.KEEP_INVALID}.") - } - } - } - - val size = configedSizes(colIdx) - if (idx < size) { - Vectors.sparse(size, Array(idx.toInt), Array(1.0)) - } else { - Vectors.sparse(size, Array.empty[Int], Array.empty[Double]) - } - } - } - - /** @group setParam */ - @Since("2.3.0") - def setInputCols(values: Array[String]): this.type = set(inputCols, values) - - /** @group setParam */ - @Since("2.3.0") - def setOutputCols(values: Array[String]): this.type = set(outputCols, values) - - /** @group setParam */ - @Since("2.3.0") - def setDropLast(value: Boolean): this.type = set(dropLast, value) - - /** @group setParam */ - @Since("2.3.0") - def setHandleInvalid(value: String): this.type = set(handleInvalid, value) - - @Since("2.3.0") - override def transformSchema(schema: StructType): StructType = { - val inputColNames = $(inputCols) - - require(inputColNames.length == categorySizes.length, - s"The number of input columns ${inputColNames.length} must be the same as the number of " + - s"features ${categorySizes.length} during fitting.") - - val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID - val transformedSchema = validateAndTransformSchema(schema, dropLast = $(dropLast), - keepInvalid = keepInvalid) - verifyNumOfValues(transformedSchema) - } - - /** - * If the metadata of input columns also specifies the number of categories, we need to - * compare with expected category number with `handleInvalid` and `dropLast` taken into - * account. Mismatched numbers will cause exception. 
- */ - private def verifyNumOfValues(schema: StructType): StructType = { - val configedSizes = getConfigedCategorySizes - $(outputCols).zipWithIndex.foreach { case (outputColName, idx) => - val inputColName = $(inputCols)(idx) - val attrGroup = AttributeGroup.fromStructField(schema(outputColName)) - - // If the input metadata specifies number of category for output column, - // comparing with expected category number with `handleInvalid` and - // `dropLast` taken into account. - if (attrGroup.attributes.nonEmpty) { - val numCategories = configedSizes(idx) - require(attrGroup.size == numCategories, "OneHotEncoderModel expected " + - s"$numCategories categorical values for input column $inputColName, " + - s"but the input column had metadata specifying ${attrGroup.size} values.") - } - } - schema - } - - @Since("2.3.0") - override def transform(dataset: Dataset[_]): DataFrame = { - val transformedSchema = transformSchema(dataset.schema, logging = true) - val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID - - val encodedColumns = $(inputCols).indices.map { idx => - val inputColName = $(inputCols)(idx) - val outputColName = $(outputCols)(idx) - - val outputAttrGroupFromSchema = - AttributeGroup.fromStructField(transformedSchema(outputColName)) - - val metadata = if (outputAttrGroupFromSchema.size < 0) { - OneHotEncoderCommon.createAttrGroupForAttrNames(outputColName, - categorySizes(idx), $(dropLast), keepInvalid).toMetadata() - } else { - outputAttrGroupFromSchema.toMetadata() - } - - encoder(col(inputColName).cast(DoubleType), lit(idx)) - .as(outputColName, metadata) - } - dataset.withColumns($(outputCols), encodedColumns) - } - - @Since("2.3.0") - override def copy(extra: ParamMap): OneHotEncoderModel = { - val copied = new OneHotEncoderModel(uid, categorySizes) - copyValues(copied, extra).setParent(parent) - } - - @Since("2.3.0") - override def write: MLWriter = new OneHotEncoderModelWriter(this) -} - -@Since("2.3.0") -object OneHotEncoderModel extends MLReadable[OneHotEncoderModel] { - - private[OneHotEncoderModel] - class OneHotEncoderModelWriter(instance: OneHotEncoderModel) extends MLWriter { - - private case class Data(categorySizes: Array[Int]) - - override protected def saveImpl(path: String): Unit = { - DefaultParamsWriter.saveMetadata(instance, path, sc) - val data = Data(instance.categorySizes) - val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) - } - } - - private class OneHotEncoderModelReader extends MLReader[OneHotEncoderModel] { - - private val className = classOf[OneHotEncoderModel].getName - - override def load(path: String): OneHotEncoderModel = { - val metadata = DefaultParamsReader.loadMetadata(path, sc, className) - val dataPath = new Path(path, "data").toString - val data = sparkSession.read.parquet(dataPath) - .select("categorySizes") - .head() - val categorySizes = data.getAs[Seq[Int]](0).toArray - val model = new OneHotEncoderModel(metadata.uid, categorySizes) - metadata.getAndSetParams(model) - model - } - } - - @Since("2.3.0") - override def read: MLReader[OneHotEncoderModel] = new OneHotEncoderModelReader - - @Since("2.3.0") - override def load(path: String): OneHotEncoderModel = super.load(path) -} - -/** - * Provides some helper methods used by both `OneHotEncoder` and `OneHotEncoderEstimator`. 
- */ -private[feature] object OneHotEncoderCommon { - - private def genOutputAttrNames(inputCol: StructField): Option[Array[String]] = { - val inputAttr = Attribute.fromStructField(inputCol) - inputAttr match { - case nominal: NominalAttribute => - if (nominal.values.isDefined) { - nominal.values - } else if (nominal.numValues.isDefined) { - nominal.numValues.map(n => Array.tabulate(n)(_.toString)) - } else { - None - } - case binary: BinaryAttribute => - if (binary.values.isDefined) { - binary.values - } else { - Some(Array.tabulate(2)(_.toString)) - } - case _: NumericAttribute => - throw new RuntimeException( - s"The input column ${inputCol.name} cannot be continuous-value.") - case _ => - None // optimistic about unknown attributes - } - } - - /** Creates an `AttributeGroup` filled by the `BinaryAttribute` named as required. */ - private def genOutputAttrGroup( - outputAttrNames: Option[Array[String]], - outputColName: String): AttributeGroup = { - outputAttrNames.map { attrNames => - val attrs: Array[Attribute] = attrNames.map { name => - BinaryAttribute.defaultAttr.withName(name) - } - new AttributeGroup(outputColName, attrs) - }.getOrElse{ - new AttributeGroup(outputColName) - } - } - - /** - * Prepares the `StructField` with proper metadata for `OneHotEncoder`'s output column. - */ - def transformOutputColumnSchema( - inputCol: StructField, - outputColName: String, - dropLast: Boolean, - keepInvalid: Boolean = false): StructField = { - val outputAttrNames = genOutputAttrNames(inputCol) - val filteredOutputAttrNames = outputAttrNames.map { names => - if (dropLast && !keepInvalid) { - require(names.length > 1, - s"The input column ${inputCol.name} should have at least two distinct values.") - names.dropRight(1) - } else if (!dropLast && keepInvalid) { - names ++ Seq("invalidValues") - } else { - names - } - } - - genOutputAttrGroup(filteredOutputAttrNames, outputColName).toStructField() - } - - /** - * This method is called when we want to generate `AttributeGroup` from actual data for - * one-hot encoder. - */ - def getOutputAttrGroupFromData( - dataset: Dataset[_], - inputColNames: Seq[String], - outputColNames: Seq[String], - dropLast: Boolean): Seq[AttributeGroup] = { - // The RDD approach has advantage of early-stop if any values are invalid. It seems that - // DataFrame ops don't have equivalent functions. - val columns = inputColNames.map { inputColName => - col(inputColName).cast(DoubleType) - } - val numOfColumns = columns.length - - val numAttrsArray = dataset.select(columns: _*).rdd.map { row => - (0 until numOfColumns).map(idx => row.getDouble(idx)).toArray - }.treeAggregate(new Array[Double](numOfColumns))( - (maxValues, curValues) => { - (0 until numOfColumns).foreach { idx => - val x = curValues(idx) - assert(x <= Int.MaxValue, - s"OneHotEncoder only supports up to ${Int.MaxValue} indices, but got $x.") - assert(x >= 0.0 && x == x.toInt, - s"Values from column ${inputColNames(idx)} must be indices, but got $x.") - maxValues(idx) = math.max(maxValues(idx), x) - } - maxValues - }, - (m0, m1) => { - (0 until numOfColumns).foreach { idx => - m0(idx) = math.max(m0(idx), m1(idx)) - } - m0 - } - ).map(_.toInt + 1) - - outputColNames.zip(numAttrsArray).map { case (outputColName, numAttrs) => - createAttrGroupForAttrNames(outputColName, numAttrs, dropLast, keepInvalid = false) - } - } - - /** Creates an `AttributeGroup` with the required number of `BinaryAttribute`. 
*/ - def createAttrGroupForAttrNames( - outputColName: String, - numAttrs: Int, - dropLast: Boolean, - keepInvalid: Boolean): AttributeGroup = { - val outputAttrNames = Array.tabulate(numAttrs)(_.toString) - val filtered = if (dropLast && !keepInvalid) { - outputAttrNames.dropRight(1) - } else if (!dropLast && keepInvalid) { - outputAttrNames ++ Seq("invalidValues") - } else { - outputAttrNames - } - genOutputAttrGroup(Some(filtered), outputColName) - } -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index 346e1823f0..d7eb13772a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -246,7 +246,7 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String) // Formula w/o intercept, one of the categories in the first category feature is // being used as reference category, we will not drop any category for that feature. if (!hasIntercept && !keepReferenceCategory) { - encoderStages += new OneHotEncoderEstimator(uid) + encoderStages += new OneHotEncoder(uid) .setInputCols(Array(indexed(term))) .setOutputCols(Array(encodedCol)) .setDropLast(false) @@ -269,7 +269,7 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String) if (oneHotEncodeColumns.nonEmpty) { val (inputCols, outputCols) = oneHotEncodeColumns.toArray.unzip - encoderStages += new OneHotEncoderEstimator(uid) + encoderStages += new OneHotEncoder(uid) .setInputCols(inputCols) .setOutputCols(outputCols) .setDropLast(true) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala deleted file mode 100644 index d549e13262..0000000000 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala +++ /dev/null @@ -1,422 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.spark.ml.feature
-
-import org.apache.spark.ml.attribute.{AttributeGroup, BinaryAttribute, NominalAttribute}
-import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
-import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
-import org.apache.spark.sql.{Encoder, Row}
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types._
-
-class OneHotEncoderEstimatorSuite extends MLTest with DefaultReadWriteTest {
-
-  import testImplicits._
-
-  test("params") {
-    ParamsSuite.checkParams(new OneHotEncoderEstimator)
-  }
-
-  test("OneHotEncoderEstimator dropLast = false") {
-    val data = Seq(
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
-      Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
-      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))),
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
-      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))))
-
-    val schema = StructType(Array(
-        StructField("input", DoubleType),
-        StructField("expected", new VectorUDT)))
-
-    val df = spark.createDataFrame(sc.parallelize(data), schema)
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("input"))
-      .setOutputCols(Array("output"))
-    assert(encoder.getDropLast === true)
-    encoder.setDropLast(false)
-    assert(encoder.getDropLast === false)
-    val model = encoder.fit(df)
-    testTransformer[(Double, Vector)](df, model, "output", "expected") {
-      case Row(output: Vector, expected: Vector) =>
-        assert(output === expected)
-    }
-  }
-
-  test("OneHotEncoderEstimator dropLast = true") {
-    val data = Seq(
-      Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
-      Row(1.0, Vectors.sparse(2, Seq((1, 1.0)))),
-      Row(2.0, Vectors.sparse(2, Seq())),
-      Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
-      Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
-      Row(2.0, Vectors.sparse(2, Seq())))
-
-    val schema = StructType(Array(
-        StructField("input", DoubleType),
-        StructField("expected", new VectorUDT)))
-
-    val df = spark.createDataFrame(sc.parallelize(data), schema)
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("input"))
-      .setOutputCols(Array("output"))
-
-    val model = encoder.fit(df)
-    testTransformer[(Double, Vector)](df, model, "output", "expected") {
-      case Row(output: Vector, expected: Vector) =>
-        assert(output === expected)
-    }
-  }
-
-  test("input column with ML attribute") {
-    val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
-    val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("size")
-      .select(col("size").as("size", attr.toMetadata()))
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("size"))
-      .setOutputCols(Array("encoded"))
-    val model = encoder.fit(df)
-    testTransformerByGlobalCheckFunc[(Double)](df, model, "encoded") { rows =>
-      val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
-      assert(group.size === 2)
-      assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0))
-      assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
-    }
-  }
-
-  test("input column without ML attribute") {
-    val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("index")
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("index"))
-      .setOutputCols(Array("encoded"))
-    val model = encoder.fit(df)
-    testTransformerByGlobalCheckFunc[(Double)](df, model, "encoded") { rows =>
-      val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
-      assert(group.size === 2)
-      assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("0").withIndex(0))
-      assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("1").withIndex(1))
-    }
-  }
-
-  test("read/write") {
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("index"))
-      .setOutputCols(Array("encoded"))
-    testDefaultReadWrite(encoder)
-  }
-
-  test("OneHotEncoderModel read/write") {
-    val instance = new OneHotEncoderModel("myOneHotEncoderModel", Array(1, 2, 3))
-    val newInstance = testDefaultReadWrite(instance)
-    assert(newInstance.categorySizes === instance.categorySizes)
-  }
-
-  test("OneHotEncoderEstimator with varying types") {
-    val data = Seq(
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
-      Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
-      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))),
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
-      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))))
-
-    val schema = StructType(Array(
-        StructField("input", DoubleType),
-        StructField("expected", new VectorUDT)))
-
-    val df = spark.createDataFrame(sc.parallelize(data), schema)
-
-    class NumericTypeWithEncoder[A](val numericType: NumericType)
-      (implicit val encoder: Encoder[(A, Vector)])
-
-    val types = Seq(
-      new NumericTypeWithEncoder[Short](ShortType),
-      new NumericTypeWithEncoder[Long](LongType),
-      new NumericTypeWithEncoder[Int](IntegerType),
-      new NumericTypeWithEncoder[Float](FloatType),
-      new NumericTypeWithEncoder[Byte](ByteType),
-      new NumericTypeWithEncoder[Double](DoubleType),
-      new NumericTypeWithEncoder[Decimal](DecimalType(10, 0))(ExpressionEncoder()))
-
-    for (t <- types) {
-      val dfWithTypes = df.select(col("input").cast(t.numericType), col("expected"))
-      val estimator = new OneHotEncoderEstimator()
-        .setInputCols(Array("input"))
-        .setOutputCols(Array("output"))
-        .setDropLast(false)
-
-      val model = estimator.fit(dfWithTypes)
-      testTransformer(dfWithTypes, model, "output", "expected") {
-        case Row(output: Vector, expected: Vector) =>
-          assert(output === expected)
-      }(t.encoder)
-    }
-  }
-
-  test("OneHotEncoderEstimator: encoding multiple columns and dropLast = false") {
-    val data = Seq(
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0)))),
-      Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), 3.0, Vectors.sparse(4, Seq((3, 1.0)))),
-      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))),
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 1.0, Vectors.sparse(4, Seq((1, 1.0)))),
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))),
-      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0)))))
-
-    val schema = StructType(Array(
-        StructField("input1", DoubleType),
-        StructField("expected1", new VectorUDT),
-        StructField("input2", DoubleType),
-        StructField("expected2", new VectorUDT)))
-
-    val df = spark.createDataFrame(sc.parallelize(data), schema)
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("input1", "input2"))
-      .setOutputCols(Array("output1", "output2"))
-    assert(encoder.getDropLast === true)
-    encoder.setDropLast(false)
-    assert(encoder.getDropLast === false)
-
-    val model = encoder.fit(df)
-    testTransformer[(Double, Vector, Double, Vector)](
-      df,
-      model,
-      "output1",
-      "output2",
-      "expected1",
-      "expected2") {
-      case Row(output1: Vector, output2: Vector, expected1: Vector, expected2: Vector) =>
-        assert(output1 === expected1)
-        assert(output2 === expected2)
-    }
-  }
-
-  test("OneHotEncoderEstimator: encoding multiple columns and dropLast = true") {
-    val data = Seq(
-      Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 2.0, Vectors.sparse(3, Seq((2, 1.0)))),
-      Row(1.0, Vectors.sparse(2, Seq((1, 1.0))), 3.0, Vectors.sparse(3, Seq())),
-      Row(2.0, Vectors.sparse(2, Seq()), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
-      Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 1.0, Vectors.sparse(3, Seq((1, 1.0)))),
-      Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
-      Row(2.0, Vectors.sparse(2, Seq()), 2.0, Vectors.sparse(3, Seq((2, 1.0)))))
-
-    val schema = StructType(Array(
-        StructField("input1", DoubleType),
-        StructField("expected1", new VectorUDT),
-        StructField("input2", DoubleType),
-        StructField("expected2", new VectorUDT)))
-
-    val df = spark.createDataFrame(sc.parallelize(data), schema)
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("input1", "input2"))
-      .setOutputCols(Array("output1", "output2"))
-
-    val model = encoder.fit(df)
-    testTransformer[(Double, Vector, Double, Vector)](
-      df,
-      model,
-      "output1",
-      "output2",
-      "expected1",
-      "expected2") {
-      case Row(output1: Vector, output2: Vector, expected1: Vector, expected2: Vector) =>
-        assert(output1 === expected1)
-        assert(output2 === expected2)
-    }
-  }
-
-  test("Throw error on invalid values") {
-    val trainingData = Seq((0, 0), (1, 1), (2, 2))
-    val trainingDF = trainingData.toDF("id", "a")
-    val testData = Seq((0, 0), (1, 2), (1, 3))
-    val testDF = testData.toDF("id", "a")
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("a"))
-      .setOutputCols(Array("encoded"))
-
-    val model = encoder.fit(trainingDF)
-    testTransformerByInterceptingException[(Int, Int)](
-      testDF,
-      model,
-      expectedMessagePart = "Unseen value: 3.0. To handle unseen values",
-      firstResultCol = "encoded")
-
-  }
-
-  test("Can't transform on negative input") {
-    val trainingDF = Seq((0, 0), (1, 1), (2, 2)).toDF("a", "b")
-    val testDF = Seq((0, 0), (-1, 2), (1, 3)).toDF("a", "b")
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("a"))
-      .setOutputCols(Array("encoded"))
-
-    val model = encoder.fit(trainingDF)
-    testTransformerByInterceptingException[(Int, Int)](
-      testDF,
-      model,
-      expectedMessagePart = "Negative value: -1.0. Input can't be negative",
-      firstResultCol = "encoded")
-  }
-
-  test("Keep on invalid values: dropLast = false") {
-    val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
-
-    val testData = Seq(
-      Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
-      Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
-      Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
-
-    val schema = StructType(Array(
-        StructField("input", DoubleType),
-        StructField("expected", new VectorUDT)))
-
-    val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("input"))
-      .setOutputCols(Array("output"))
-      .setHandleInvalid("keep")
-      .setDropLast(false)
-
-    val model = encoder.fit(trainingDF)
-    testTransformer[(Double, Vector)](testDF, model, "output", "expected") {
-      case Row(output: Vector, expected: Vector) =>
-        assert(output === expected)
-    }
-  }
-
-  test("Keep on invalid values: dropLast = true") {
-    val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
-
-    val testData = Seq(
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
-      Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
-      Row(3.0, Vectors.sparse(3, Seq())))
-
-    val schema = StructType(Array(
-        StructField("input", DoubleType),
-        StructField("expected", new VectorUDT)))
-
-    val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("input"))
-      .setOutputCols(Array("output"))
-      .setHandleInvalid("keep")
-      .setDropLast(true)
-
-    val model = encoder.fit(trainingDF)
-    testTransformer[(Double, Vector)](testDF, model, "output", "expected") {
-      case Row(output: Vector, expected: Vector) =>
-        assert(output === expected)
-    }
-  }
-
-  test("OneHotEncoderModel changes dropLast") {
-    val data = Seq(
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
-      Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), Vectors.sparse(2, Seq((1, 1.0)))),
-      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())),
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
-      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
-      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())))
-
-    val schema = StructType(Array(
-        StructField("input", DoubleType),
-        StructField("expected1", new VectorUDT),
-        StructField("expected2", new VectorUDT)))
-
-    val df = spark.createDataFrame(sc.parallelize(data), schema)
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("input"))
-      .setOutputCols(Array("output"))
-
-    val model = encoder.fit(df)
-
-    model.setDropLast(false)
-    testTransformer[(Double, Vector, Vector)](df, model, "output", "expected1") {
-      case Row(output: Vector, expected1: Vector) =>
-        assert(output === expected1)
-    }
-
-    model.setDropLast(true)
-    testTransformer[(Double, Vector, Vector)](df, model, "output", "expected2") {
-      case Row(output: Vector, expected2: Vector) =>
-        assert(output === expected2)
-    }
-  }
-
-  test("OneHotEncoderModel changes handleInvalid") {
-    val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
-
-    val testData = Seq(
-      Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
-      Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
-      Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
-
-    val schema = StructType(Array(
-        StructField("input", DoubleType),
-        StructField("expected", new VectorUDT)))
-
-    val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
-
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("input"))
-      .setOutputCols(Array("output"))
-
-    val model = encoder.fit(trainingDF)
-    model.setHandleInvalid("error")
-
-    testTransformerByInterceptingException[(Double, Vector)](
-      testDF,
-      model,
-      expectedMessagePart = "Unseen value: 3.0. To handle unseen values",
-      firstResultCol = "output")
-
-    model.setHandleInvalid("keep")
-    testTransformerByGlobalCheckFunc[(Double, Vector)](testDF, model, "output") { _ => }
-  }
-
-  test("Transforming on mismatched attributes") {
-    val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
-    val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("size")
-      .select(col("size").as("size", attr.toMetadata()))
-    val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("size"))
-      .setOutputCols(Array("encoded"))
-    val model = encoder.fit(df)
-
-    val testAttr = NominalAttribute.defaultAttr.withValues("tiny", "small", "medium", "large")
-    val testDF = Seq(0.0, 1.0, 2.0, 3.0).map(Tuple1.apply).toDF("size")
-      .select(col("size").as("size", testAttr.toMetadata()))
-    testTransformerByInterceptingException[(Double)](
-      testDF,
-      model,
-      expectedMessagePart = "OneHotEncoderModel expected 2 categorical values",
-      firstResultCol = "encoded")
-  }
-}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
index 41b32b2ffa..d92313f4ce 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
@@ -18,72 +18,71 @@ package org.apache.spark.ml.feature
 
 import org.apache.spark.ml.attribute.{AttributeGroup, BinaryAttribute, NominalAttribute}
-import org.apache.spark.ml.linalg.Vector
-import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
-import org.apache.spark.sql.{DataFrame, Encoder, Row}
+import org.apache.spark.sql.{Encoder, Row}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 
-class OneHotEncoderSuite
-  extends MLTest with DefaultReadWriteTest {
+class OneHotEncoderSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
-  def stringIndexed(): DataFrame = {
-    val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
-    val df = data.toDF("id", "label")
-    val indexer = new StringIndexer()
-      .setInputCol("label")
-      .setOutputCol("labelIndex")
-      .fit(df)
-    indexer.transform(df)
-  }
-
   test("params") {
     ParamsSuite.checkParams(new OneHotEncoder)
   }
 
   test("OneHotEncoder dropLast = false") {
-    val transformed = stringIndexed()
+    val data = Seq(
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
+
     val encoder = new OneHotEncoder()
-      .setInputCol("labelIndex")
-      .setOutputCol("labelVec")
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
     assert(encoder.getDropLast === true)
     encoder.setDropLast(false)
     assert(encoder.getDropLast === false)
-    val expected = Seq(
-      (0, Vectors.sparse(3, Seq((0, 1.0)))),
-      (1, Vectors.sparse(3, Seq((2, 1.0)))),
-      (2, Vectors.sparse(3, Seq((1, 1.0)))),
-      (3, Vectors.sparse(3, Seq((0, 1.0)))),
-      (4, Vectors.sparse(3, Seq((0, 1.0)))),
-      (5, Vectors.sparse(3, Seq((1, 1.0))))).toDF("id", "expected")
-
-    val withExpected = transformed.join(expected, "id")
-    testTransformer[(Int, String, Double, Vector)](withExpected, encoder, "labelVec", "expected") {
+    val model = encoder.fit(df)
+    testTransformer[(Double, Vector)](df, model, "output", "expected") {
       case Row(output: Vector, expected: Vector) =>
         assert(output === expected)
     }
   }
 
   test("OneHotEncoder dropLast = true") {
-    val transformed = stringIndexed()
-    val encoder = new OneHotEncoder()
-      .setInputCol("labelIndex")
-      .setOutputCol("labelVec")
-    val expected = Seq(
-      (0, Vectors.sparse(2, Seq((0, 1.0)))),
-      (1, Vectors.sparse(2, Seq())),
-      (2, Vectors.sparse(2, Seq((1, 1.0)))),
-      (3, Vectors.sparse(2, Seq((0, 1.0)))),
-      (4, Vectors.sparse(2, Seq((0, 1.0)))),
-      (5, Vectors.sparse(2, Seq((1, 1.0))))).toDF("id", "expected")
+    val data = Seq(
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(2, Seq((1, 1.0)))),
+      Row(2.0, Vectors.sparse(2, Seq())),
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(2, Seq())))
 
-    val withExpected = transformed.join(expected, "id")
-    testTransformer[(Int, String, Double, Vector)](withExpected, encoder, "labelVec", "expected") {
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
+
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+
+    val model = encoder.fit(df)
+    testTransformer[(Double, Vector)](df, model, "output", "expected") {
       case Row(output: Vector, expected: Vector) =>
         assert(output === expected)
     }
@@ -94,52 +93,61 @@ class OneHotEncoderSuite
     val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("size")
       .select(col("size").as("size", attr.toMetadata()))
     val encoder = new OneHotEncoder()
-      .setInputCol("size")
-      .setOutputCol("encoded")
-    testTransformerByGlobalCheckFunc[(Double)](df, encoder, "encoded") { rows =>
-      val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
-      assert(group.size === 2)
-      assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0))
-      assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
+      .setInputCols(Array("size"))
+      .setOutputCols(Array("encoded"))
+    val model = encoder.fit(df)
+    testTransformerByGlobalCheckFunc[(Double)](df, model, "encoded") { rows =>
+      val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
+      assert(group.size === 2)
+      assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0))
+      assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
     }
   }
 
-
   test("input column without ML attribute") {
     val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("index")
     val encoder = new OneHotEncoder()
-      .setInputCol("index")
-      .setOutputCol("encoded")
-    val rows = encoder.transform(df).select("encoded").collect()
-    val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
-    assert(group.size === 2)
-    assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("0").withIndex(0))
-    assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("1").withIndex(1))
+      .setInputCols(Array("index"))
+      .setOutputCols(Array("encoded"))
+    val model = encoder.fit(df)
+    testTransformerByGlobalCheckFunc[(Double)](df, model, "encoded") { rows =>
+      val group = AttributeGroup.fromStructField(rows.head.schema("encoded"))
+      assert(group.size === 2)
+      assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("0").withIndex(0))
+      assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("1").withIndex(1))
+    }
   }
 
   test("read/write") {
-    val t = new OneHotEncoder()
-      .setInputCol("myInputCol")
-      .setOutputCol("myOutputCol")
-      .setDropLast(false)
-    testDefaultReadWrite(t)
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("index"))
+      .setOutputCols(Array("encoded"))
+    testDefaultReadWrite(encoder)
+  }
+
+  test("OneHotEncoderModel read/write") {
+    val instance = new OneHotEncoderModel("myOneHotEncoderModel", Array(1, 2, 3))
+    val newInstance = testDefaultReadWrite(instance)
+    assert(newInstance.categorySizes === instance.categorySizes)
   }
 
   test("OneHotEncoder with varying types") {
-    val df = stringIndexed()
-    val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
-    val expected = Seq(
-      (0, Vectors.sparse(3, Seq((0, 1.0)))),
-      (1, Vectors.sparse(3, Seq((2, 1.0)))),
-      (2, Vectors.sparse(3, Seq((1, 1.0)))),
-      (3, Vectors.sparse(3, Seq((0, 1.0)))),
-      (4, Vectors.sparse(3, Seq((0, 1.0)))),
-      (5, Vectors.sparse(3, Seq((1, 1.0))))).toDF("id", "expected")
+    val data = Seq(
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))))
 
-    val withExpected = df.join(expected, "id")
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
 
     class NumericTypeWithEncoder[A](val numericType: NumericType)
-      (implicit val encoder: Encoder[(A, Vector)])
+       (implicit val encoder: Encoder[(A, Vector)])
 
     val types = Seq(
       new NumericTypeWithEncoder[Short](ShortType),
@@ -151,17 +159,264 @@ class OneHotEncoderSuite
       new NumericTypeWithEncoder[Decimal](DecimalType(10, 0))(ExpressionEncoder()))
 
     for (t <- types) {
-      val dfWithTypes = withExpected.select(col("labelIndex")
-        .cast(t.numericType).as("labelIndex", attr.toMetadata()), col("expected"))
-      val encoder = new OneHotEncoder()
-        .setInputCol("labelIndex")
-        .setOutputCol("labelVec")
+      val dfWithTypes = df.select(col("input").cast(t.numericType), col("expected"))
+      val estimator = new OneHotEncoder()
+        .setInputCols(Array("input"))
+        .setOutputCols(Array("output"))
         .setDropLast(false)
 
-      testTransformer(dfWithTypes, encoder, "labelVec", "expected") {
+      val model = estimator.fit(dfWithTypes)
+      testTransformer(dfWithTypes, model, "output", "expected") {
         case Row(output: Vector, expected: Vector) =>
           assert(output === expected)
       }(t.encoder)
     }
   }
+
+  test("OneHotEncoder: encoding multiple columns and dropLast = false") {
+    val data = Seq(
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0)))),
+      Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), 3.0, Vectors.sparse(4, Seq((3, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 1.0, Vectors.sparse(4, Seq((1, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0)))))
+
+    val schema = StructType(Array(
+        StructField("input1", DoubleType),
+        StructField("expected1", new VectorUDT),
+        StructField("input2", DoubleType),
+        StructField("expected2", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
+
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("input1", "input2"))
+      .setOutputCols(Array("output1", "output2"))
+    assert(encoder.getDropLast === true)
+    encoder.setDropLast(false)
+    assert(encoder.getDropLast === false)
+
+    val model = encoder.fit(df)
+    testTransformer[(Double, Vector, Double, Vector)](
+      df,
+      model,
+      "output1",
+      "output2",
+      "expected1",
+      "expected2") {
+      case Row(output1: Vector, output2: Vector, expected1: Vector, expected2: Vector) =>
+        assert(output1 === expected1)
+        assert(output2 === expected2)
+    }
+  }
+
+  test("OneHotEncoder: encoding multiple columns and dropLast = true") {
+    val data = Seq(
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 2.0, Vectors.sparse(3, Seq((2, 1.0)))),
+      Row(1.0, Vectors.sparse(2, Seq((1, 1.0))), 3.0, Vectors.sparse(3, Seq())),
+      Row(2.0, Vectors.sparse(2, Seq()), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 1.0, Vectors.sparse(3, Seq((1, 1.0)))),
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(2, Seq()), 2.0, Vectors.sparse(3, Seq((2, 1.0)))))
+
+    val schema = StructType(Array(
+        StructField("input1", DoubleType),
+        StructField("expected1", new VectorUDT),
+        StructField("input2", DoubleType),
+        StructField("expected2", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
+
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("input1", "input2"))
+      .setOutputCols(Array("output1", "output2"))
+
+    val model = encoder.fit(df)
+    testTransformer[(Double, Vector, Double, Vector)](
+      df,
+      model,
+      "output1",
+      "output2",
+      "expected1",
+      "expected2") {
+      case Row(output1: Vector, output2: Vector, expected1: Vector, expected2: Vector) =>
+        assert(output1 === expected1)
+        assert(output2 === expected2)
+    }
+  }
+
+  test("Throw error on invalid values") {
+    val trainingData = Seq((0, 0), (1, 1), (2, 2))
+    val trainingDF = trainingData.toDF("id", "a")
+    val testData = Seq((0, 0), (1, 2), (1, 3))
+    val testDF = testData.toDF("id", "a")
+
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("a"))
+      .setOutputCols(Array("encoded"))
+
+    val model = encoder.fit(trainingDF)
+    testTransformerByInterceptingException[(Int, Int)](
+      testDF,
+      model,
+      expectedMessagePart = "Unseen value: 3.0. To handle unseen values",
+      firstResultCol = "encoded")
+
+  }
+
+  test("Can't transform on negative input") {
+    val trainingDF = Seq((0, 0), (1, 1), (2, 2)).toDF("a", "b")
+    val testDF = Seq((0, 0), (-1, 2), (1, 3)).toDF("a", "b")
+
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("a"))
+      .setOutputCols(Array("encoded"))
+
+    val model = encoder.fit(trainingDF)
+    testTransformerByInterceptingException[(Int, Int)](
+      testDF,
+      model,
+      expectedMessagePart = "Negative value: -1.0. Input can't be negative",
+      firstResultCol = "encoded")
+  }
+
+  test("Keep on invalid values: dropLast = false") {
+    val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
+
+    val testData = Seq(
+      Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
+      Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
+
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+      .setHandleInvalid("keep")
+      .setDropLast(false)
+
+    val model = encoder.fit(trainingDF)
+    testTransformer[(Double, Vector)](testDF, model, "output", "expected") {
+      case Row(output: Vector, expected: Vector) =>
+        assert(output === expected)
+    }
+  }
+
+  test("Keep on invalid values: dropLast = true") {
+    val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
+
+    val testData = Seq(
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
+      Row(3.0, Vectors.sparse(3, Seq())))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
+
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+      .setHandleInvalid("keep")
+      .setDropLast(true)
+
+    val model = encoder.fit(trainingDF)
+    testTransformer[(Double, Vector)](testDF, model, "output", "expected") {
+      case Row(output: Vector, expected: Vector) =>
+        assert(output === expected)
+    }
+  }
+
+  test("OneHotEncoderModel changes dropLast") {
+    val data = Seq(
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), Vectors.sparse(2, Seq((1, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected1", new VectorUDT),
+        StructField("expected2", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
+
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+
+    val model = encoder.fit(df)
+
+    model.setDropLast(false)
+    testTransformer[(Double, Vector, Vector)](df, model, "output", "expected1") {
+      case Row(output: Vector, expected1: Vector) =>
+        assert(output === expected1)
+    }
+
+    model.setDropLast(true)
+    testTransformer[(Double, Vector, Vector)](df, model, "output", "expected2") {
+      case Row(output: Vector, expected2: Vector) =>
+        assert(output === expected2)
+    }
+  }
+
+  test("OneHotEncoderModel changes handleInvalid") {
+    val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
+
+    val testData = Seq(
+      Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
+      Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
+
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+
+    val model = encoder.fit(trainingDF)
+    model.setHandleInvalid("error")
+
+    testTransformerByInterceptingException[(Double, Vector)](
+      testDF,
+      model,
+      expectedMessagePart = "Unseen value: 3.0. To handle unseen values",
+      firstResultCol = "output")
+
+    model.setHandleInvalid("keep")
+    testTransformerByGlobalCheckFunc[(Double, Vector)](testDF, model, "output") { _ => }
+  }
+
+  test("Transforming on mismatched attributes") {
+    val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
+    val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("size")
+      .select(col("size").as("size", attr.toMetadata()))
+    val encoder = new OneHotEncoder()
+      .setInputCols(Array("size"))
+      .setOutputCols(Array("encoded"))
+    val model = encoder.fit(df)
+
+    val testAttr = NominalAttribute.defaultAttr.withValues("tiny", "small", "medium", "large")
+    val testDF = Seq(0.0, 1.0, 2.0, 3.0).map(Tuple1.apply).toDF("size")
+      .select(col("size").as("size", testAttr.toMetadata()))
+    testTransformerByInterceptingException[(Double)](
+      testDF,
+      model,
+      expectedMessagePart = "OneHotEncoderModel expected 2 categorical values",
+      firstResultCol = "encoded")
+  }
 }
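The tests added above pin down the `handleInvalid` contract of the fitted model. A hedged sketch of what `"keep"` means in practice (object name and toy data are illustrative, not from the patch):

```scala
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.sql.SparkSession

object HandleInvalidSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("HandleInvalidSketch").getOrCreate()
    import spark.implicits._

    val training = Seq(0.0, 1.0, 2.0).toDF("input") // three categories seen at fit time
    val test = Seq(0.0, 3.0).toDF("input")          // 3.0 was never seen

    val model = new OneHotEncoder()
      .setInputCols(Array("input"))
      .setOutputCols(Array("output"))
      .setHandleInvalid("keep") // unseen values map to one extra category slot
      .setDropLast(false)
      .fit(training)

    // Vectors get 3 + 1 = 4 slots; the unseen 3.0 encodes as (4,[3],[1.0]),
    // matching the "Keep on invalid values: dropLast = false" test above.
    model.transform(test).show(false)

    spark.stop()
  }
}
```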
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3fabec0f60..5e97d82637 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -228,6 +228,18 @@ object MimaExcludes {
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.writer.DataWriterFactory.createWriter"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter"),
 
+    // [SPARK-26133][ML] Remove deprecated OneHotEncoder and rename OneHotEncoderEstimator to OneHotEncoder
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.OneHotEncoderEstimator"),
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.OneHotEncoder"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.transform"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.getInputCol"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.getOutputCol"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.inputCol"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.setInputCol"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.setOutputCol"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.feature.OneHotEncoder.outputCol"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.OneHotEncoderEstimator$"),
+
     // [SPARK-26141] Enable custom metrics implementation in shuffle write
     // Following are Java private classes
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.shuffle.sort.UnsafeShuffleWriter.this"),
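The MiMa excludes above record exactly which single-column members disappear: `setInputCol`/`setOutputCol` and `transform` on an unfitted encoder are gone. For callers, the migration is to the plural, fitted API; inside a `Pipeline` the change is mostly invisible, because `Pipeline.fit()` already fits estimator stages. A sketch under those assumptions (object name and toy data are mine):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.sql.SparkSession

object MigrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MigrationSketch").getOrCreate()
    import spark.implicits._

    val df = Seq("a", "b", "c", "a").toDF("label")

    val indexer = new StringIndexer()
      .setInputCol("label")       // StringIndexer keeps its single-column API
      .setOutputCol("labelIndex")

    val encoder = new OneHotEncoder()
      .setInputCols(Array("labelIndex")) // plural setters replace setInputCol/setOutputCol
      .setOutputCols(Array("labelVec"))

    // Old code called encoder.transform(df) directly; the fit now happens here,
    // performed by the Pipeline on every estimator stage.
    val model = new Pipeline().setStages(Array(indexer, encoder)).fit(df)
    model.transform(df).show(false)

    spark.stop()
  }
}
```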
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 3d23700242..6cc80e181e 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -44,8 +44,7 @@ __all__ = ['Binarizer',
            'MinMaxScaler', 'MinMaxScalerModel',
            'NGram',
            'Normalizer',
-           'OneHotEncoder',
-           'OneHotEncoderEstimator', 'OneHotEncoderModel',
+           'OneHotEncoder', 'OneHotEncoderModel',
            'PCA', 'PCAModel',
            'PolynomialExpansion',
            'QuantileDiscretizer',
@@ -1642,91 +1641,8 @@ class Normalizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, Jav
 
 
 @inherit_doc
-class OneHotEncoder(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
-    """
-    A one-hot encoder that maps a column of category indices to a
-    column of binary vectors, with at most a single one-value per row
-    that indicates the input category index.
-    For example with 5 categories, an input value of 2.0 would map to
-    an output vector of `[0.0, 0.0, 1.0, 0.0]`.
-    The last category is not included by default (configurable via
-    :py:attr:`dropLast`) because it makes the vector entries sum up to
-    one, and hence linearly dependent.
-    So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
-
-    .. note:: This is different from scikit-learn's OneHotEncoder,
-        which keeps all categories. The output vectors are sparse.
-
-    .. note:: Deprecated in 2.3.0. :py:class:`OneHotEncoderEstimator` will be renamed to
-        :py:class:`OneHotEncoder` and this :py:class:`OneHotEncoder` will be removed in 3.0.0.
-
-    .. seealso::
-
-       :py:class:`StringIndexer` for converting categorical values into
-       category indices
-
-    >>> stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
-    >>> model = stringIndexer.fit(stringIndDf)
-    >>> td = model.transform(stringIndDf)
-    >>> encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
-    >>> encoder.transform(td).head().features
-    SparseVector(2, {0: 1.0})
-    >>> encoder.setParams(outputCol="freqs").transform(td).head().freqs
-    SparseVector(2, {0: 1.0})
-    >>> params = {encoder.dropLast: False, encoder.outputCol: "test"}
-    >>> encoder.transform(td, params).head().test
-    SparseVector(3, {0: 1.0})
-    >>> onehotEncoderPath = temp_path + "/onehot-encoder"
-    >>> encoder.save(onehotEncoderPath)
-    >>> loadedEncoder = OneHotEncoder.load(onehotEncoderPath)
-    >>> loadedEncoder.getDropLast() == encoder.getDropLast()
-    True
-
-    .. versionadded:: 1.4.0
-    """
-
-    dropLast = Param(Params._dummy(), "dropLast", "whether to drop the last category",
-                     typeConverter=TypeConverters.toBoolean)
-
-    @keyword_only
-    def __init__(self, dropLast=True, inputCol=None, outputCol=None):
-        """
-        __init__(self, dropLast=True, inputCol=None, outputCol=None)
-        """
-        super(OneHotEncoder, self).__init__()
-        self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.OneHotEncoder", self.uid)
-        self._setDefault(dropLast=True)
-        kwargs = self._input_kwargs
-        self.setParams(**kwargs)
-
-    @keyword_only
-    @since("1.4.0")
-    def setParams(self, dropLast=True, inputCol=None, outputCol=None):
-        """
-        setParams(self, dropLast=True, inputCol=None, outputCol=None)
-        Sets params for this OneHotEncoder.
-        """
-        kwargs = self._input_kwargs
-        return self._set(**kwargs)
-
-    @since("1.4.0")
-    def setDropLast(self, value):
-        """
-        Sets the value of :py:attr:`dropLast`.
-        """
-        return self._set(dropLast=value)
-
-    @since("1.4.0")
-    def getDropLast(self):
-        """
-        Gets the value of dropLast or its default value.
-        """
-        return self.getOrDefault(self.dropLast)
-
-
-@inherit_doc
-class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHandleInvalid,
-                             JavaMLReadable, JavaMLWritable):
+class OneHotEncoder(JavaEstimator, HasInputCols, HasOutputCols, HasHandleInvalid,
+                    JavaMLReadable, JavaMLWritable):
     """
     A one-hot encoder that maps a column of category indices to a
     column of binary vectors, with at most a single one-value per row
     that indicates the input category index.
@@ -1751,13 +1667,13 @@ class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHand
 
     >>> from pyspark.ml.linalg import Vectors
     >>> df = spark.createDataFrame([(0.0,), (1.0,), (2.0,)], ["input"])
-    >>> ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
+    >>> ohe = OneHotEncoder(inputCols=["input"], outputCols=["output"])
    >>> model = ohe.fit(df)
     >>> model.transform(df).head().output
     SparseVector(2, {0: 1.0})
     >>> ohePath = temp_path + "/oheEstimator"
     >>> ohe.save(ohePath)
-    >>> loadedOHE = OneHotEncoderEstimator.load(ohePath)
+    >>> loadedOHE = OneHotEncoder.load(ohePath)
     >>> loadedOHE.getInputCols() == ohe.getInputCols()
     True
     >>> modelPath = temp_path + "/ohe-model"
@@ -1784,9 +1700,9 @@ class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHand
         """
         __init__(self, inputCols=None, outputCols=None, handleInvalid="error", dropLast=True)
         """
-        super(OneHotEncoderEstimator, self).__init__()
+        super(OneHotEncoder, self).__init__()
         self._java_obj = self._new_java_obj(
-            "org.apache.spark.ml.feature.OneHotEncoderEstimator", self.uid)
+            "org.apache.spark.ml.feature.OneHotEncoder", self.uid)
         self._setDefault(handleInvalid="error", dropLast=True)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
@@ -1796,7 +1712,7 @@ class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHand
     def setParams(self, inputCols=None, outputCols=None, handleInvalid="error", dropLast=True):
         """
         setParams(self, inputCols=None, outputCols=None, handleInvalid="error", dropLast=True)
-        Sets params for this OneHotEncoderEstimator.
+        Sets params for this OneHotEncoder.
         """
         kwargs = self._input_kwargs
         return self._set(**kwargs)
@@ -1821,7 +1737,7 @@ class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHand
 
 class OneHotEncoderModel(JavaModel, JavaMLReadable, JavaMLWritable):
     """
-    Model fitted by :py:class:`OneHotEncoderEstimator`.
+    Model fitted by :py:class:`OneHotEncoder`.
 
     .. versionadded:: 2.3.0
     """
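The doctest above already shows the Python save/load round trip; for completeness, a hedged Scala equivalent of persisting both the estimator and the fitted model (paths and object name are hypothetical):

```scala
import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderModel}
import org.apache.spark.sql.SparkSession

object PersistenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PersistenceSketch").getOrCreate()
    import spark.implicits._

    val df = Seq(0.0, 1.0, 2.0).toDF("input")

    val encoder = new OneHotEncoder()
      .setInputCols(Array("input"))
      .setOutputCols(Array("output"))
    val model = encoder.fit(df)

    // Hypothetical paths; both the estimator and the model are MLWritable.
    encoder.write.overwrite().save("/tmp/ohe")
    model.write.overwrite().save("/tmp/ohe-model")

    // categorySizes is what OneHotEncoderModel persists, as the
    // "OneHotEncoderModel read/write" test above asserts.
    val loaded = OneHotEncoderModel.load("/tmp/ohe-model")
    assert(loaded.categorySizes.sameElements(model.categorySizes))

    spark.stop()
  }
}
```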