[SPARK-33466][ML][PYTHON] Imputer support mode(most_frequent) strategy

### What changes were proposed in this pull request?
Implement a new strategy `mode`: replace missing values using the most frequent value along each column.
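
For illustration, a minimal usage sketch of the new strategy (assuming an active `SparkSession` named `spark`; the column names here are made up):

```scala
import org.apache.spark.ml.feature.Imputer

import spark.implicits._

val df = Seq(
  (1.0, Double.NaN),
  (2.0, 2.0),
  (2.0, 3.0),
  (Double.NaN, 2.0)
).toDF("a", "b")

val model = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("out_a", "out_b"))
  .setStrategy("mode") // new in this PR; previously only "mean" and "median"
  .fit(df)

// Each NaN is replaced by the most frequent valid value of its column:
// 2.0 for "a" (1.0, 2.0, 2.0) and 2.0 for "b" (2.0, 3.0, 2.0).
model.transform(df).show()
```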

### Why are the changes needed?
It is highly scalable, and has long been available as a strategy in [sklearn.impute.SimpleImputer](https://scikit-learn.org/stable/modules/generated/sklearn.impute.SimpleImputer.html#sklearn.impute.SimpleImputer).

### Does this PR introduce _any_ user-facing change?
Yes, a new strategy (`mode`) is added.

### How was this patch tested?
Updated test suites.

Closes #30397 from zhengruifeng/imputer_max_freq.

Lead-authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Co-authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
Ruifeng Zheng 2020-11-20 11:35:34 -06:00 committed by Sean Owen
parent 47326ac1c6
commit 116b7b72a1
3 changed files with 144 additions and 121 deletions

mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala

@@ -39,14 +39,16 @@ private[feature] trait ImputerParams extends Params with HasInputCol with HasInp
* The imputation strategy. Currently only "mean" and "median" are supported.
* If "mean", then replace missing values using the mean value of the feature.
* If "median", then replace missing values using the approximate median value of the feature.
* If "mode", then replace missing using the most frequent value of the feature.
* Default: mean
*
* @group param
*/
final val strategy: Param[String] = new Param(this, "strategy", s"strategy for imputation. " +
s"If ${Imputer.mean}, then replace missing values using the mean value of the feature. " +
s"If ${Imputer.median}, then replace missing values using the median value of the feature.",
ParamValidators.inArray[String](Array(Imputer.mean, Imputer.median)))
s"If ${Imputer.median}, then replace missing values using the median value of the feature. " +
s"If ${Imputer.mode}, then replace missing values using the most frequent value of " +
s"the feature.", ParamValidators.inArray[String](Imputer.supportedStrategies))
/** @group getParam */
def getStrategy: String = $(strategy)
@@ -104,7 +106,7 @@ private[feature] trait ImputerParams extends Params with HasInputCol with HasInp
* For example, if the input column is IntegerType (1, 2, 4, null),
* the output will be IntegerType (1, 2, 4, 2) after mean imputation.
*
* Note that the mean/median value is computed after filtering out missing values.
* Note that the mean/median/mode value is computed after filtering out missing values.
* All Null values in the input columns are treated as missing, and so are also imputed. For
* computing median, DataFrameStatFunctions.approxQuantile is used with a relative error of 0.001.
*/
@@ -132,7 +134,7 @@ class Imputer @Since("2.2.0") (@Since("2.2.0") override val uid: String)
def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
/**
* Imputation strategy. Available options are ["mean", "median"].
* Imputation strategy. Available options are ["mean", "median", "mode"].
* @group setParam
*/
@Since("2.2.0")
@@ -151,39 +153,42 @@ class Imputer @Since("2.2.0") (@Since("2.2.0") override val uid: String)
val spark = dataset.sparkSession
val (inputColumns, _) = getInOutCols()
val cols = inputColumns.map { inputCol =>
when(col(inputCol).equalTo($(missingValue)), null)
.when(col(inputCol).isNaN, null)
.otherwise(col(inputCol))
.cast("double")
.cast(DoubleType)
.as(inputCol)
}
val numCols = cols.length
val results = $(strategy) match {
case Imputer.mean =>
// Function avg will ignore null automatically.
// For a column only containing null, avg will return null.
val row = dataset.select(cols.map(avg): _*).head()
Array.range(0, inputColumns.length).map { i =>
if (row.isNullAt(i)) {
Double.NaN
} else {
row.getDouble(i)
}
}
Array.tabulate(numCols)(i => if (row.isNullAt(i)) Double.NaN else row.getDouble(i))
case Imputer.median =>
// Function approxQuantile will ignore null automatically.
// For a column only containing null, approxQuantile will return an empty array.
dataset.select(cols: _*).stat.approxQuantile(inputColumns, Array(0.5), $(relativeError))
.map { array =>
if (array.isEmpty) {
Double.NaN
} else {
array.head
}
}
.map(_.headOption.getOrElse(Double.NaN))
case Imputer.mode =>
import spark.implicits._
// If there is more than one mode, choose the smallest one to keep in line
// with sklearn.impute.SimpleImputer (using scipy.stats.mode).
val modes = dataset.select(cols: _*).flatMap { row =>
// Ignore null.
Iterator.range(0, numCols)
.flatMap(i => if (row.isNullAt(i)) None else Some((i, row.getDouble(i))))
}.toDF("index", "value")
.groupBy("index", "value").agg(negate(count(lit(0))).as("negative_count"))
.groupBy("index").agg(min(struct("negative_count", "value")).as("mode"))
.select("index", "mode.value")
.as[(Int, Double)].collect().toMap
Array.tabulate(numCols)(i => modes.getOrElse(i, Double.NaN))
}
val emptyCols = inputColumns.zip(results).filter(_._2.isNaN).map(_._1)
@@ -212,6 +217,10 @@ object Imputer extends DefaultParamsReadable[Imputer] {
/** strategy names that Imputer currently supports. */
private[feature] val mean = "mean"
private[feature] val median = "median"
private[feature] val mode = "mode"
/* Set of strategies that Imputer supports */
private[feature] val supportedStrategies = Array(mean, median, mode)
@Since("2.2.0")
override def load(path: String): Imputer = super.load(path)
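
The `mode` branch above packs a tie-breaking rule into plain aggregations: per-value counts are negated so that a single `min(struct(...))` picks the highest count first and, on a tie, the smallest value, matching sklearn.impute.SimpleImputer (which uses scipy.stats.mode). A standalone sketch of the same pattern, assuming an active `SparkSession` named `spark`:

```scala
import org.apache.spark.sql.functions._

import spark.implicits._

// (column index, value) pairs; column 0 has a tie between 1.0 and 3.0.
val values = Seq((0, 1.0), (0, 1.0), (0, 3.0), (0, 3.0), (1, 5.0))
  .toDF("index", "value")

val modes = values
  // Count occurrences per (index, value); negate so that a smaller
  // negative_count means a more frequent value.
  .groupBy("index", "value").agg(negate(count(lit(0))).as("negative_count"))
  // Struct ordering is lexicographic: the highest count wins first,
  // then ties fall to the smallest value.
  .groupBy("index").agg(min(struct("negative_count", "value")).as("mode"))
  .select("index", "mode.value")
  .as[(Int, Double)].collect().toMap

// modes == Map(0 -> 1.0, 1 -> 5.0): the tie in column 0 resolves to 1.0.
```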

mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala

@@ -28,13 +28,14 @@ import org.apache.spark.sql.types._
class ImputerSuite extends MLTest with DefaultReadWriteTest {
test("Imputer for Double with default missing Value NaN") {
val df = spark.createDataFrame( Seq(
(0, 1.0, 4.0, 1.0, 1.0, 4.0, 4.0),
(1, 11.0, 12.0, 11.0, 11.0, 12.0, 12.0),
(2, 3.0, Double.NaN, 3.0, 3.0, 10.0, 12.0),
(3, Double.NaN, 14.0, 5.0, 3.0, 14.0, 14.0)
)).toDF("id", "value1", "value2", "expected_mean_value1", "expected_median_value1",
"expected_mean_value2", "expected_median_value2")
val df = spark.createDataFrame(Seq(
(0, 1.0, 4.0, 1.0, 1.0, 1.0, 4.0, 4.0, 4.0),
(1, 11.0, 12.0, 11.0, 11.0, 11.0, 12.0, 12.0, 12.0),
(2, 3.0, Double.NaN, 3.0, 3.0, 3.0, 10.0, 12.0, 4.0),
(3, Double.NaN, 14.0, 5.0, 3.0, 1.0, 14.0, 14.0, 14.0)
)).toDF("id", "value1", "value2",
"expected_mean_value1", "expected_median_value1", "expected_mode_value1",
"expected_mean_value2", "expected_median_value2", "expected_mode_value2")
val imputer = new Imputer()
.setInputCols(Array("value1", "value2"))
.setOutputCols(Array("out1", "out2"))
@@ -42,23 +43,25 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Single Column: Imputer for Double with default missing Value NaN") {
val df1 = spark.createDataFrame( Seq(
(0, 1.0, 1.0, 1.0),
(1, 11.0, 11.0, 11.0),
(2, 3.0, 3.0, 3.0),
(3, Double.NaN, 5.0, 3.0)
)).toDF("id", "value", "expected_mean_value", "expected_median_value")
val df1 = spark.createDataFrame(Seq(
(0, 1.0, 1.0, 1.0, 1.0),
(1, 11.0, 11.0, 11.0, 11.0),
(2, 3.0, 3.0, 3.0, 3.0),
(3, Double.NaN, 5.0, 3.0, 1.0)
)).toDF("id", "value",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val imputer1 = new Imputer()
.setInputCol("value")
.setOutputCol("out")
ImputerSuite.iterateStrategyTest(false, imputer1, df1)
val df2 = spark.createDataFrame( Seq(
(0, 4.0, 4.0, 4.0),
(1, 12.0, 12.0, 12.0),
(2, Double.NaN, 10.0, 12.0),
(3, 14.0, 14.0, 14.0)
)).toDF("id", "value", "expected_mean_value", "expected_median_value")
val df2 = spark.createDataFrame(Seq(
(0, 4.0, 4.0, 4.0, 4.0),
(1, 12.0, 12.0, 12.0, 12.0),
(2, Double.NaN, 10.0, 12.0, 4.0),
(3, 14.0, 14.0, 14.0, 14.0)
)).toDF("id", "value",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val imputer2 = new Imputer()
.setInputCol("value")
.setOutputCol("out")
@@ -66,12 +69,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Imputer should handle NaNs when computing surrogate value, if missingValue is not NaN") {
val df = spark.createDataFrame( Seq(
(0, 1.0, 1.0, 1.0),
(1, 3.0, 3.0, 3.0),
(2, Double.NaN, Double.NaN, Double.NaN),
(3, -1.0, 2.0, 1.0)
)).toDF("id", "value", "expected_mean_value", "expected_median_value")
val df = spark.createDataFrame(Seq(
(0, 1.0, 1.0, 1.0, 1.0),
(1, 3.0, 3.0, 3.0, 3.0),
(2, Double.NaN, Double.NaN, Double.NaN, Double.NaN),
(3, -1.0, 2.0, 1.0, 1.0)
)).toDF("id", "value",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val imputer = new Imputer().setInputCols(Array("value")).setOutputCols(Array("out"))
.setMissingValue(-1.0)
ImputerSuite.iterateStrategyTest(true, imputer, df)
@@ -79,64 +83,69 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
test("Single Column: Imputer should handle NaNs when computing surrogate value," +
" if missingValue is not NaN") {
val df = spark.createDataFrame( Seq(
(0, 1.0, 1.0, 1.0),
(1, 3.0, 3.0, 3.0),
(2, Double.NaN, Double.NaN, Double.NaN),
(3, -1.0, 2.0, 1.0)
)).toDF("id", "value", "expected_mean_value", "expected_median_value")
val df = spark.createDataFrame(Seq(
(0, 1.0, 1.0, 1.0, 1.0),
(1, 3.0, 3.0, 3.0, 3.0),
(2, Double.NaN, Double.NaN, Double.NaN, Double.NaN),
(3, -1.0, 2.0, 1.0, 1.0)
)).toDF("id", "value",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val imputer = new Imputer().setInputCol("value").setOutputCol("out")
.setMissingValue(-1.0)
ImputerSuite.iterateStrategyTest(false, imputer, df)
}
test("Imputer for Float with missing Value -1.0") {
val df = spark.createDataFrame( Seq(
(0, 1.0F, 1.0F, 1.0F),
(1, 3.0F, 3.0F, 3.0F),
(2, 10.0F, 10.0F, 10.0F),
(3, 10.0F, 10.0F, 10.0F),
(4, -1.0F, 6.0F, 3.0F)
)).toDF("id", "value", "expected_mean_value", "expected_median_value")
val df = spark.createDataFrame(Seq(
(0, 1.0F, 1.0F, 1.0F, 1.0F),
(1, 3.0F, 3.0F, 3.0F, 3.0F),
(2, 10.0F, 10.0F, 10.0F, 10.0F),
(3, 10.0F, 10.0F, 10.0F, 10.0F),
(4, -1.0F, 6.0F, 3.0F, 10.0F)
)).toDF("id", "value",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val imputer = new Imputer().setInputCols(Array("value")).setOutputCols(Array("out"))
.setMissingValue(-1)
ImputerSuite.iterateStrategyTest(true, imputer, df)
}
test("Single Column: Imputer for Float with missing Value -1.0") {
val df = spark.createDataFrame( Seq(
(0, 1.0F, 1.0F, 1.0F),
(1, 3.0F, 3.0F, 3.0F),
(2, 10.0F, 10.0F, 10.0F),
(3, 10.0F, 10.0F, 10.0F),
(4, -1.0F, 6.0F, 3.0F)
)).toDF("id", "value", "expected_mean_value", "expected_median_value")
val df = spark.createDataFrame(Seq(
(0, 1.0F, 1.0F, 1.0F, 1.0F),
(1, 3.0F, 3.0F, 3.0F, 3.0F),
(2, 10.0F, 10.0F, 10.0F, 10.0F),
(3, 10.0F, 10.0F, 10.0F, 10.0F),
(4, -1.0F, 6.0F, 3.0F, 10.0F)
)).toDF("id", "value",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val imputer = new Imputer().setInputCol("value").setOutputCol("out")
.setMissingValue(-1)
ImputerSuite.iterateStrategyTest(false, imputer, df)
}
test("Imputer should impute null as well as 'missingValue'") {
val rawDf = spark.createDataFrame( Seq(
(0, 4.0, 4.0, 4.0),
(1, 10.0, 10.0, 10.0),
(2, 10.0, 10.0, 10.0),
(3, Double.NaN, 8.0, 10.0),
(4, -1.0, 8.0, 10.0)
)).toDF("id", "rawValue", "expected_mean_value", "expected_median_value")
val rawDf = spark.createDataFrame(Seq(
(0, 4.0, 4.0, 4.0, 4.0),
(1, 10.0, 10.0, 10.0, 10.0),
(2, 10.0, 10.0, 10.0, 10.0),
(3, Double.NaN, 8.0, 10.0, 10.0),
(4, -1.0, 8.0, 10.0, 10.0)
)).toDF("id", "rawValue",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val df = rawDf.selectExpr("*", "IF(rawValue=-1.0, null, rawValue) as value")
val imputer = new Imputer().setInputCols(Array("value")).setOutputCols(Array("out"))
ImputerSuite.iterateStrategyTest(true, imputer, df)
}
test("Single Column: Imputer should impute null as well as 'missingValue'") {
val rawDf = spark.createDataFrame( Seq(
(0, 4.0, 4.0, 4.0),
(1, 10.0, 10.0, 10.0),
(2, 10.0, 10.0, 10.0),
(3, Double.NaN, 8.0, 10.0),
(4, -1.0, 8.0, 10.0)
)).toDF("id", "rawValue", "expected_mean_value", "expected_median_value")
val rawDf = spark.createDataFrame(Seq(
(0, 4.0, 4.0, 4.0, 4.0),
(1, 10.0, 10.0, 10.0, 10.0),
(2, 10.0, 10.0, 10.0, 10.0),
(3, Double.NaN, 8.0, 10.0, 10.0),
(4, -1.0, 8.0, 10.0, 10.0)
)).toDF("id", "rawValue",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val df = rawDf.selectExpr("*", "IF(rawValue=-1.0, null, rawValue) as value")
val imputer = new Imputer().setInputCol("value").setOutputCol("out")
ImputerSuite.iterateStrategyTest(false, imputer, df)
@@ -187,7 +196,7 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Imputer throws exception when surrogate cannot be computed") {
val df = spark.createDataFrame( Seq(
val df = spark.createDataFrame(Seq(
(0, Double.NaN, 1.0, 1.0),
(1, Double.NaN, 3.0, 3.0),
(2, Double.NaN, Double.NaN, Double.NaN)
@@ -205,12 +214,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Single Column: Imputer throws exception when surrogate cannot be computed") {
val df = spark.createDataFrame( Seq(
(0, Double.NaN, 1.0, 1.0),
(1, Double.NaN, 3.0, 3.0),
(2, Double.NaN, Double.NaN, Double.NaN)
)).toDF("id", "value", "expected_mean_value", "expected_median_value")
Seq("mean", "median").foreach { strategy =>
val df = spark.createDataFrame(Seq(
(0, Double.NaN, 1.0, 1.0, 1.0),
(1, Double.NaN, 3.0, 3.0, 3.0),
(2, Double.NaN, Double.NaN, Double.NaN, Double.NaN)
)).toDF("id", "value",
"expected_mean_value", "expected_median_value", "expected_mode_value")
Seq("mean", "median", "mode").foreach { strategy =>
val imputer = new Imputer().setInputCol("value").setOutputCol("out")
.setStrategy(strategy)
withClue("Imputer should fail all the values are invalid") {
@@ -223,12 +233,12 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Imputer input & output column validation") {
val df = spark.createDataFrame( Seq(
val df = spark.createDataFrame(Seq(
(0, 1.0, 1.0, 1.0),
(1, Double.NaN, 3.0, 3.0),
(2, Double.NaN, Double.NaN, Double.NaN)
)).toDF("id", "value1", "value2", "value3")
Seq("mean", "median").foreach { strategy =>
Seq("mean", "median", "mode").foreach { strategy =>
withClue("Imputer should fail if inputCols and outputCols are different length") {
val e: IllegalArgumentException = intercept[IllegalArgumentException] {
val imputer = new Imputer().setStrategy(strategy)
@@ -306,13 +316,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Imputer for IntegerType with default missing value null") {
val df = spark.createDataFrame(Seq[(Integer, Integer, Integer)](
(1, 1, 1),
(11, 11, 11),
(3, 3, 3),
(null, 5, 3)
)).toDF("value1", "expected_mean_value1", "expected_median_value1")
val df = spark.createDataFrame(Seq[(Integer, Integer, Integer, Integer)](
(1, 1, 1, 1),
(11, 11, 11, 11),
(3, 3, 3, 3),
(null, 5, 3, 1)
)).toDF("value1",
"expected_mean_value1", "expected_median_value1", "expected_mode_value1")
val imputer = new Imputer()
.setInputCols(Array("value1"))
@@ -327,12 +337,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Single Column Imputer for IntegerType with default missing value null") {
val df = spark.createDataFrame(Seq[(Integer, Integer, Integer)](
(1, 1, 1),
(11, 11, 11),
(3, 3, 3),
(null, 5, 3)
)).toDF("value", "expected_mean_value", "expected_median_value")
val df = spark.createDataFrame(Seq[(Integer, Integer, Integer, Integer)](
(1, 1, 1, 1),
(11, 11, 11, 11),
(3, 3, 3, 3),
(null, 5, 3, 1)
)).toDF("value",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val imputer = new Imputer()
.setInputCol("value")
@@ -347,13 +358,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Imputer for IntegerType with missing value -1") {
val df = spark.createDataFrame(Seq[(Integer, Integer, Integer)](
(1, 1, 1),
(11, 11, 11),
(3, 3, 3),
(-1, 5, 3)
)).toDF("value1", "expected_mean_value1", "expected_median_value1")
val df = spark.createDataFrame(Seq[(Integer, Integer, Integer, Integer)](
(1, 1, 1, 1),
(11, 11, 11, 11),
(3, 3, 3, 3),
(-1, 5, 3, 1)
)).toDF("value1",
"expected_mean_value1", "expected_median_value1", "expected_mode_value1")
val imputer = new Imputer()
.setInputCols(Array("value1"))
@@ -369,12 +380,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Single Column: Imputer for IntegerType with missing value -1") {
val df = spark.createDataFrame(Seq[(Integer, Integer, Integer)](
(1, 1, 1),
(11, 11, 11),
(3, 3, 3),
(-1, 5, 3)
)).toDF("value", "expected_mean_value", "expected_median_value")
val df = spark.createDataFrame(Seq[(Integer, Integer, Integer, Integer)](
(1, 1, 1, 1),
(11, 11, 11, 11),
(3, 3, 3, 3),
(-1, 5, 3, 1)
)).toDF("value",
"expected_mean_value", "expected_median_value", "expected_mode_value")
val imputer = new Imputer()
.setInputCol("value")
@@ -402,13 +414,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
}
test("Compare single/multiple column(s) Imputer in pipeline") {
val df = spark.createDataFrame( Seq(
val df = spark.createDataFrame(Seq(
(0, 1.0, 4.0),
(1, 11.0, 12.0),
(2, 3.0, Double.NaN),
(3, Double.NaN, 14.0)
)).toDF("id", "value1", "value2")
Seq("mean", "median").foreach { strategy =>
Seq("mean", "median", "mode").foreach { strategy =>
val multiColsImputer = new Imputer()
.setInputCols(Array("value1", "value2"))
.setOutputCols(Array("result1", "result2"))
@@ -450,11 +462,12 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest {
object ImputerSuite {
/**
* Imputation strategy. Available options are ["mean", "median"].
* @param df DataFrame with columns "id", "value", "expected_mean", "expected_median"
* Imputation strategy. Available options are ["mean", "median", "mode"].
* @param df DataFrame with columns "id", "value", "expected_mean", "expected_median",
* "expected_mode".
*/
def iterateStrategyTest(isMultiCol: Boolean, imputer: Imputer, df: DataFrame): Unit = {
Seq("mean", "median").foreach { strategy =>
Seq("mean", "median", "mode").foreach { strategy =>
imputer.setStrategy(strategy)
val model = imputer.fit(df)
val resultDF = model.transform(df)
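
A hedged sketch of the comparison each strategy iteration then performs, under the column convention documented above (the suite's actual assertions may differ; `imputer` and `df` follow the single-column tests above, with an output column named "out"):

```scala
import org.apache.spark.ml.feature.Imputer
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col

def checkStrategy(imputer: Imputer, df: DataFrame, strategy: String): Unit = {
  val resultDF = imputer.setStrategy(strategy).fit(df).transform(df)
  // Compare the imputed output against the matching expected_<strategy>_value column.
  resultDF.select(col("out"), col(s"expected_${strategy}_value")).collect().foreach {
    case Row(out: Double, expected: Double) =>
      assert(out == expected, s"$strategy: got $out, expected $expected")
  }
}

Seq("mean", "median", "mode").foreach(checkStrategy(imputer, df, _))
```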

python/pyspark/ml/feature.py

@@ -1507,7 +1507,8 @@ class _ImputerParams(HasInputCol, HasInputCols, HasOutputCol, HasOutputCols, Has
strategy = Param(Params._dummy(), "strategy",
"strategy for imputation. If mean, then replace missing values using the mean "
"value of the feature. If median, then replace missing values using the "
"median value of the feature.",
"median value of the feature. If mode, then replace missing using the most "
"frequent value of the feature.",
typeConverter=TypeConverters.toString)
missingValue = Param(Params._dummy(), "missingValue",
@@ -1541,7 +1542,7 @@ class Imputer(JavaEstimator, _ImputerParams, JavaMLReadable, JavaMLWritable):
numeric type. Currently Imputer does not support categorical features and
possibly creates incorrect values for a categorical feature.
Note that the mean/median value is computed after filtering out missing values.
Note that the mean/median/mode value is computed after filtering out missing values.
All Null values in the input columns are treated as missing, and so are also imputed. For
computing median, :py:meth:`pyspark.sql.DataFrame.approxQuantile` is used with a
relative error of `0.001`.
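
The null-handling note above is exercised directly by the "impute null as well as 'missingValue'" tests; as a standalone illustration (a sketch, assuming an active `SparkSession` named `spark`):

```scala
import org.apache.spark.ml.feature.Imputer

import spark.implicits._

// Option encodes a nullable column: None becomes a SQL NULL.
val df = Seq((0, Some(4.0)), (1, Some(10.0)), (2, Some(10.0)), (3, None), (4, Some(-1.0)))
  .toDF("id", "value")

val model = new Imputer()
  .setInputCol("value")
  .setOutputCol("out")
  .setMissingValue(-1.0) // -1.0 is treated as missing, and so is NULL
  .setStrategy("mode")
  .fit(df)

// Both the NULL row and the -1.0 row are imputed with 10.0, the most
// frequent valid value among (4.0, 10.0, 10.0).
model.transform(df).show()
```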