[SPARK-29144][ML] Binarizer handle sparse vectors incorrectly with negative threshold

### What changes were proposed in this pull request?
If `threshold < 0`, convert the implicit 0 entries of a sparse vector to 1, although this will break sparsity.
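
For illustration, a minimal spark-shell sketch of the behavior after this change (the input data and column names here are made up for the example and `spark` is the shell's session; none of this is taken from the patch):

```scala
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.linalg.Vectors

// A sparse vector: only index 1 is explicitly stored; indices 0 and 2
// are implicit 0.0 entries.
val df = spark.createDataFrame(Seq(
  Tuple1(Vectors.sparse(3, Array(1), Array(0.5)))
)).toDF("feature")

val binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized")
  .setThreshold(-0.5)

// With threshold = -0.5 the implicit 0.0 entries also exceed the
// threshold, so the whole vector becomes [1.0, 1.0, 1.0] (dense output).
binarizer.transform(df).show(truncate = false)
```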

### Why are the changes needed?
If `threshold < 0`, the current implementation handles sparse vectors incorrectly: it only compares active (explicitly stored) entries against the threshold, so implicit zeros are never binarized to 1.
See JIRA [SPARK-29144](https://issues.apache.org/jira/browse/SPARK-29144) and [Scikit-Learn's Binarizer](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.Binarizer.html) ('Threshold may not be less than 0 for operations on sparse matrices.') for details.
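
Concretely, the old vector path (the removed `binarizerVector` UDF in the diff below) only iterated active entries. A standalone sketch of that old logic, extracted here for illustration:

```scala
import scala.collection.mutable.ArrayBuilder
import org.apache.spark.ml.linalg.{Vector, Vectors}

// Old logic: only active (explicitly stored) entries are visited,
// so implicit 0.0 entries are never compared against the threshold.
def oldBinarize(data: Vector, td: Double): Vector = {
  val indices = ArrayBuilder.make[Int]
  val values = ArrayBuilder.make[Double]
  data.foreachActive { (index, value) =>
    if (value > td) {
      indices += index
      values += 1.0
    }
  }
  Vectors.sparse(data.size, indices.result(), values.result()).compressed
}

// With td = -0.5 every entry, including the implicit zeros, satisfies
// value > td, so the correct result is [1.0, 1.0, 1.0]. Instead:
oldBinarize(Vectors.sparse(3, Array(1), Array(0.5)), -0.5)
// => [0.0, 1.0, 0.0] -- the implicit zeros were silently left at 0.0
```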

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added a test suite.

Closes #25829 from zhengruifeng/binarizer_throw_exception_sparse_vector.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
commit c764dd6dd7 (parent 4a89fa1cd1): 2 changed files with 44 additions and 20 deletions

`mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala`

```diff
@@ -75,30 +75,40 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
     val schema = dataset.schema
     val inputType = schema($(inputCol)).dataType
     val td = $(threshold)
-    val binarizerDouble = udf { in: Double => if (in > td) 1.0 else 0.0 }
-    val binarizerVector = udf { (data: Vector) =>
-      val indices = ArrayBuilder.make[Int]
-      val values = ArrayBuilder.make[Double]
-      data.foreachActive { (index, value) =>
-        if (value > td) {
-          indices += index
-          values += 1.0
-        }
-      }
-      Vectors.sparse(data.size, indices.result(), values.result()).compressed
-    }
     val metadata = outputSchema($(outputCol)).metadata
-    inputType match {
+    val binarizerUDF = inputType match {
       case DoubleType =>
-        dataset.select(col("*"), binarizerDouble(col($(inputCol))).as($(outputCol), metadata))
-      case _: VectorUDT =>
-        dataset.select(col("*"), binarizerVector(col($(inputCol))).as($(outputCol), metadata))
+        udf { in: Double => if (in > td) 1.0 else 0.0 }
+      case _: VectorUDT if td >= 0 =>
+        udf { vector: Vector =>
+          val indices = ArrayBuilder.make[Int]
+          val values = ArrayBuilder.make[Double]
+          vector.foreachActive { (index, value) =>
+            if (value > td) {
+              indices += index
+              values += 1.0
+            }
+          }
+          Vectors.sparse(vector.size, indices.result(), values.result()).compressed
+        }
+      case _: VectorUDT if td < 0 =>
+        this.logWarning(s"Binarization operations on sparse dataset with negative threshold " +
+          s"$td will build a dense output, so take care when applying to sparse input.")
+        udf { vector: Vector =>
+          val values = Array.fill(vector.size)(1.0)
+          vector.foreachActive { (index, value) =>
+            if (value <= td) {
+              values(index) = 0.0
+            }
+          }
+          Vectors.dense(values).compressed
+        }
     }
+    dataset.withColumn($(outputCol), binarizerUDF(col($(inputCol))), metadata)
   }

   @Since("1.4.0")
```

`mllib/src/test/scala/org/apache/spark/ml/feature/BinarizerSuite.scala`

```diff
@@ -101,6 +101,20 @@ class BinarizerSuite extends MLTest with DefaultReadWriteTest {
     }
   }

+  test("Binarizer should support sparse vector with negative threshold") {
+    val data = Seq(
+      (Vectors.sparse(3, Array(1), Array(0.5)), Vectors.dense(Array(1.0, 1.0, 1.0))),
+      (Vectors.dense(Array(0.0, 0.5, 0.0)), Vectors.dense(Array(1.0, 1.0, 1.0))))
+    val df = data.toDF("feature", "expected")
+    val binarizer = new Binarizer()
+      .setInputCol("feature")
+      .setOutputCol("binarized_feature")
+      .setThreshold(-0.5)
+    binarizer.transform(df).select("binarized_feature", "expected").collect().foreach {
+      case Row(x: Vector, y: Vector) =>
+        assert(x == y, "The feature value is not correct after binarization.")
+    }
+  }
+
   test("read/write") {
     val t = new Binarizer()
```