[SPARK-30776][ML][FOLLOWUP] FValue clean up
### What changes were proposed in this pull request?
Remove unused variables.

### Why are the changes needed?
Remove unused variables.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing test suites.

Closes #27922 from zhengruifeng/test_cleanup.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
This commit is contained in:
parent
01f20394ac
commit
93088f79cc
|
@ -224,7 +224,8 @@ final class ChiSqSelector @Since("1.6.0") (@Since("1.6.0") override val uid: Str
|
|||
val selected = tempRes
|
||||
.zipWithIndex
|
||||
.filter { case ((res, _), index) =>
|
||||
res.pValue <= getFdr * (index + 1) / testResult.length }
|
||||
res.pValue <= getFdr * (index + 1) / testResult.length
|
||||
}
|
||||
if (selected.isEmpty) {
|
||||
Array.empty[(SelectionTestResult, Int)]
|
||||
} else {
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.spark.ml.stat.{FValueTest, SelectionTestResult}
|
|||
import org.apache.spark.ml.util._
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
|
||||
import org.apache.spark.sql.types.{StructField, StructType}
|
||||
|
||||
|
||||
/**
|
||||
|
@ -200,10 +200,6 @@ final class FValueSelector @Since("3.1.0") (override val uid: String)
|
|||
@Since("3.1.0")
|
||||
override def fit(dataset: Dataset[_]): FValueSelectorModel = {
|
||||
transformSchema(dataset.schema, logging = true)
|
||||
dataset.select(col($(labelCol)).cast(DoubleType), col($(featuresCol))).rdd.map {
|
||||
case Row(label: Double, features: Vector) =>
|
||||
LabeledPoint(label, features)
|
||||
}
|
||||
|
||||
val testResult = FValueTest.testRegression(dataset, getFeaturesCol, getLabelCol)
|
||||
.zipWithIndex
|
||||
|
@ -227,7 +223,8 @@ final class FValueSelector @Since("3.1.0") (override val uid: String)
|
|||
val selected = tempRes
|
||||
.zipWithIndex
|
||||
.filter { case ((res, _), index) =>
|
||||
res.pValue <= getFdr * (index + 1) / testResult.length }
|
||||
res.pValue <= getFdr * (index + 1) / testResult.length
|
||||
}
|
||||
if (selected.isEmpty) {
|
||||
Array.empty[(SelectionTestResult, Int)]
|
||||
} else {
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.spark.ml.util.SchemaUtils
|
|||
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
|
||||
import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
|
||||
import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{DataFrame, Dataset, Row}
|
||||
import org.apache.spark.sql.functions.col
|
||||
import org.apache.spark.sql.types.DoubleType
|
||||
|
@ -72,9 +71,9 @@ object ChiSquareTest {
|
|||
val rdd = dataset.select(col(labelCol).cast("double"), col(featuresCol)).as[(Double, Vector)]
|
||||
.rdd.map { case (label, features) => OldLabeledPoint(label, OldVectors.fromML(features)) }
|
||||
val testResults = OldStatistics.chiSqTest(rdd)
|
||||
val pValues: Vector = Vectors.dense(testResults.map(_.pValue))
|
||||
val degreesOfFreedom: Array[Int] = testResults.map(_.degreesOfFreedom)
|
||||
val statistics: Vector = Vectors.dense(testResults.map(_.statistic))
|
||||
val pValues = Vectors.dense(testResults.map(_.pValue))
|
||||
val degreesOfFreedom = testResults.map(_.degreesOfFreedom)
|
||||
val statistics = Vectors.dense(testResults.map(_.statistic))
|
||||
spark.createDataFrame(Seq(ChiSquareResult(pValues, degreesOfFreedom, statistics)))
|
||||
}
|
||||
|
||||
|
@ -91,16 +90,12 @@ object ChiSquareTest {
|
|||
featuresCol: String,
|
||||
labelCol: String): Array[SelectionTestResult] = {
|
||||
|
||||
val spark = dataset.sparkSession
|
||||
|
||||
SchemaUtils.checkColumnType(dataset.schema, featuresCol, new VectorUDT)
|
||||
SchemaUtils.checkNumericType(dataset.schema, labelCol)
|
||||
val input: RDD[OldLabeledPoint] =
|
||||
dataset.select(col(labelCol).cast(DoubleType), col(featuresCol)).rdd
|
||||
.map {
|
||||
case Row(label: Double, features: Vector) =>
|
||||
OldLabeledPoint(label, OldVectors.fromML(features))
|
||||
}
|
||||
val input = dataset.select(col(labelCol).cast(DoubleType), col(featuresCol)).rdd
|
||||
.map { case Row(label: Double, features: Vector) =>
|
||||
OldLabeledPoint(label, OldVectors.fromML(features))
|
||||
}
|
||||
val chiTestResult = OldStatistics.chiSqTest(input)
|
||||
chiTestResult.map(r => new ChiSqTestResult(r.pValue, r.degreesOfFreedom, r.statistic))
|
||||
}
|
||||
|
|
|
@ -52,11 +52,10 @@ object FValueTest {
|
|||
def test(dataset: DataFrame, featuresCol: String, labelCol: String): DataFrame = {
|
||||
val spark = dataset.sparkSession
|
||||
val testResults = testRegression(dataset, featuresCol, labelCol)
|
||||
val pValues: Vector = Vectors.dense(testResults.map(_.pValue))
|
||||
val degreesOfFreedom: Array[Long] = testResults.map(_.degreesOfFreedom)
|
||||
val fValues: Vector = Vectors.dense(testResults.map(_.statistic))
|
||||
spark.createDataFrame(
|
||||
Seq(new FValueResult(pValues, degreesOfFreedom, fValues)))
|
||||
val pValues = Vectors.dense(testResults.map(_.pValue))
|
||||
val degreesOfFreedom = testResults.map(_.degreesOfFreedom)
|
||||
val fValues = Vectors.dense(testResults.map(_.statistic))
|
||||
spark.createDataFrame(Seq(FValueResult(pValues, degreesOfFreedom, fValues)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -113,17 +112,15 @@ object FValueTest {
|
|||
}
|
||||
array1
|
||||
}
|
||||
var fTestResultArray = new Array[SelectionTestResult](numFeatures)
|
||||
|
||||
val fd = new FDistribution(1, degreesOfFreedom)
|
||||
for (i <- 0 until numFeatures) {
|
||||
Array.tabulate(numFeatures) { i =>
|
||||
// Cov(X,Y) = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y))) / (N-1)
|
||||
val covariance = sumForCov (i) / (numSamples - 1)
|
||||
val corr = covariance / (yStd * xStd(i))
|
||||
val fValue = corr * corr / (1 - corr * corr) * degreesOfFreedom
|
||||
val pValue = 1.0 - fd.cumulativeProbability(fValue)
|
||||
fTestResultArray(i) = new FValueTestResult(pValue, degreesOfFreedom, fValue)
|
||||
new FValueTestResult(pValue, degreesOfFreedom, fValue)
|
||||
}
|
||||
fTestResultArray
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,21 +127,18 @@ class FValueSelectorSuite extends MLTest with DefaultReadWriteTest {
|
|||
test("Test FValue selector: numTopFeatures") {
|
||||
val selector = new FValueSelector()
|
||||
.setOutputCol("filtered").setSelectorType("numTopFeatures").setNumTopFeatures(1)
|
||||
val model = selector.fit(dataset)
|
||||
testSelector(selector, dataset)
|
||||
}
|
||||
|
||||
test("Test F Value selector: percentile") {
|
||||
val selector = new FValueSelector()
|
||||
.setOutputCol("filtered").setSelectorType("percentile").setPercentile(0.17)
|
||||
val model = selector.fit(dataset)
|
||||
testSelector(selector, dataset)
|
||||
}
|
||||
|
||||
test("Test F Value selector: fpr") {
|
||||
val selector = new FValueSelector()
|
||||
.setOutputCol("filtered").setSelectorType("fpr").setFpr(0.01)
|
||||
val model = selector.fit(dataset)
|
||||
testSelector(selector, dataset)
|
||||
}
|
||||
|
||||
|
@ -169,7 +166,6 @@ class FValueSelectorSuite extends MLTest with DefaultReadWriteTest {
|
|||
|
||||
val selector = new FValueSelector()
|
||||
.setOutputCol("filtered").setSelectorType("numTopFeatures").setNumTopFeatures(1)
|
||||
val model = selector.fit(df)
|
||||
testSelector(selector, df)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue