[SPARK-16356][ML] Add testImplicits for ML unit tests and promote toDF()
## What changes were proposed in this pull request?
This was suggested in 101663f1ae (commitcomment-17114968).
This PR adds `testImplicits` to `MLlibTestSparkContext` so that some implicits such as `toDF()` can be used across ml tests.
This PR also changes all the usages of `spark.createDataFrame( ... )` to `toDF()` where applicable in ml tests in Scala.
## How was this patch tested?
Existing tests should work.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #14035 from HyukjinKwon/minor-ml-test.
This commit is contained in:
parent
50b89d05b7
commit
f234b7cd79
|
@ -36,6 +36,8 @@ import org.apache.spark.sql.types.StructType
|
|||
|
||||
class PipelineSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
abstract class MyModel extends Model[MyModel]
|
||||
|
||||
test("pipeline") {
|
||||
|
@ -183,12 +185,11 @@ class PipelineSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
}
|
||||
|
||||
test("pipeline validateParams") {
|
||||
val df = spark.createDataFrame(
|
||||
Seq(
|
||||
(1, Vectors.dense(0.0, 1.0, 4.0), 1.0),
|
||||
(2, Vectors.dense(1.0, 0.0, 4.0), 2.0),
|
||||
(3, Vectors.dense(1.0, 0.0, 5.0), 3.0),
|
||||
(4, Vectors.dense(0.0, 0.0, 5.0), 4.0))
|
||||
val df = Seq(
|
||||
(1, Vectors.dense(0.0, 1.0, 4.0), 1.0),
|
||||
(2, Vectors.dense(1.0, 0.0, 4.0), 2.0),
|
||||
(3, Vectors.dense(1.0, 0.0, 5.0), 3.0),
|
||||
(4, Vectors.dense(0.0, 0.0, 5.0), 4.0)
|
||||
).toDF("id", "features", "label")
|
||||
|
||||
intercept[IllegalArgumentException] {
|
||||
|
|
|
@ -29,12 +29,13 @@ import org.apache.spark.sql.{DataFrame, Dataset}
|
|||
|
||||
class ClassifierSuite extends SparkFunSuite with MLlibTestSparkContext {
|
||||
|
||||
test("extractLabeledPoints") {
|
||||
def getTestData(labels: Seq[Double]): DataFrame = {
|
||||
val data = labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }
|
||||
spark.createDataFrame(data)
|
||||
}
|
||||
import testImplicits._
|
||||
|
||||
private def getTestData(labels: Seq[Double]): DataFrame = {
|
||||
labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }.toDF()
|
||||
}
|
||||
|
||||
test("extractLabeledPoints") {
|
||||
val c = new MockClassifier
|
||||
// Valid dataset
|
||||
val df0 = getTestData(Seq(0.0, 2.0, 1.0, 5.0))
|
||||
|
@ -70,11 +71,6 @@ class ClassifierSuite extends SparkFunSuite with MLlibTestSparkContext {
|
|||
}
|
||||
|
||||
test("getNumClasses") {
|
||||
def getTestData(labels: Seq[Double]): DataFrame = {
|
||||
val data = labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }
|
||||
spark.createDataFrame(data)
|
||||
}
|
||||
|
||||
val c = new MockClassifier
|
||||
// Valid dataset
|
||||
val df0 = getTestData(Seq(0.0, 2.0, 1.0, 5.0))
|
||||
|
|
|
@ -34,6 +34,7 @@ class DecisionTreeClassifierSuite
|
|||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import DecisionTreeClassifierSuite.compareAPIs
|
||||
import testImplicits._
|
||||
|
||||
private var categoricalDataPointsRDD: RDD[LabeledPoint] = _
|
||||
private var orderedLabeledPointsWithLabel0RDD: RDD[LabeledPoint] = _
|
||||
|
@ -345,7 +346,7 @@ class DecisionTreeClassifierSuite
|
|||
}
|
||||
|
||||
test("Fitting without numClasses in metadata") {
|
||||
val df: DataFrame = spark.createDataFrame(TreeTests.featureImportanceData(sc))
|
||||
val df: DataFrame = TreeTests.featureImportanceData(sc).toDF()
|
||||
val dt = new DecisionTreeClassifier().setMaxDepth(1)
|
||||
dt.fit(df)
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.spark.util.Utils
|
|||
class GBTClassifierSuite extends SparkFunSuite with MLlibTestSparkContext
|
||||
with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
import GBTClassifierSuite.compareAPIs
|
||||
|
||||
// Combinations for estimators, learning rates and subsamplingRate
|
||||
|
@ -134,15 +135,14 @@ class GBTClassifierSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
*/
|
||||
|
||||
test("Fitting without numClasses in metadata") {
|
||||
val df: DataFrame = spark.createDataFrame(TreeTests.featureImportanceData(sc))
|
||||
val df: DataFrame = TreeTests.featureImportanceData(sc).toDF()
|
||||
val gbt = new GBTClassifier().setMaxDepth(1).setMaxIter(1)
|
||||
gbt.fit(df)
|
||||
}
|
||||
|
||||
test("extractLabeledPoints with bad data") {
|
||||
def getTestData(labels: Seq[Double]): DataFrame = {
|
||||
val data = labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }
|
||||
spark.createDataFrame(data)
|
||||
labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }.toDF()
|
||||
}
|
||||
|
||||
val gbt = new GBTClassifier().setMaxDepth(1).setMaxIter(1)
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.apache.spark.sql.functions.lit
|
|||
class LogisticRegressionSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
@transient var smallBinaryDataset: Dataset[_] = _
|
||||
@transient var smallMultinomialDataset: Dataset[_] = _
|
||||
@transient var binaryDataset: Dataset[_] = _
|
||||
|
@ -46,8 +48,7 @@ class LogisticRegressionSuite
|
|||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
|
||||
smallBinaryDataset =
|
||||
spark.createDataFrame(generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42))
|
||||
smallBinaryDataset = generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42).toDF()
|
||||
|
||||
smallMultinomialDataset = {
|
||||
val nPoints = 100
|
||||
|
@ -61,7 +62,7 @@ class LogisticRegressionSuite
|
|||
val testData = generateMultinomialLogisticInput(
|
||||
coefficients, xMean, xVariance, addIntercept = true, nPoints, 42)
|
||||
|
||||
val df = spark.createDataFrame(sc.parallelize(testData, 4))
|
||||
val df = sc.parallelize(testData, 4).toDF()
|
||||
df.cache()
|
||||
df
|
||||
}
|
||||
|
@ -76,7 +77,7 @@ class LogisticRegressionSuite
|
|||
generateMultinomialLogisticInput(coefficients, xMean, xVariance,
|
||||
addIntercept = true, nPoints, 42)
|
||||
|
||||
spark.createDataFrame(sc.parallelize(testData, 4))
|
||||
sc.parallelize(testData, 4).toDF()
|
||||
}
|
||||
|
||||
multinomialDataset = {
|
||||
|
@ -91,7 +92,7 @@ class LogisticRegressionSuite
|
|||
val testData = generateMultinomialLogisticInput(
|
||||
coefficients, xMean, xVariance, addIntercept = true, nPoints, 42)
|
||||
|
||||
val df = spark.createDataFrame(sc.parallelize(testData, 4))
|
||||
val df = sc.parallelize(testData, 4).toDF()
|
||||
df.cache()
|
||||
df
|
||||
}
|
||||
|
@ -430,10 +431,10 @@ class LogisticRegressionSuite
|
|||
val model = new LogisticRegressionModel("mLogReg",
|
||||
Matrices.dense(3, 2, Array(0.0, 0.0, 0.0, 1.0, 2.0, 3.0)),
|
||||
Vectors.dense(0.0, 0.0, 0.0), 3, true)
|
||||
val overFlowData = spark.createDataFrame(Seq(
|
||||
val overFlowData = Seq(
|
||||
LabeledPoint(1.0, Vectors.dense(0.0, 1000.0)),
|
||||
LabeledPoint(1.0, Vectors.dense(0.0, -1.0))
|
||||
))
|
||||
).toDF()
|
||||
val results = model.transform(overFlowData).select("rawPrediction", "probability").collect()
|
||||
|
||||
// probabilities are correct when margins have to be adjusted
|
||||
|
@ -1795,9 +1796,9 @@ class LogisticRegressionSuite
|
|||
val numPoints = 40
|
||||
val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
|
||||
numClasses, numPoints)
|
||||
val testData = spark.createDataFrame(Array.tabulate[LabeledPoint](numClasses) { i =>
|
||||
val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
|
||||
LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
|
||||
})
|
||||
}.toSeq.toDF()
|
||||
val lr = new LogisticRegression().setFamily("binomial").setWeightCol("weight")
|
||||
val model = lr.fit(outlierData)
|
||||
val results = model.transform(testData).select("label", "prediction").collect()
|
||||
|
@ -1819,9 +1820,9 @@ class LogisticRegressionSuite
|
|||
val numPoints = 40
|
||||
val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
|
||||
numClasses, numPoints)
|
||||
val testData = spark.createDataFrame(Array.tabulate[LabeledPoint](numClasses) { i =>
|
||||
val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
|
||||
LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
|
||||
})
|
||||
}.toSeq.toDF()
|
||||
val mlr = new LogisticRegression().setFamily("multinomial").setWeightCol("weight")
|
||||
val model = mlr.fit(outlierData)
|
||||
val results = model.transform(testData).select("label", "prediction").collect()
|
||||
|
@ -1945,11 +1946,10 @@ class LogisticRegressionSuite
|
|||
}
|
||||
|
||||
test("multiclass logistic regression with all labels the same") {
|
||||
val constantData = spark.createDataFrame(Seq(
|
||||
val constantData = Seq(
|
||||
LabeledPoint(4.0, Vectors.dense(0.0)),
|
||||
LabeledPoint(4.0, Vectors.dense(1.0)),
|
||||
LabeledPoint(4.0, Vectors.dense(2.0)))
|
||||
)
|
||||
LabeledPoint(4.0, Vectors.dense(2.0))).toDF()
|
||||
val mlr = new LogisticRegression().setFamily("multinomial")
|
||||
val model = mlr.fit(constantData)
|
||||
val results = model.transform(constantData)
|
||||
|
@ -1961,11 +1961,10 @@ class LogisticRegressionSuite
|
|||
}
|
||||
|
||||
// force the model to be trained with only one class
|
||||
val constantZeroData = spark.createDataFrame(Seq(
|
||||
val constantZeroData = Seq(
|
||||
LabeledPoint(0.0, Vectors.dense(0.0)),
|
||||
LabeledPoint(0.0, Vectors.dense(1.0)),
|
||||
LabeledPoint(0.0, Vectors.dense(2.0)))
|
||||
)
|
||||
LabeledPoint(0.0, Vectors.dense(2.0))).toDF()
|
||||
val modelZeroLabel = mlr.setFitIntercept(false).fit(constantZeroData)
|
||||
val resultsZero = modelZeroLabel.transform(constantZeroData)
|
||||
resultsZero.select("rawPrediction", "probability", "prediction").collect().foreach {
|
||||
|
@ -1990,20 +1989,18 @@ class LogisticRegressionSuite
|
|||
}
|
||||
|
||||
test("compressed storage") {
|
||||
val moreClassesThanFeatures = spark.createDataFrame(Seq(
|
||||
val moreClassesThanFeatures = Seq(
|
||||
LabeledPoint(4.0, Vectors.dense(0.0, 0.0, 0.0)),
|
||||
LabeledPoint(4.0, Vectors.dense(1.0, 1.0, 1.0)),
|
||||
LabeledPoint(4.0, Vectors.dense(2.0, 2.0, 2.0)))
|
||||
)
|
||||
LabeledPoint(4.0, Vectors.dense(2.0, 2.0, 2.0))).toDF()
|
||||
val mlr = new LogisticRegression().setFamily("multinomial")
|
||||
val model = mlr.fit(moreClassesThanFeatures)
|
||||
assert(model.coefficientMatrix.isInstanceOf[SparseMatrix])
|
||||
assert(model.coefficientMatrix.asInstanceOf[SparseMatrix].colPtrs.length === 4)
|
||||
val moreFeaturesThanClasses = spark.createDataFrame(Seq(
|
||||
val moreFeaturesThanClasses = Seq(
|
||||
LabeledPoint(1.0, Vectors.dense(0.0, 0.0, 0.0)),
|
||||
LabeledPoint(1.0, Vectors.dense(1.0, 1.0, 1.0)),
|
||||
LabeledPoint(1.0, Vectors.dense(2.0, 2.0, 2.0)))
|
||||
)
|
||||
LabeledPoint(1.0, Vectors.dense(2.0, 2.0, 2.0))).toDF()
|
||||
val model2 = mlr.fit(moreFeaturesThanClasses)
|
||||
assert(model2.coefficientMatrix.isInstanceOf[SparseMatrix])
|
||||
assert(model2.coefficientMatrix.asInstanceOf[SparseMatrix].colPtrs.length === 3)
|
||||
|
|
|
@ -33,16 +33,18 @@ import org.apache.spark.sql.{Dataset, Row}
|
|||
class MultilayerPerceptronClassifierSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
@transient var dataset: Dataset[_] = _
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
|
||||
dataset = spark.createDataFrame(Seq(
|
||||
(Vectors.dense(0.0, 0.0), 0.0),
|
||||
(Vectors.dense(0.0, 1.0), 1.0),
|
||||
(Vectors.dense(1.0, 0.0), 1.0),
|
||||
(Vectors.dense(1.0, 1.0), 0.0))
|
||||
dataset = Seq(
|
||||
(Vectors.dense(0.0, 0.0), 0.0),
|
||||
(Vectors.dense(0.0, 1.0), 1.0),
|
||||
(Vectors.dense(1.0, 0.0), 1.0),
|
||||
(Vectors.dense(1.0, 1.0), 0.0)
|
||||
).toDF("features", "label")
|
||||
}
|
||||
|
||||
|
@ -80,11 +82,11 @@ class MultilayerPerceptronClassifierSuite
|
|||
}
|
||||
|
||||
test("Test setWeights by training restart") {
|
||||
val dataFrame = spark.createDataFrame(Seq(
|
||||
val dataFrame = Seq(
|
||||
(Vectors.dense(0.0, 0.0), 0.0),
|
||||
(Vectors.dense(0.0, 1.0), 1.0),
|
||||
(Vectors.dense(1.0, 0.0), 1.0),
|
||||
(Vectors.dense(1.0, 1.0), 0.0))
|
||||
(Vectors.dense(1.0, 1.0), 0.0)
|
||||
).toDF("features", "label")
|
||||
val layers = Array[Int](2, 5, 2)
|
||||
val trainer = new MultilayerPerceptronClassifier()
|
||||
|
@ -114,9 +116,9 @@ class MultilayerPerceptronClassifierSuite
|
|||
val xMean = Array(5.843, 3.057, 3.758, 1.199)
|
||||
val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
|
||||
// the input seed is somewhat magic, to make this test pass
|
||||
val rdd = sc.parallelize(generateMultinomialLogisticInput(
|
||||
coefficients, xMean, xVariance, true, nPoints, 1), 2)
|
||||
val dataFrame = spark.createDataFrame(rdd).toDF("label", "features")
|
||||
val data = generateMultinomialLogisticInput(
|
||||
coefficients, xMean, xVariance, true, nPoints, 1).toDS()
|
||||
val dataFrame = data.toDF("label", "features")
|
||||
val numClasses = 3
|
||||
val numIterations = 100
|
||||
val layers = Array[Int](4, 5, 4, numClasses)
|
||||
|
@ -137,9 +139,9 @@ class MultilayerPerceptronClassifierSuite
|
|||
.setNumClasses(numClasses)
|
||||
lr.optimizer.setRegParam(0.0)
|
||||
.setNumIterations(numIterations)
|
||||
val lrModel = lr.run(rdd.map(OldLabeledPoint.fromML))
|
||||
val lrModel = lr.run(data.rdd.map(OldLabeledPoint.fromML))
|
||||
val lrPredictionAndLabels =
|
||||
lrModel.predict(rdd.map(p => OldVectors.fromML(p.features))).zip(rdd.map(_.label))
|
||||
lrModel.predict(data.rdd.map(p => OldVectors.fromML(p.features))).zip(data.rdd.map(_.label))
|
||||
// MLP's predictions should not differ a lot from LR's.
|
||||
val lrMetrics = new MulticlassMetrics(lrPredictionAndLabels)
|
||||
val mlpMetrics = new MulticlassMetrics(mlpPredictionAndLabels)
|
||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row}
|
|||
|
||||
class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
@transient var dataset: Dataset[_] = _
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
|
@ -47,7 +49,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
|
|||
Array(0.10, 0.10, 0.70, 0.10) // label 2
|
||||
).map(_.map(math.log))
|
||||
|
||||
dataset = spark.createDataFrame(generateNaiveBayesInput(pi, theta, 100, 42))
|
||||
dataset = generateNaiveBayesInput(pi, theta, 100, 42).toDF()
|
||||
}
|
||||
|
||||
def validatePrediction(predictionAndLabels: DataFrame): Unit = {
|
||||
|
@ -131,16 +133,16 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
|
|||
val pi = Vectors.dense(piArray)
|
||||
val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
|
||||
|
||||
val testDataset = spark.createDataFrame(generateNaiveBayesInput(
|
||||
piArray, thetaArray, nPoints, 42, "multinomial"))
|
||||
val testDataset =
|
||||
generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()
|
||||
val nb = new NaiveBayes().setSmoothing(1.0).setModelType("multinomial")
|
||||
val model = nb.fit(testDataset)
|
||||
|
||||
validateModelFit(pi, theta, model)
|
||||
assert(model.hasParent)
|
||||
|
||||
val validationDataset = spark.createDataFrame(generateNaiveBayesInput(
|
||||
piArray, thetaArray, nPoints, 17, "multinomial"))
|
||||
val validationDataset =
|
||||
generateNaiveBayesInput(piArray, thetaArray, nPoints, 17, "multinomial").toDF()
|
||||
|
||||
val predictionAndLabels = model.transform(validationDataset).select("prediction", "label")
|
||||
validatePrediction(predictionAndLabels)
|
||||
|
@ -161,16 +163,16 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
|
|||
val pi = Vectors.dense(piArray)
|
||||
val theta = new DenseMatrix(3, 12, thetaArray.flatten, true)
|
||||
|
||||
val testDataset = spark.createDataFrame(generateNaiveBayesInput(
|
||||
piArray, thetaArray, nPoints, 45, "bernoulli"))
|
||||
val testDataset =
|
||||
generateNaiveBayesInput(piArray, thetaArray, nPoints, 45, "bernoulli").toDF()
|
||||
val nb = new NaiveBayes().setSmoothing(1.0).setModelType("bernoulli")
|
||||
val model = nb.fit(testDataset)
|
||||
|
||||
validateModelFit(pi, theta, model)
|
||||
assert(model.hasParent)
|
||||
|
||||
val validationDataset = spark.createDataFrame(generateNaiveBayesInput(
|
||||
piArray, thetaArray, nPoints, 20, "bernoulli"))
|
||||
val validationDataset =
|
||||
generateNaiveBayesInput(piArray, thetaArray, nPoints, 20, "bernoulli").toDF()
|
||||
|
||||
val predictionAndLabels = model.transform(validationDataset).select("prediction", "label")
|
||||
validatePrediction(predictionAndLabels)
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.apache.spark.sql.types.Metadata
|
|||
|
||||
class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
@transient var dataset: Dataset[_] = _
|
||||
@transient var rdd: RDD[LabeledPoint] = _
|
||||
|
||||
|
@ -55,7 +57,7 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
|
|||
val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
|
||||
rdd = sc.parallelize(generateMultinomialLogisticInput(
|
||||
coefficients, xMean, xVariance, true, nPoints, 42), 2)
|
||||
dataset = spark.createDataFrame(rdd)
|
||||
dataset = rdd.toDF()
|
||||
}
|
||||
|
||||
test("params") {
|
||||
|
|
|
@ -39,6 +39,7 @@ class RandomForestClassifierSuite
|
|||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import RandomForestClassifierSuite.compareAPIs
|
||||
import testImplicits._
|
||||
|
||||
private var orderedLabeledPoints50_1000: RDD[LabeledPoint] = _
|
||||
private var orderedLabeledPoints5_20: RDD[LabeledPoint] = _
|
||||
|
@ -158,7 +159,7 @@ class RandomForestClassifierSuite
|
|||
}
|
||||
|
||||
test("Fitting without numClasses in metadata") {
|
||||
val df: DataFrame = spark.createDataFrame(TreeTests.featureImportanceData(sc))
|
||||
val df: DataFrame = TreeTests.featureImportanceData(sc).toDF()
|
||||
val rf = new RandomForestClassifier().setMaxDepth(1).setNumTrees(1)
|
||||
rf.fit(df)
|
||||
}
|
||||
|
|
|
@ -62,6 +62,8 @@ object LDASuite {
|
|||
|
||||
class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
val k: Int = 5
|
||||
val vocabSize: Int = 30
|
||||
@transient var dataset: Dataset[_] = _
|
||||
|
@ -140,8 +142,8 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
|
|||
new LDA().setTopicConcentration(-1.1)
|
||||
}
|
||||
|
||||
val dummyDF = spark.createDataFrame(Seq(
|
||||
(1, Vectors.dense(1.0, 2.0)))).toDF("id", "features")
|
||||
val dummyDF = Seq((1, Vectors.dense(1.0, 2.0))).toDF("id", "features")
|
||||
|
||||
// validate parameters
|
||||
lda.transformSchema(dummyDF.schema)
|
||||
lda.setDocConcentration(1.1)
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
|
|||
class BinaryClassificationEvaluatorSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new BinaryClassificationEvaluator)
|
||||
}
|
||||
|
@ -42,25 +44,25 @@ class BinaryClassificationEvaluatorSuite
|
|||
val evaluator = new BinaryClassificationEvaluator()
|
||||
.setMetricName("areaUnderPR")
|
||||
|
||||
val vectorDF = spark.createDataFrame(Seq(
|
||||
val vectorDF = Seq(
|
||||
(0d, Vectors.dense(12, 2.5)),
|
||||
(1d, Vectors.dense(1, 3)),
|
||||
(0d, Vectors.dense(10, 2))
|
||||
)).toDF("label", "rawPrediction")
|
||||
).toDF("label", "rawPrediction")
|
||||
assert(evaluator.evaluate(vectorDF) === 1.0)
|
||||
|
||||
val doubleDF = spark.createDataFrame(Seq(
|
||||
val doubleDF = Seq(
|
||||
(0d, 0d),
|
||||
(1d, 1d),
|
||||
(0d, 0d)
|
||||
)).toDF("label", "rawPrediction")
|
||||
).toDF("label", "rawPrediction")
|
||||
assert(evaluator.evaluate(doubleDF) === 1.0)
|
||||
|
||||
val stringDF = spark.createDataFrame(Seq(
|
||||
val stringDF = Seq(
|
||||
(0d, "0d"),
|
||||
(1d, "1d"),
|
||||
(0d, "0d")
|
||||
)).toDF("label", "rawPrediction")
|
||||
).toDF("label", "rawPrediction")
|
||||
val thrown = intercept[IllegalArgumentException] {
|
||||
evaluator.evaluate(stringDF)
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.spark.mllib.util.TestingUtils._
|
|||
class RegressionEvaluatorSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new RegressionEvaluator)
|
||||
}
|
||||
|
@ -42,9 +44,9 @@ class RegressionEvaluatorSuite
|
|||
* data.map(x=> x.label + ", " + x.features(0) + ", " + x.features(1))
|
||||
* .saveAsTextFile("path")
|
||||
*/
|
||||
val dataset = spark.createDataFrame(
|
||||
sc.parallelize(LinearDataGenerator.generateLinearInput(
|
||||
6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2).map(_.asML))
|
||||
val dataset = LinearDataGenerator.generateLinearInput(
|
||||
6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1)
|
||||
.map(_.asML).toDF()
|
||||
|
||||
/**
|
||||
* Using the following R code to load the data, train the model and evaluate metrics.
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.spark.sql.{DataFrame, Row}
|
|||
|
||||
class BinarizerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
@transient var data: Array[Double] = _
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
|
@ -39,8 +41,7 @@ class BinarizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
|
|||
|
||||
test("Binarize continuous features with default parameter") {
|
||||
val defaultBinarized: Array[Double] = data.map(x => if (x > 0.0) 1.0 else 0.0)
|
||||
val dataFrame: DataFrame = spark.createDataFrame(
|
||||
data.zip(defaultBinarized)).toDF("feature", "expected")
|
||||
val dataFrame: DataFrame = data.zip(defaultBinarized).toSeq.toDF("feature", "expected")
|
||||
|
||||
val binarizer: Binarizer = new Binarizer()
|
||||
.setInputCol("feature")
|
||||
|
@ -55,8 +56,7 @@ class BinarizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
|
|||
test("Binarize continuous features with setter") {
|
||||
val threshold: Double = 0.2
|
||||
val thresholdBinarized: Array[Double] = data.map(x => if (x > threshold) 1.0 else 0.0)
|
||||
val dataFrame: DataFrame = spark.createDataFrame(
|
||||
data.zip(thresholdBinarized)).toDF("feature", "expected")
|
||||
val dataFrame: DataFrame = data.zip(thresholdBinarized).toSeq.toDF("feature", "expected")
|
||||
|
||||
val binarizer: Binarizer = new Binarizer()
|
||||
.setInputCol("feature")
|
||||
|
@ -71,9 +71,9 @@ class BinarizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
|
|||
|
||||
test("Binarize vector of continuous features with default parameter") {
|
||||
val defaultBinarized: Array[Double] = data.map(x => if (x > 0.0) 1.0 else 0.0)
|
||||
val dataFrame: DataFrame = spark.createDataFrame(Seq(
|
||||
val dataFrame: DataFrame = Seq(
|
||||
(Vectors.dense(data), Vectors.dense(defaultBinarized))
|
||||
)).toDF("feature", "expected")
|
||||
).toDF("feature", "expected")
|
||||
|
||||
val binarizer: Binarizer = new Binarizer()
|
||||
.setInputCol("feature")
|
||||
|
@ -88,9 +88,9 @@ class BinarizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
|
|||
test("Binarize vector of continuous features with setter") {
|
||||
val threshold: Double = 0.2
|
||||
val defaultBinarized: Array[Double] = data.map(x => if (x > threshold) 1.0 else 0.0)
|
||||
val dataFrame: DataFrame = spark.createDataFrame(Seq(
|
||||
val dataFrame: DataFrame = Seq(
|
||||
(Vectors.dense(data), Vectors.dense(defaultBinarized))
|
||||
)).toDF("feature", "expected")
|
||||
).toDF("feature", "expected")
|
||||
|
||||
val binarizer: Binarizer = new Binarizer()
|
||||
.setInputCol("feature")
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.spark.sql.{DataFrame, Row}
|
|||
|
||||
class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new Bucketizer)
|
||||
}
|
||||
|
@ -38,8 +40,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
|
|||
val splits = Array(-0.5, 0.0, 0.5)
|
||||
val validData = Array(-0.5, -0.3, 0.0, 0.2)
|
||||
val expectedBuckets = Array(0.0, 0.0, 1.0, 1.0)
|
||||
val dataFrame: DataFrame =
|
||||
spark.createDataFrame(validData.zip(expectedBuckets)).toDF("feature", "expected")
|
||||
val dataFrame: DataFrame = validData.zip(expectedBuckets).toSeq.toDF("feature", "expected")
|
||||
|
||||
val bucketizer: Bucketizer = new Bucketizer()
|
||||
.setInputCol("feature")
|
||||
|
@ -55,13 +56,13 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
|
|||
// Check for exceptions when using a set of invalid feature values.
|
||||
val invalidData1: Array[Double] = Array(-0.9) ++ validData
|
||||
val invalidData2 = Array(0.51) ++ validData
|
||||
val badDF1 = spark.createDataFrame(invalidData1.zipWithIndex).toDF("feature", "idx")
|
||||
val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx")
|
||||
withClue("Invalid feature value -0.9 was not caught as an invalid feature!") {
|
||||
intercept[SparkException] {
|
||||
bucketizer.transform(badDF1).collect()
|
||||
}
|
||||
}
|
||||
val badDF2 = spark.createDataFrame(invalidData2.zipWithIndex).toDF("feature", "idx")
|
||||
val badDF2 = invalidData2.zipWithIndex.toSeq.toDF("feature", "idx")
|
||||
withClue("Invalid feature value 0.51 was not caught as an invalid feature!") {
|
||||
intercept[SparkException] {
|
||||
bucketizer.transform(badDF2).collect()
|
||||
|
@ -73,8 +74,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
|
|||
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
|
||||
val validData = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9)
|
||||
val expectedBuckets = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0)
|
||||
val dataFrame: DataFrame =
|
||||
spark.createDataFrame(validData.zip(expectedBuckets)).toDF("feature", "expected")
|
||||
val dataFrame: DataFrame = validData.zip(expectedBuckets).toSeq.toDF("feature", "expected")
|
||||
|
||||
val bucketizer: Bucketizer = new Bucketizer()
|
||||
.setInputCol("feature")
|
||||
|
@ -92,8 +92,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
|
|||
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
|
||||
val validData = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN)
|
||||
val expectedBuckets = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 4.0)
|
||||
val dataFrame: DataFrame =
|
||||
spark.createDataFrame(validData.zip(expectedBuckets)).toDF("feature", "expected")
|
||||
val dataFrame: DataFrame = validData.zip(expectedBuckets).toSeq.toDF("feature", "expected")
|
||||
|
||||
val bucketizer: Bucketizer = new Bucketizer()
|
||||
.setInputCol("feature")
|
||||
|
|
|
@ -29,8 +29,7 @@ class ChiSqSelectorSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
with DefaultReadWriteTest {
|
||||
|
||||
test("Test Chi-Square selector") {
|
||||
val spark = this.spark
|
||||
import spark.implicits._
|
||||
import testImplicits._
|
||||
val data = Seq(
|
||||
LabeledPoint(0.0, Vectors.sparse(3, Array((0, 8.0), (1, 7.0)))),
|
||||
LabeledPoint(1.0, Vectors.sparse(3, Array((1, 9.0), (2, 6.0)))),
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.spark.sql.Row
|
|||
class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
|
||||
with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new CountVectorizer)
|
||||
ParamsSuite.checkParams(new CountVectorizerModel(Array("empty")))
|
||||
|
@ -35,7 +37,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
private def split(s: String): Seq[String] = s.split("\\s+")
|
||||
|
||||
test("CountVectorizerModel common cases") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
val df = Seq(
|
||||
(0, split("a b c d"),
|
||||
Vectors.sparse(4, Seq((0, 1.0), (1, 1.0), (2, 1.0), (3, 1.0)))),
|
||||
(1, split("a b b c d a"),
|
||||
|
@ -44,7 +46,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
(3, split(""), Vectors.sparse(4, Seq())), // empty string
|
||||
(4, split("a notInDict d"),
|
||||
Vectors.sparse(4, Seq((0, 1.0), (3, 1.0)))) // with words not in vocabulary
|
||||
)).toDF("id", "words", "expected")
|
||||
).toDF("id", "words", "expected")
|
||||
val cv = new CountVectorizerModel(Array("a", "b", "c", "d"))
|
||||
.setInputCol("words")
|
||||
.setOutputCol("features")
|
||||
|
@ -55,13 +57,13 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
}
|
||||
|
||||
test("CountVectorizer common cases") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
val df = Seq(
|
||||
(0, split("a b c d e"),
|
||||
Vectors.sparse(5, Seq((0, 1.0), (1, 1.0), (2, 1.0), (3, 1.0), (4, 1.0)))),
|
||||
(1, split("a a a a a a"), Vectors.sparse(5, Seq((0, 6.0)))),
|
||||
(2, split("c c"), Vectors.sparse(5, Seq((2, 2.0)))),
|
||||
(3, split("d"), Vectors.sparse(5, Seq((3, 1.0)))),
|
||||
(4, split("b b b b b"), Vectors.sparse(5, Seq((1, 5.0)))))
|
||||
(4, split("b b b b b"), Vectors.sparse(5, Seq((1, 5.0))))
|
||||
).toDF("id", "words", "expected")
|
||||
val cv = new CountVectorizer()
|
||||
.setInputCol("words")
|
||||
|
@ -76,11 +78,11 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
}
|
||||
|
||||
test("CountVectorizer vocabSize and minDF") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
val df = Seq(
|
||||
(0, split("a b c d"), Vectors.sparse(2, Seq((0, 1.0), (1, 1.0)))),
|
||||
(1, split("a b c"), Vectors.sparse(2, Seq((0, 1.0), (1, 1.0)))),
|
||||
(2, split("a b"), Vectors.sparse(2, Seq((0, 1.0), (1, 1.0)))),
|
||||
(3, split("a"), Vectors.sparse(2, Seq((0, 1.0)))))
|
||||
(3, split("a"), Vectors.sparse(2, Seq((0, 1.0))))
|
||||
).toDF("id", "words", "expected")
|
||||
val cvModel = new CountVectorizer()
|
||||
.setInputCol("words")
|
||||
|
@ -118,9 +120,9 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
|
||||
test("CountVectorizer throws exception when vocab is empty") {
|
||||
intercept[IllegalArgumentException] {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
val df = Seq(
|
||||
(0, split("a a b b c c")),
|
||||
(1, split("aa bb cc")))
|
||||
(1, split("aa bb cc"))
|
||||
).toDF("id", "words")
|
||||
val cvModel = new CountVectorizer()
|
||||
.setInputCol("words")
|
||||
|
@ -132,11 +134,11 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
}
|
||||
|
||||
test("CountVectorizerModel with minTF count") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
val df = Seq(
|
||||
(0, split("a a a b b c c c d "), Vectors.sparse(4, Seq((0, 3.0), (2, 3.0)))),
|
||||
(1, split("c c c c c c"), Vectors.sparse(4, Seq((2, 6.0)))),
|
||||
(2, split("a"), Vectors.sparse(4, Seq())),
|
||||
(3, split("e e e e e"), Vectors.sparse(4, Seq())))
|
||||
(3, split("e e e e e"), Vectors.sparse(4, Seq()))
|
||||
).toDF("id", "words", "expected")
|
||||
|
||||
// minTF: count
|
||||
|
@ -151,11 +153,11 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
}
|
||||
|
||||
test("CountVectorizerModel with minTF freq") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
val df = Seq(
|
||||
(0, split("a a a b b c c c d "), Vectors.sparse(4, Seq((0, 3.0), (2, 3.0)))),
|
||||
(1, split("c c c c c c"), Vectors.sparse(4, Seq((2, 6.0)))),
|
||||
(2, split("a"), Vectors.sparse(4, Seq((0, 1.0)))),
|
||||
(3, split("e e e e e"), Vectors.sparse(4, Seq())))
|
||||
(3, split("e e e e e"), Vectors.sparse(4, Seq()))
|
||||
).toDF("id", "words", "expected")
|
||||
|
||||
// minTF: set frequency
|
||||
|
@ -170,12 +172,12 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
}
|
||||
|
||||
test("CountVectorizerModel and CountVectorizer with binary") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
val df = Seq(
|
||||
(0, split("a a a a b b b b c d"),
|
||||
Vectors.sparse(4, Seq((0, 1.0), (1, 1.0), (2, 1.0), (3, 1.0)))),
|
||||
(1, split("c c c"), Vectors.sparse(4, Seq((2, 1.0)))),
|
||||
(2, split("a"), Vectors.sparse(4, Seq((0, 1.0))))
|
||||
)).toDF("id", "words", "expected")
|
||||
).toDF("id", "words", "expected")
|
||||
|
||||
// CountVectorizer test
|
||||
val cv = new CountVectorizer()
|
||||
|
|
|
@ -32,6 +32,8 @@ case class DCTTestData(vec: Vector, wantedVec: Vector)
|
|||
|
||||
class DCTSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("forward transform of discrete cosine matches jTransforms result") {
|
||||
val data = Vectors.dense((0 until 128).map(_ => 2D * math.random - 1D).toArray)
|
||||
val inverse = false
|
||||
|
@ -57,15 +59,13 @@ class DCTSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
|
|||
private def testDCT(data: Vector, inverse: Boolean): Unit = {
|
||||
val expectedResultBuffer = data.toArray.clone()
|
||||
if (inverse) {
|
||||
(new DoubleDCT_1D(data.size)).inverse(expectedResultBuffer, true)
|
||||
new DoubleDCT_1D(data.size).inverse(expectedResultBuffer, true)
|
||||
} else {
|
||||
(new DoubleDCT_1D(data.size)).forward(expectedResultBuffer, true)
|
||||
new DoubleDCT_1D(data.size).forward(expectedResultBuffer, true)
|
||||
}
|
||||
val expectedResult = Vectors.dense(expectedResultBuffer)
|
||||
|
||||
val dataset = spark.createDataFrame(Seq(
|
||||
DCTTestData(data, expectedResult)
|
||||
))
|
||||
val dataset = Seq(DCTTestData(data, expectedResult)).toDF()
|
||||
|
||||
val transformer = new DCT()
|
||||
.setInputCol("vec")
|
||||
|
|
|
@ -29,14 +29,14 @@ import org.apache.spark.util.Utils
|
|||
|
||||
class HashingTFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new HashingTF)
|
||||
}
|
||||
|
||||
test("hashingTF") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
(0, "a a b b c d".split(" ").toSeq)
|
||||
)).toDF("id", "words")
|
||||
val df = Seq((0, "a a b b c d".split(" ").toSeq)).toDF("id", "words")
|
||||
val n = 100
|
||||
val hashingTF = new HashingTF()
|
||||
.setInputCol("words")
|
||||
|
@ -54,9 +54,7 @@ class HashingTFSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
|
|||
}
|
||||
|
||||
test("applying binary term freqs") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
(0, "a a b c c c".split(" ").toSeq)
|
||||
)).toDF("id", "words")
|
||||
val df = Seq((0, "a a b c c c".split(" ").toSeq)).toDF("id", "words")
|
||||
val n = 100
|
||||
val hashingTF = new HashingTF()
|
||||
.setInputCol("words")
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.spark.sql.Row
|
|||
|
||||
class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
def scaleDataWithIDF(dataSet: Array[Vector], model: Vector): Array[Vector] = {
|
||||
dataSet.map {
|
||||
case data: DenseVector =>
|
||||
|
@ -61,7 +63,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
|
|||
})
|
||||
val expected = scaleDataWithIDF(data, idf)
|
||||
|
||||
val df = spark.createDataFrame(data.zip(expected)).toDF("features", "expected")
|
||||
val df = data.zip(expected).toSeq.toDF("features", "expected")
|
||||
|
||||
val idfModel = new IDF()
|
||||
.setInputCol("features")
|
||||
|
@ -87,7 +89,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
|
|||
})
|
||||
val expected = scaleDataWithIDF(data, idf)
|
||||
|
||||
val df = spark.createDataFrame(data.zip(expected)).toDF("features", "expected")
|
||||
val df = data.zip(expected).toSeq.toDF("features", "expected")
|
||||
|
||||
val idfModel = new IDF()
|
||||
.setInputCol("features")
|
||||
|
|
|
@ -28,6 +28,9 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
|
|||
import org.apache.spark.sql.functions.col
|
||||
|
||||
class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new Interaction())
|
||||
}
|
||||
|
@ -59,11 +62,10 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
|
|||
}
|
||||
|
||||
test("numeric interaction") {
|
||||
val data = spark.createDataFrame(
|
||||
Seq(
|
||||
(2, Vectors.dense(3.0, 4.0)),
|
||||
(1, Vectors.dense(1.0, 5.0)))
|
||||
).toDF("a", "b")
|
||||
val data = Seq(
|
||||
(2, Vectors.dense(3.0, 4.0)),
|
||||
(1, Vectors.dense(1.0, 5.0))
|
||||
).toDF("a", "b")
|
||||
val groupAttr = new AttributeGroup(
|
||||
"b",
|
||||
Array[Attribute](
|
||||
|
@ -74,11 +76,10 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
|
|||
col("b").as("b", groupAttr.toMetadata()))
|
||||
val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
|
||||
val res = trans.transform(df)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
(2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
|
||||
(1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0)))
|
||||
).toDF("a", "b", "features")
|
||||
val expected = Seq(
|
||||
(2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
|
||||
(1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
|
||||
).toDF("a", "b", "features")
|
||||
assert(res.collect() === expected.collect())
|
||||
val attrs = AttributeGroup.fromStructField(res.schema("features"))
|
||||
val expectedAttrs = new AttributeGroup(
|
||||
|
@ -90,11 +91,10 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
|
|||
}
|
||||
|
||||
test("nominal interaction") {
|
||||
val data = spark.createDataFrame(
|
||||
Seq(
|
||||
(2, Vectors.dense(3.0, 4.0)),
|
||||
(1, Vectors.dense(1.0, 5.0)))
|
||||
).toDF("a", "b")
|
||||
val data = Seq(
|
||||
(2, Vectors.dense(3.0, 4.0)),
|
||||
(1, Vectors.dense(1.0, 5.0))
|
||||
).toDF("a", "b")
|
||||
val groupAttr = new AttributeGroup(
|
||||
"b",
|
||||
Array[Attribute](
|
||||
|
@ -106,11 +106,10 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
|
|||
col("b").as("b", groupAttr.toMetadata()))
|
||||
val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
|
||||
val res = trans.transform(df)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
(2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
|
||||
(1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0)))
|
||||
).toDF("a", "b", "features")
|
||||
val expected = Seq(
|
||||
(2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
|
||||
(1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
|
||||
).toDF("a", "b", "features")
|
||||
assert(res.collect() === expected.collect())
|
||||
val attrs = AttributeGroup.fromStructField(res.schema("features"))
|
||||
val expectedAttrs = new AttributeGroup(
|
||||
|
@ -126,10 +125,9 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
|
|||
}
|
||||
|
||||
test("default attr names") {
|
||||
val data = spark.createDataFrame(
|
||||
Seq(
|
||||
val data = Seq(
|
||||
(2, Vectors.dense(0.0, 4.0), 1.0),
|
||||
(1, Vectors.dense(1.0, 5.0), 10.0))
|
||||
(1, Vectors.dense(1.0, 5.0), 10.0)
|
||||
).toDF("a", "b", "c")
|
||||
val groupAttr = new AttributeGroup(
|
||||
"b",
|
||||
|
@ -142,11 +140,10 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
|
|||
col("c").as("c", NumericAttribute.defaultAttr.toMetadata()))
|
||||
val trans = new Interaction().setInputCols(Array("a", "b", "c")).setOutputCol("features")
|
||||
val res = trans.transform(df)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
(2, Vectors.dense(0.0, 4.0), 1.0, Vectors.dense(0, 0, 0, 0, 0, 0, 1, 0, 4)),
|
||||
(1, Vectors.dense(1.0, 5.0), 10.0, Vectors.dense(0, 0, 0, 0, 10, 50, 0, 0, 0)))
|
||||
).toDF("a", "b", "c", "features")
|
||||
val expected = Seq(
|
||||
(2, Vectors.dense(0.0, 4.0), 1.0, Vectors.dense(0, 0, 0, 0, 0, 0, 1, 0, 4)),
|
||||
(1, Vectors.dense(1.0, 5.0), 10.0, Vectors.dense(0, 0, 0, 0, 10, 50, 0, 0, 0))
|
||||
).toDF("a", "b", "c", "features")
|
||||
assert(res.collect() === expected.collect())
|
||||
val attrs = AttributeGroup.fromStructField(res.schema("features"))
|
||||
val expectedAttrs = new AttributeGroup(
|
||||
|
|
|
@ -23,6 +23,9 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
|
|||
import org.apache.spark.sql.Row
|
||||
|
||||
class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("MaxAbsScaler fit basic case") {
|
||||
val data = Array(
|
||||
Vectors.dense(1, 0, 100),
|
||||
|
@ -36,7 +39,7 @@ class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
|
|||
Vectors.sparse(3, Array(0, 2), Array(-1, -1)),
|
||||
Vectors.sparse(3, Array(0), Array(-0.75)))
|
||||
|
||||
val df = spark.createDataFrame(data.zip(expected)).toDF("features", "expected")
|
||||
val df = data.zip(expected).toSeq.toDF("features", "expected")
|
||||
val scaler = new MaxAbsScaler()
|
||||
.setInputCol("features")
|
||||
.setOutputCol("scaled")
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.apache.spark.sql.Row
|
|||
|
||||
class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("MinMaxScaler fit basic case") {
|
||||
val data = Array(
|
||||
Vectors.dense(1, 0, Long.MinValue),
|
||||
|
@ -38,7 +40,7 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
|
|||
Vectors.sparse(3, Array(0, 2), Array(5, 5)),
|
||||
Vectors.sparse(3, Array(0), Array(-2.5)))
|
||||
|
||||
val df = spark.createDataFrame(data.zip(expected)).toDF("features", "expected")
|
||||
val df = data.zip(expected).toSeq.toDF("features", "expected")
|
||||
val scaler = new MinMaxScaler()
|
||||
.setInputCol("features")
|
||||
.setOutputCol("scaled")
|
||||
|
@ -57,14 +59,13 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
|
|||
|
||||
test("MinMaxScaler arguments max must be larger than min") {
|
||||
withClue("arguments max must be larger than min") {
|
||||
val dummyDF = spark.createDataFrame(Seq(
|
||||
(1, Vectors.dense(1.0, 2.0)))).toDF("id", "feature")
|
||||
val dummyDF = Seq((1, Vectors.dense(1.0, 2.0))).toDF("id", "features")
|
||||
intercept[IllegalArgumentException] {
|
||||
val scaler = new MinMaxScaler().setMin(10).setMax(0).setInputCol("feature")
|
||||
val scaler = new MinMaxScaler().setMin(10).setMax(0).setInputCol("features")
|
||||
scaler.transformSchema(dummyDF.schema)
|
||||
}
|
||||
intercept[IllegalArgumentException] {
|
||||
val scaler = new MinMaxScaler().setMin(0).setMax(0).setInputCol("feature")
|
||||
val scaler = new MinMaxScaler().setMin(0).setMax(0).setInputCol("features")
|
||||
scaler.transformSchema(dummyDF.schema)
|
||||
}
|
||||
}
|
||||
|
@ -104,7 +105,7 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
|
|||
Vectors.dense(-1.0, Double.NaN, -5.0, -5.0),
|
||||
Vectors.dense(5.0, 0.0, 5.0, Double.NaN))
|
||||
|
||||
val df = spark.createDataFrame(data.zip(expected)).toDF("features", "expected")
|
||||
val df = data.zip(expected).toSeq.toDF("features", "expected")
|
||||
val scaler = new MinMaxScaler()
|
||||
.setInputCol("features")
|
||||
.setOutputCol("scaled")
|
||||
|
|
|
@ -28,17 +28,18 @@ import org.apache.spark.sql.{Dataset, Row}
|
|||
case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String])
|
||||
|
||||
class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import org.apache.spark.ml.feature.NGramSuite._
|
||||
import testImplicits._
|
||||
|
||||
test("default behavior yields bigram features") {
|
||||
val nGram = new NGram()
|
||||
.setInputCol("inputTokens")
|
||||
.setOutputCol("nGrams")
|
||||
val dataset = spark.createDataFrame(Seq(
|
||||
NGramTestData(
|
||||
Array("Test", "for", "ngram", "."),
|
||||
Array("Test for", "for ngram", "ngram .")
|
||||
)))
|
||||
val dataset = Seq(NGramTestData(
|
||||
Array("Test", "for", "ngram", "."),
|
||||
Array("Test for", "for ngram", "ngram .")
|
||||
)).toDF()
|
||||
testNGram(nGram, dataset)
|
||||
}
|
||||
|
||||
|
@ -47,11 +48,10 @@ class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRe
|
|||
.setInputCol("inputTokens")
|
||||
.setOutputCol("nGrams")
|
||||
.setN(4)
|
||||
val dataset = spark.createDataFrame(Seq(
|
||||
NGramTestData(
|
||||
Array("a", "b", "c", "d", "e"),
|
||||
Array("a b c d", "b c d e")
|
||||
)))
|
||||
val dataset = Seq(NGramTestData(
|
||||
Array("a", "b", "c", "d", "e"),
|
||||
Array("a b c d", "b c d e")
|
||||
)).toDF()
|
||||
testNGram(nGram, dataset)
|
||||
}
|
||||
|
||||
|
@ -60,11 +60,7 @@ class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRe
|
|||
.setInputCol("inputTokens")
|
||||
.setOutputCol("nGrams")
|
||||
.setN(4)
|
||||
val dataset = spark.createDataFrame(Seq(
|
||||
NGramTestData(
|
||||
Array(),
|
||||
Array()
|
||||
)))
|
||||
val dataset = Seq(NGramTestData(Array(), Array())).toDF()
|
||||
testNGram(nGram, dataset)
|
||||
}
|
||||
|
||||
|
@ -73,11 +69,10 @@ class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRe
|
|||
.setInputCol("inputTokens")
|
||||
.setOutputCol("nGrams")
|
||||
.setN(6)
|
||||
val dataset = spark.createDataFrame(Seq(
|
||||
NGramTestData(
|
||||
Array("a", "b", "c", "d", "e"),
|
||||
Array()
|
||||
)))
|
||||
val dataset = Seq(NGramTestData(
|
||||
Array("a", "b", "c", "d", "e"),
|
||||
Array()
|
||||
)).toDF()
|
||||
testNGram(nGram, dataset)
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.spark.sql.{DataFrame, Row}
|
|||
|
||||
class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
@transient var data: Array[Vector] = _
|
||||
@transient var dataFrame: DataFrame = _
|
||||
@transient var normalizer: Normalizer = _
|
||||
|
@ -61,7 +63,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
|
|||
Vectors.sparse(3, Seq())
|
||||
)
|
||||
|
||||
dataFrame = spark.createDataFrame(sc.parallelize(data, 2).map(NormalizerSuite.FeatureData))
|
||||
dataFrame = data.map(NormalizerSuite.FeatureData).toSeq.toDF()
|
||||
normalizer = new Normalizer()
|
||||
.setInputCol("features")
|
||||
.setOutputCol("normalized_features")
|
||||
|
|
|
@ -30,9 +30,11 @@ import org.apache.spark.sql.types._
|
|||
class OneHotEncoderSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
def stringIndexed(): DataFrame = {
|
||||
val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")), 2)
|
||||
val df = spark.createDataFrame(data).toDF("id", "label")
|
||||
val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
|
||||
val df = data.toDF("id", "label")
|
||||
val indexer = new StringIndexer()
|
||||
.setInputCol("label")
|
||||
.setOutputCol("labelIndex")
|
||||
|
@ -83,7 +85,7 @@ class OneHotEncoderSuite
|
|||
|
||||
test("input column with ML attribute") {
|
||||
val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
|
||||
val df = spark.createDataFrame(Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply)).toDF("size")
|
||||
val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("size")
|
||||
.select(col("size").as("size", attr.toMetadata()))
|
||||
val encoder = new OneHotEncoder()
|
||||
.setInputCol("size")
|
||||
|
@ -96,7 +98,7 @@ class OneHotEncoderSuite
|
|||
}
|
||||
|
||||
test("input column without ML attribute") {
|
||||
val df = spark.createDataFrame(Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply)).toDF("index")
|
||||
val df = Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply).toDF("index")
|
||||
val encoder = new OneHotEncoder()
|
||||
.setInputCol("index")
|
||||
.setOutputCol("encoded")
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.spark.sql.Row
|
|||
|
||||
class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new PCA)
|
||||
val mat = Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix]
|
||||
|
@ -50,7 +52,7 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
|
|||
val pc = mat.computePrincipalComponents(3)
|
||||
val expected = mat.multiply(pc).rows.map(_.asML)
|
||||
|
||||
val df = spark.createDataFrame(dataRDD.zip(expected)).toDF("features", "expected")
|
||||
val df = dataRDD.zip(expected).toDF("features", "expected")
|
||||
|
||||
val pca = new PCA()
|
||||
.setInputCol("features")
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.spark.sql.Row
|
|||
class PolynomialExpansionSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new PolynomialExpansion)
|
||||
}
|
||||
|
@ -59,7 +61,7 @@ class PolynomialExpansionSuite
|
|||
Vectors.sparse(19, Array.empty, Array.empty))
|
||||
|
||||
test("Polynomial expansion with default parameter") {
|
||||
val df = spark.createDataFrame(data.zip(twoDegreeExpansion)).toDF("features", "expected")
|
||||
val df = data.zip(twoDegreeExpansion).toSeq.toDF("features", "expected")
|
||||
|
||||
val polynomialExpansion = new PolynomialExpansion()
|
||||
.setInputCol("features")
|
||||
|
@ -76,7 +78,7 @@ class PolynomialExpansionSuite
|
|||
}
|
||||
|
||||
test("Polynomial expansion with setter") {
|
||||
val df = spark.createDataFrame(data.zip(threeDegreeExpansion)).toDF("features", "expected")
|
||||
val df = data.zip(threeDegreeExpansion).toSeq.toDF("features", "expected")
|
||||
|
||||
val polynomialExpansion = new PolynomialExpansion()
|
||||
.setInputCol("features")
|
||||
|
@ -94,7 +96,7 @@ class PolynomialExpansionSuite
|
|||
}
|
||||
|
||||
test("Polynomial expansion with degree 1 is identity on vectors") {
|
||||
val df = spark.createDataFrame(data.zip(data)).toDF("features", "expected")
|
||||
val df = data.zip(data).toSeq.toDF("features", "expected")
|
||||
|
||||
val polynomialExpansion = new PolynomialExpansion()
|
||||
.setInputCol("features")
|
||||
|
@ -124,8 +126,7 @@ class PolynomialExpansionSuite
|
|||
(Vectors.dense(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), 8007, 12375)
|
||||
)
|
||||
|
||||
val df = spark.createDataFrame(data)
|
||||
.toDF("features", "expectedPoly10size", "expectedPoly11size")
|
||||
val df = data.toSeq.toDF("features", "expectedPoly10size", "expectedPoly11size")
|
||||
|
||||
val t = new PolynomialExpansion()
|
||||
.setInputCol("features")
|
||||
|
|
|
@ -26,22 +26,23 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
|
|||
import org.apache.spark.sql.types.DoubleType
|
||||
|
||||
class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new RFormula())
|
||||
}
|
||||
|
||||
test("transform numeric data") {
|
||||
val formula = new RFormula().setFormula("id ~ v1 + v2")
|
||||
val original = spark.createDataFrame(
|
||||
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
|
||||
val original = Seq((0, 1.0, 3.0), (2, 2.0, 5.0)).toDF("id", "v1", "v2")
|
||||
val model = formula.fit(original)
|
||||
val result = model.transform(original)
|
||||
val resultSchema = model.transformSchema(original.schema)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
(0, 1.0, 3.0, Vectors.dense(1.0, 3.0), 0.0),
|
||||
(2, 2.0, 5.0, Vectors.dense(2.0, 5.0), 2.0))
|
||||
).toDF("id", "v1", "v2", "features", "label")
|
||||
val expected = Seq(
|
||||
(0, 1.0, 3.0, Vectors.dense(1.0, 3.0), 0.0),
|
||||
(2, 2.0, 5.0, Vectors.dense(2.0, 5.0), 2.0)
|
||||
).toDF("id", "v1", "v2", "features", "label")
|
||||
// TODO(ekl) make schema comparisons ignore metadata, to avoid .toString
|
||||
assert(result.schema.toString == resultSchema.toString)
|
||||
assert(resultSchema == expected.schema)
|
||||
|
@ -50,7 +51,7 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("features column already exists") {
|
||||
val formula = new RFormula().setFormula("y ~ x").setFeaturesCol("x")
|
||||
val original = spark.createDataFrame(Seq((0, 1.0), (2, 2.0))).toDF("x", "y")
|
||||
val original = Seq((0, 1.0), (2, 2.0)).toDF("x", "y")
|
||||
intercept[IllegalArgumentException] {
|
||||
formula.fit(original)
|
||||
}
|
||||
|
@ -58,7 +59,7 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("label column already exists") {
|
||||
val formula = new RFormula().setFormula("y ~ x").setLabelCol("y")
|
||||
val original = spark.createDataFrame(Seq((0, 1.0), (2, 2.0))).toDF("x", "y")
|
||||
val original = Seq((0, 1.0), (2, 2.0)).toDF("x", "y")
|
||||
val model = formula.fit(original)
|
||||
val resultSchema = model.transformSchema(original.schema)
|
||||
assert(resultSchema.length == 3)
|
||||
|
@ -67,7 +68,7 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("label column already exists but is not numeric type") {
|
||||
val formula = new RFormula().setFormula("y ~ x").setLabelCol("y")
|
||||
val original = spark.createDataFrame(Seq((0, true), (2, false))).toDF("x", "y")
|
||||
val original = Seq((0, true), (2, false)).toDF("x", "y")
|
||||
val model = formula.fit(original)
|
||||
intercept[IllegalArgumentException] {
|
||||
model.transformSchema(original.schema)
|
||||
|
@ -79,7 +80,7 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("allow missing label column for test datasets") {
|
||||
val formula = new RFormula().setFormula("y ~ x").setLabelCol("label")
|
||||
val original = spark.createDataFrame(Seq((0, 1.0), (2, 2.0))).toDF("x", "_not_y")
|
||||
val original = Seq((0, 1.0), (2, 2.0)).toDF("x", "_not_y")
|
||||
val model = formula.fit(original)
|
||||
val resultSchema = model.transformSchema(original.schema)
|
||||
assert(resultSchema.length == 3)
|
||||
|
@ -88,37 +89,32 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
}
|
||||
|
||||
test("allow empty label") {
|
||||
val original = spark.createDataFrame(
|
||||
Seq((1, 2.0, 3.0), (4, 5.0, 6.0), (7, 8.0, 9.0))
|
||||
).toDF("id", "a", "b")
|
||||
val original = Seq((1, 2.0, 3.0), (4, 5.0, 6.0), (7, 8.0, 9.0)).toDF("id", "a", "b")
|
||||
val formula = new RFormula().setFormula("~ a + b")
|
||||
val model = formula.fit(original)
|
||||
val result = model.transform(original)
|
||||
val resultSchema = model.transformSchema(original.schema)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
(1, 2.0, 3.0, Vectors.dense(2.0, 3.0)),
|
||||
(4, 5.0, 6.0, Vectors.dense(5.0, 6.0)),
|
||||
(7, 8.0, 9.0, Vectors.dense(8.0, 9.0)))
|
||||
).toDF("id", "a", "b", "features")
|
||||
val expected = Seq(
|
||||
(1, 2.0, 3.0, Vectors.dense(2.0, 3.0)),
|
||||
(4, 5.0, 6.0, Vectors.dense(5.0, 6.0)),
|
||||
(7, 8.0, 9.0, Vectors.dense(8.0, 9.0))
|
||||
).toDF("id", "a", "b", "features")
|
||||
assert(result.schema.toString == resultSchema.toString)
|
||||
assert(result.collect() === expected.collect())
|
||||
}
|
||||
|
||||
test("encodes string terms") {
|
||||
val formula = new RFormula().setFormula("id ~ a + b")
|
||||
val original = spark.createDataFrame(
|
||||
Seq((1, "foo", 4), (2, "bar", 4), (3, "bar", 5), (4, "baz", 5))
|
||||
).toDF("id", "a", "b")
|
||||
val original = Seq((1, "foo", 4), (2, "bar", 4), (3, "bar", 5), (4, "baz", 5))
|
||||
.toDF("id", "a", "b")
|
||||
val model = formula.fit(original)
|
||||
val result = model.transform(original)
|
||||
val resultSchema = model.transformSchema(original.schema)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
val expected = Seq(
|
||||
(1, "foo", 4, Vectors.dense(0.0, 1.0, 4.0), 1.0),
|
||||
(2, "bar", 4, Vectors.dense(1.0, 0.0, 4.0), 2.0),
|
||||
(3, "bar", 5, Vectors.dense(1.0, 0.0, 5.0), 3.0),
|
||||
(4, "baz", 5, Vectors.dense(0.0, 0.0, 5.0), 4.0))
|
||||
(4, "baz", 5, Vectors.dense(0.0, 0.0, 5.0), 4.0)
|
||||
).toDF("id", "a", "b", "features", "label")
|
||||
assert(result.schema.toString == resultSchema.toString)
|
||||
assert(result.collect() === expected.collect())
|
||||
|
@ -126,17 +122,16 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("index string label") {
|
||||
val formula = new RFormula().setFormula("id ~ a + b")
|
||||
val original = spark.createDataFrame(
|
||||
val original =
|
||||
Seq(("male", "foo", 4), ("female", "bar", 4), ("female", "bar", 5), ("male", "baz", 5))
|
||||
).toDF("id", "a", "b")
|
||||
.toDF("id", "a", "b")
|
||||
val model = formula.fit(original)
|
||||
val result = model.transform(original)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
val expected = Seq(
|
||||
("male", "foo", 4, Vectors.dense(0.0, 1.0, 4.0), 1.0),
|
||||
("female", "bar", 4, Vectors.dense(1.0, 0.0, 4.0), 0.0),
|
||||
("female", "bar", 5, Vectors.dense(1.0, 0.0, 5.0), 0.0),
|
||||
("male", "baz", 5, Vectors.dense(0.0, 0.0, 5.0), 1.0))
|
||||
("male", "baz", 5, Vectors.dense(0.0, 0.0, 5.0), 1.0)
|
||||
).toDF("id", "a", "b", "features", "label")
|
||||
// assert(result.schema.toString == resultSchema.toString)
|
||||
assert(result.collect() === expected.collect())
|
||||
|
@ -144,9 +139,8 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("attribute generation") {
|
||||
val formula = new RFormula().setFormula("id ~ a + b")
|
||||
val original = spark.createDataFrame(
|
||||
Seq((1, "foo", 4), (2, "bar", 4), (3, "bar", 5), (4, "baz", 5))
|
||||
).toDF("id", "a", "b")
|
||||
val original = Seq((1, "foo", 4), (2, "bar", 4), (3, "bar", 5), (4, "baz", 5))
|
||||
.toDF("id", "a", "b")
|
||||
val model = formula.fit(original)
|
||||
val result = model.transform(original)
|
||||
val attrs = AttributeGroup.fromStructField(result.schema("features"))
|
||||
|
@ -161,9 +155,8 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("vector attribute generation") {
|
||||
val formula = new RFormula().setFormula("id ~ vec")
|
||||
val original = spark.createDataFrame(
|
||||
Seq((1, Vectors.dense(0.0, 1.0)), (2, Vectors.dense(1.0, 2.0)))
|
||||
).toDF("id", "vec")
|
||||
val original = Seq((1, Vectors.dense(0.0, 1.0)), (2, Vectors.dense(1.0, 2.0)))
|
||||
.toDF("id", "vec")
|
||||
val model = formula.fit(original)
|
||||
val result = model.transform(original)
|
||||
val attrs = AttributeGroup.fromStructField(result.schema("features"))
|
||||
|
@ -177,9 +170,8 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("vector attribute generation with unnamed input attrs") {
|
||||
val formula = new RFormula().setFormula("id ~ vec2")
|
||||
val base = spark.createDataFrame(
|
||||
Seq((1, Vectors.dense(0.0, 1.0)), (2, Vectors.dense(1.0, 2.0)))
|
||||
).toDF("id", "vec")
|
||||
val base = Seq((1, Vectors.dense(0.0, 1.0)), (2, Vectors.dense(1.0, 2.0)))
|
||||
.toDF("id", "vec")
|
||||
val metadata = new AttributeGroup(
|
||||
"vec2",
|
||||
Array[Attribute](
|
||||
|
@ -199,16 +191,13 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("numeric interaction") {
|
||||
val formula = new RFormula().setFormula("a ~ b:c:d")
|
||||
val original = spark.createDataFrame(
|
||||
Seq((1, 2, 4, 2), (2, 3, 4, 1))
|
||||
).toDF("a", "b", "c", "d")
|
||||
val original = Seq((1, 2, 4, 2), (2, 3, 4, 1)).toDF("a", "b", "c", "d")
|
||||
val model = formula.fit(original)
|
||||
val result = model.transform(original)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
(1, 2, 4, 2, Vectors.dense(16.0), 1.0),
|
||||
(2, 3, 4, 1, Vectors.dense(12.0), 2.0))
|
||||
).toDF("a", "b", "c", "d", "features", "label")
|
||||
val expected = Seq(
|
||||
(1, 2, 4, 2, Vectors.dense(16.0), 1.0),
|
||||
(2, 3, 4, 1, Vectors.dense(12.0), 2.0)
|
||||
).toDF("a", "b", "c", "d", "features", "label")
|
||||
assert(result.collect() === expected.collect())
|
||||
val attrs = AttributeGroup.fromStructField(result.schema("features"))
|
||||
val expectedAttrs = new AttributeGroup(
|
||||
|
@ -219,20 +208,19 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("factor numeric interaction") {
|
||||
val formula = new RFormula().setFormula("id ~ a:b")
|
||||
val original = spark.createDataFrame(
|
||||
val original =
|
||||
Seq((1, "foo", 4), (2, "bar", 4), (3, "bar", 5), (4, "baz", 5), (4, "baz", 5), (4, "baz", 5))
|
||||
).toDF("id", "a", "b")
|
||||
.toDF("id", "a", "b")
|
||||
val model = formula.fit(original)
|
||||
val result = model.transform(original)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
(1, "foo", 4, Vectors.dense(0.0, 0.0, 4.0), 1.0),
|
||||
(2, "bar", 4, Vectors.dense(0.0, 4.0, 0.0), 2.0),
|
||||
(3, "bar", 5, Vectors.dense(0.0, 5.0, 0.0), 3.0),
|
||||
(4, "baz", 5, Vectors.dense(5.0, 0.0, 0.0), 4.0),
|
||||
(4, "baz", 5, Vectors.dense(5.0, 0.0, 0.0), 4.0),
|
||||
(4, "baz", 5, Vectors.dense(5.0, 0.0, 0.0), 4.0))
|
||||
).toDF("id", "a", "b", "features", "label")
|
||||
val expected = Seq(
|
||||
(1, "foo", 4, Vectors.dense(0.0, 0.0, 4.0), 1.0),
|
||||
(2, "bar", 4, Vectors.dense(0.0, 4.0, 0.0), 2.0),
|
||||
(3, "bar", 5, Vectors.dense(0.0, 5.0, 0.0), 3.0),
|
||||
(4, "baz", 5, Vectors.dense(5.0, 0.0, 0.0), 4.0),
|
||||
(4, "baz", 5, Vectors.dense(5.0, 0.0, 0.0), 4.0),
|
||||
(4, "baz", 5, Vectors.dense(5.0, 0.0, 0.0), 4.0)
|
||||
).toDF("id", "a", "b", "features", "label")
|
||||
assert(result.collect() === expected.collect())
|
||||
val attrs = AttributeGroup.fromStructField(result.schema("features"))
|
||||
val expectedAttrs = new AttributeGroup(
|
||||
|
@ -246,17 +234,15 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
|
||||
test("factor factor interaction") {
|
||||
val formula = new RFormula().setFormula("id ~ a:b")
|
||||
val original = spark.createDataFrame(
|
||||
Seq((1, "foo", "zq"), (2, "bar", "zq"), (3, "bar", "zz"))
|
||||
).toDF("id", "a", "b")
|
||||
val original =
|
||||
Seq((1, "foo", "zq"), (2, "bar", "zq"), (3, "bar", "zz")).toDF("id", "a", "b")
|
||||
val model = formula.fit(original)
|
||||
val result = model.transform(original)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq(
|
||||
(1, "foo", "zq", Vectors.dense(0.0, 0.0, 1.0, 0.0), 1.0),
|
||||
(2, "bar", "zq", Vectors.dense(1.0, 0.0, 0.0, 0.0), 2.0),
|
||||
(3, "bar", "zz", Vectors.dense(0.0, 1.0, 0.0, 0.0), 3.0))
|
||||
).toDF("id", "a", "b", "features", "label")
|
||||
val expected = Seq(
|
||||
(1, "foo", "zq", Vectors.dense(0.0, 0.0, 1.0, 0.0), 1.0),
|
||||
(2, "bar", "zq", Vectors.dense(1.0, 0.0, 0.0, 0.0), 2.0),
|
||||
(3, "bar", "zz", Vectors.dense(0.0, 1.0, 0.0, 0.0), 3.0)
|
||||
).toDF("id", "a", "b", "features", "label")
|
||||
assert(result.collect() === expected.collect())
|
||||
val attrs = AttributeGroup.fromStructField(result.schema("features"))
|
||||
val expectedAttrs = new AttributeGroup(
|
||||
|
@ -295,9 +281,7 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
|
|||
}
|
||||
}
|
||||
|
||||
val dataset = spark.createDataFrame(
|
||||
Seq((1, "foo", "zq"), (2, "bar", "zq"), (3, "bar", "zz"))
|
||||
).toDF("id", "a", "b")
|
||||
val dataset = Seq((1, "foo", "zq"), (2, "bar", "zq"), (3, "bar", "zz")).toDF("id", "a", "b")
|
||||
|
||||
val rFormula = new RFormula().setFormula("id ~ a:b")
|
||||
|
||||
|
|
|
@ -26,19 +26,19 @@ import org.apache.spark.sql.types.{LongType, StructField, StructType}
|
|||
class SQLTransformerSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new SQLTransformer())
|
||||
}
|
||||
|
||||
test("transform numeric data") {
|
||||
val original = spark.createDataFrame(
|
||||
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
|
||||
val original = Seq((0, 1.0, 3.0), (2, 2.0, 5.0)).toDF("id", "v1", "v2")
|
||||
val sqlTrans = new SQLTransformer().setStatement(
|
||||
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
|
||||
val result = sqlTrans.transform(original)
|
||||
val resultSchema = sqlTrans.transformSchema(original.schema)
|
||||
val expected = spark.createDataFrame(
|
||||
Seq((0, 1.0, 3.0, 4.0, 3.0), (2, 2.0, 5.0, 7.0, 10.0)))
|
||||
val expected = Seq((0, 1.0, 3.0, 4.0, 3.0), (2, 2.0, 5.0, 7.0, 10.0))
|
||||
.toDF("id", "v1", "v2", "v3", "v4")
|
||||
assert(result.schema.toString == resultSchema.toString)
|
||||
assert(resultSchema == expected.schema)
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.spark.sql.{DataFrame, Row}
|
|||
class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext
|
||||
with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
@transient var data: Array[Vector] = _
|
||||
@transient var resWithStd: Array[Vector] = _
|
||||
@transient var resWithMean: Array[Vector] = _
|
||||
|
@ -73,7 +75,7 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
}
|
||||
|
||||
test("Standardization with default parameter") {
|
||||
val df0 = spark.createDataFrame(data.zip(resWithStd)).toDF("features", "expected")
|
||||
val df0 = data.zip(resWithStd).toSeq.toDF("features", "expected")
|
||||
|
||||
val standardScaler0 = new StandardScaler()
|
||||
.setInputCol("features")
|
||||
|
@ -84,9 +86,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
}
|
||||
|
||||
test("Standardization with setter") {
|
||||
val df1 = spark.createDataFrame(data.zip(resWithBoth)).toDF("features", "expected")
|
||||
val df2 = spark.createDataFrame(data.zip(resWithMean)).toDF("features", "expected")
|
||||
val df3 = spark.createDataFrame(data.zip(data)).toDF("features", "expected")
|
||||
val df1 = data.zip(resWithBoth).toSeq.toDF("features", "expected")
|
||||
val df2 = data.zip(resWithMean).toSeq.toDF("features", "expected")
|
||||
val df3 = data.zip(data).toSeq.toDF("features", "expected")
|
||||
|
||||
val standardScaler1 = new StandardScaler()
|
||||
.setInputCol("features")
|
||||
|
@ -120,7 +122,7 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
Vectors.sparse(3, Array(1, 2), Array(-5.1, 1.0)),
|
||||
Vectors.dense(1.7, -0.6, 3.3)
|
||||
)
|
||||
val df = spark.createDataFrame(someSparseData.zip(resWithMean)).toDF("features", "expected")
|
||||
val df = someSparseData.zip(resWithMean).toSeq.toDF("features", "expected")
|
||||
val standardScaler = new StandardScaler()
|
||||
.setInputCol("features")
|
||||
.setOutputCol("standardized_features")
|
||||
|
|
|
@ -37,19 +37,20 @@ class StopWordsRemoverSuite
|
|||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import StopWordsRemoverSuite._
|
||||
import testImplicits._
|
||||
|
||||
test("StopWordsRemover default") {
|
||||
val remover = new StopWordsRemover()
|
||||
.setInputCol("raw")
|
||||
.setOutputCol("filtered")
|
||||
val dataSet = spark.createDataFrame(Seq(
|
||||
val dataSet = Seq(
|
||||
(Seq("test", "test"), Seq("test", "test")),
|
||||
(Seq("a", "b", "c", "d"), Seq("b", "c")),
|
||||
(Seq("a", "the", "an"), Seq()),
|
||||
(Seq("A", "The", "AN"), Seq()),
|
||||
(Seq(null), Seq(null)),
|
||||
(Seq(), Seq())
|
||||
)).toDF("raw", "expected")
|
||||
).toDF("raw", "expected")
|
||||
|
||||
testStopWordsRemover(remover, dataSet)
|
||||
}
|
||||
|
@ -60,14 +61,14 @@ class StopWordsRemoverSuite
|
|||
.setInputCol("raw")
|
||||
.setOutputCol("filtered")
|
||||
.setStopWords(stopWords)
|
||||
val dataSet = spark.createDataFrame(Seq(
|
||||
val dataSet = Seq(
|
||||
(Seq("test", "test"), Seq()),
|
||||
(Seq("a", "b", "c", "d"), Seq("b", "c", "d")),
|
||||
(Seq("a", "the", "an"), Seq()),
|
||||
(Seq("A", "The", "AN"), Seq()),
|
||||
(Seq(null), Seq(null)),
|
||||
(Seq(), Seq())
|
||||
)).toDF("raw", "expected")
|
||||
).toDF("raw", "expected")
|
||||
|
||||
testStopWordsRemover(remover, dataSet)
|
||||
}
|
||||
|
@ -77,10 +78,10 @@ class StopWordsRemoverSuite
|
|||
.setInputCol("raw")
|
||||
.setOutputCol("filtered")
|
||||
.setCaseSensitive(true)
|
||||
val dataSet = spark.createDataFrame(Seq(
|
||||
val dataSet = Seq(
|
||||
(Seq("A"), Seq("A")),
|
||||
(Seq("The", "the"), Seq("The"))
|
||||
)).toDF("raw", "expected")
|
||||
).toDF("raw", "expected")
|
||||
|
||||
testStopWordsRemover(remover, dataSet)
|
||||
}
|
||||
|
@ -98,10 +99,10 @@ class StopWordsRemoverSuite
|
|||
.setInputCol("raw")
|
||||
.setOutputCol("filtered")
|
||||
.setStopWords(stopWords)
|
||||
val dataSet = spark.createDataFrame(Seq(
|
||||
val dataSet = Seq(
|
||||
(Seq("acaba", "ama", "biri"), Seq()),
|
||||
(Seq("hep", "her", "scala"), Seq("scala"))
|
||||
)).toDF("raw", "expected")
|
||||
).toDF("raw", "expected")
|
||||
|
||||
testStopWordsRemover(remover, dataSet)
|
||||
}
|
||||
|
@ -112,10 +113,10 @@ class StopWordsRemoverSuite
|
|||
.setInputCol("raw")
|
||||
.setOutputCol("filtered")
|
||||
.setStopWords(stopWords.toArray)
|
||||
val dataSet = spark.createDataFrame(Seq(
|
||||
val dataSet = Seq(
|
||||
(Seq("python", "scala", "a"), Seq("python", "scala", "a")),
|
||||
(Seq("Python", "Scala", "swift"), Seq("Python", "Scala", "swift"))
|
||||
)).toDF("raw", "expected")
|
||||
).toDF("raw", "expected")
|
||||
|
||||
testStopWordsRemover(remover, dataSet)
|
||||
}
|
||||
|
@ -126,10 +127,10 @@ class StopWordsRemoverSuite
|
|||
.setInputCol("raw")
|
||||
.setOutputCol("filtered")
|
||||
.setStopWords(stopWords.toArray)
|
||||
val dataSet = spark.createDataFrame(Seq(
|
||||
val dataSet = Seq(
|
||||
(Seq("python", "scala", "a"), Seq()),
|
||||
(Seq("Python", "Scala", "swift"), Seq("swift"))
|
||||
)).toDF("raw", "expected")
|
||||
).toDF("raw", "expected")
|
||||
|
||||
testStopWordsRemover(remover, dataSet)
|
||||
}
|
||||
|
@ -148,9 +149,7 @@ class StopWordsRemoverSuite
|
|||
val remover = new StopWordsRemover()
|
||||
.setInputCol("raw")
|
||||
.setOutputCol(outputCol)
|
||||
val dataSet = spark.createDataFrame(Seq(
|
||||
(Seq("The", "the", "swift"), Seq("swift"))
|
||||
)).toDF("raw", outputCol)
|
||||
val dataSet = Seq((Seq("The", "the", "swift"), Seq("swift"))).toDF("raw", outputCol)
|
||||
|
||||
val thrown = intercept[IllegalArgumentException] {
|
||||
testStopWordsRemover(remover, dataSet)
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructTy
|
|||
class StringIndexerSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new StringIndexer)
|
||||
val model = new StringIndexerModel("indexer", Array("a", "b"))
|
||||
|
@ -38,8 +40,8 @@ class StringIndexerSuite
|
|||
}
|
||||
|
||||
test("StringIndexer") {
|
||||
val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")), 2)
|
||||
val df = spark.createDataFrame(data).toDF("id", "label")
|
||||
val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
|
||||
val df = data.toDF("id", "label")
|
||||
val indexer = new StringIndexer()
|
||||
.setInputCol("label")
|
||||
.setOutputCol("labelIndex")
|
||||
|
@ -61,10 +63,10 @@ class StringIndexerSuite
|
|||
}
|
||||
|
||||
test("StringIndexerUnseen") {
|
||||
val data = sc.parallelize(Seq((0, "a"), (1, "b"), (4, "b")), 2)
|
||||
val data2 = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c")), 2)
|
||||
val df = spark.createDataFrame(data).toDF("id", "label")
|
||||
val df2 = spark.createDataFrame(data2).toDF("id", "label")
|
||||
val data = Seq((0, "a"), (1, "b"), (4, "b"))
|
||||
val data2 = Seq((0, "a"), (1, "b"), (2, "c"))
|
||||
val df = data.toDF("id", "label")
|
||||
val df2 = data2.toDF("id", "label")
|
||||
val indexer = new StringIndexer()
|
||||
.setInputCol("label")
|
||||
.setOutputCol("labelIndex")
|
||||
|
@ -92,8 +94,8 @@ class StringIndexerSuite
|
|||
}
|
||||
|
||||
test("StringIndexer with a numeric input column") {
|
||||
val data = sc.parallelize(Seq((0, 100), (1, 200), (2, 300), (3, 100), (4, 100), (5, 300)), 2)
|
||||
val df = spark.createDataFrame(data).toDF("id", "label")
|
||||
val data = Seq((0, 100), (1, 200), (2, 300), (3, 100), (4, 100), (5, 300))
|
||||
val df = data.toDF("id", "label")
|
||||
val indexer = new StringIndexer()
|
||||
.setInputCol("label")
|
||||
.setOutputCol("labelIndex")
|
||||
|
@ -119,7 +121,7 @@ class StringIndexerSuite
|
|||
}
|
||||
|
||||
test("StringIndexerModel can't overwrite output column") {
|
||||
val df = spark.createDataFrame(Seq((1, 2), (3, 4))).toDF("input", "output")
|
||||
val df = Seq((1, 2), (3, 4)).toDF("input", "output")
|
||||
intercept[IllegalArgumentException] {
|
||||
new StringIndexer()
|
||||
.setInputCol("input")
|
||||
|
@ -161,9 +163,7 @@ class StringIndexerSuite
|
|||
|
||||
test("IndexToString.transform") {
|
||||
val labels = Array("a", "b", "c")
|
||||
val df0 = spark.createDataFrame(Seq(
|
||||
(0, "a"), (1, "b"), (2, "c"), (0, "a")
|
||||
)).toDF("index", "expected")
|
||||
val df0 = Seq((0, "a"), (1, "b"), (2, "c"), (0, "a")).toDF("index", "expected")
|
||||
|
||||
val idxToStr0 = new IndexToString()
|
||||
.setInputCol("index")
|
||||
|
@ -187,8 +187,8 @@ class StringIndexerSuite
|
|||
}
|
||||
|
||||
test("StringIndexer, IndexToString are inverses") {
|
||||
val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")), 2)
|
||||
val df = spark.createDataFrame(data).toDF("id", "label")
|
||||
val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
|
||||
val df = data.toDF("id", "label")
|
||||
val indexer = new StringIndexer()
|
||||
.setInputCol("label")
|
||||
.setOutputCol("labelIndex")
|
||||
|
@ -220,8 +220,8 @@ class StringIndexerSuite
|
|||
}
|
||||
|
||||
test("StringIndexer metadata") {
|
||||
val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")), 2)
|
||||
val df = spark.createDataFrame(data).toDF("id", "label")
|
||||
val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
|
||||
val df = data.toDF("id", "label")
|
||||
val indexer = new StringIndexer()
|
||||
.setInputCol("label")
|
||||
.setOutputCol("labelIndex")
|
||||
|
|
|
@ -46,6 +46,7 @@ class RegexTokenizerSuite
|
|||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import org.apache.spark.ml.feature.RegexTokenizerSuite._
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new RegexTokenizer)
|
||||
|
@ -57,26 +58,26 @@ class RegexTokenizerSuite
|
|||
.setPattern("\\w+|\\p{Punct}")
|
||||
.setInputCol("rawText")
|
||||
.setOutputCol("tokens")
|
||||
val dataset0 = spark.createDataFrame(Seq(
|
||||
val dataset0 = Seq(
|
||||
TokenizerTestData("Test for tokenization.", Array("test", "for", "tokenization", ".")),
|
||||
TokenizerTestData("Te,st. punct", Array("te", ",", "st", ".", "punct"))
|
||||
))
|
||||
).toDF()
|
||||
testRegexTokenizer(tokenizer0, dataset0)
|
||||
|
||||
val dataset1 = spark.createDataFrame(Seq(
|
||||
val dataset1 = Seq(
|
||||
TokenizerTestData("Test for tokenization.", Array("test", "for", "tokenization")),
|
||||
TokenizerTestData("Te,st. punct", Array("punct"))
|
||||
))
|
||||
).toDF()
|
||||
tokenizer0.setMinTokenLength(3)
|
||||
testRegexTokenizer(tokenizer0, dataset1)
|
||||
|
||||
val tokenizer2 = new RegexTokenizer()
|
||||
.setInputCol("rawText")
|
||||
.setOutputCol("tokens")
|
||||
val dataset2 = spark.createDataFrame(Seq(
|
||||
val dataset2 = Seq(
|
||||
TokenizerTestData("Test for tokenization.", Array("test", "for", "tokenization.")),
|
||||
TokenizerTestData("Te,st. punct", Array("te,st.", "punct"))
|
||||
))
|
||||
).toDF()
|
||||
testRegexTokenizer(tokenizer2, dataset2)
|
||||
}
|
||||
|
||||
|
@ -85,10 +86,10 @@ class RegexTokenizerSuite
|
|||
.setInputCol("rawText")
|
||||
.setOutputCol("tokens")
|
||||
.setToLowercase(false)
|
||||
val dataset = spark.createDataFrame(Seq(
|
||||
val dataset = Seq(
|
||||
TokenizerTestData("JAVA SCALA", Array("JAVA", "SCALA")),
|
||||
TokenizerTestData("java scala", Array("java", "scala"))
|
||||
))
|
||||
).toDF()
|
||||
testRegexTokenizer(tokenizer, dataset)
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.spark.sql.functions.col
|
|||
class VectorAssemblerSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("params") {
|
||||
ParamsSuite.checkParams(new VectorAssembler)
|
||||
}
|
||||
|
@ -57,9 +59,9 @@ class VectorAssemblerSuite
|
|||
}
|
||||
|
||||
test("VectorAssembler") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
val df = Seq(
|
||||
(0, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 10L)
|
||||
)).toDF("id", "x", "y", "name", "z", "n")
|
||||
).toDF("id", "x", "y", "name", "z", "n")
|
||||
val assembler = new VectorAssembler()
|
||||
.setInputCols(Array("x", "y", "z", "n"))
|
||||
.setOutputCol("features")
|
||||
|
@ -70,7 +72,7 @@ class VectorAssemblerSuite
|
|||
}
|
||||
|
||||
test("transform should throw an exception in case of unsupported type") {
|
||||
val df = spark.createDataFrame(Seq(("a", "b", "c"))).toDF("a", "b", "c")
|
||||
val df = Seq(("a", "b", "c")).toDF("a", "b", "c")
|
||||
val assembler = new VectorAssembler()
|
||||
.setInputCols(Array("a", "b", "c"))
|
||||
.setOutputCol("features")
|
||||
|
@ -87,7 +89,7 @@ class VectorAssemblerSuite
|
|||
NominalAttribute.defaultAttr.withName("gender").withValues("male", "female"),
|
||||
NumericAttribute.defaultAttr.withName("salary")))
|
||||
val row = (1.0, 0.5, 1, Vectors.dense(1.0, 1000.0), Vectors.sparse(2, Array(1), Array(2.0)))
|
||||
val df = spark.createDataFrame(Seq(row)).toDF("browser", "hour", "count", "user", "ad")
|
||||
val df = Seq(row).toDF("browser", "hour", "count", "user", "ad")
|
||||
.select(
|
||||
col("browser").as("browser", browser.toMetadata()),
|
||||
col("hour").as("hour", hour.toMetadata()),
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.spark.sql.DataFrame
|
|||
class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
|
||||
with DefaultReadWriteTest with Logging {
|
||||
|
||||
import testImplicits._
|
||||
import VectorIndexerSuite.FeatureData
|
||||
|
||||
// identical, of length 3
|
||||
|
@ -85,11 +86,13 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
checkPair(densePoints1Seq, sparsePoints1Seq)
|
||||
checkPair(densePoints2Seq, sparsePoints2Seq)
|
||||
|
||||
densePoints1 = spark.createDataFrame(sc.parallelize(densePoints1Seq, 2).map(FeatureData))
|
||||
sparsePoints1 = spark.createDataFrame(sc.parallelize(sparsePoints1Seq, 2).map(FeatureData))
|
||||
densePoints2 = spark.createDataFrame(sc.parallelize(densePoints2Seq, 2).map(FeatureData))
|
||||
sparsePoints2 = spark.createDataFrame(sc.parallelize(sparsePoints2Seq, 2).map(FeatureData))
|
||||
badPoints = spark.createDataFrame(sc.parallelize(badPointsSeq, 2).map(FeatureData))
|
||||
densePoints1 = densePoints1Seq.map(FeatureData).toDF()
|
||||
sparsePoints1 = sparsePoints1Seq.map(FeatureData).toDF()
|
||||
// TODO: If we directly use `toDF` without parallelize, the test
|
||||
// "Throws error when given RDDs with different size vectors" fails for an unknown reason.
|
||||
densePoints2 = sc.parallelize(densePoints2Seq, 2).map(FeatureData).toDF()
|
||||
sparsePoints2 = sparsePoints2Seq.map(FeatureData).toDF()
|
||||
badPoints = badPointsSeq.map(FeatureData).toDF()
|
||||
}
|
||||
|
||||
private def getIndexer: VectorIndexer =
|
||||
|
@ -102,7 +105,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
}
|
||||
|
||||
test("Cannot fit an empty DataFrame") {
|
||||
val rdd = spark.createDataFrame(sc.parallelize(Array.empty[Vector], 2).map(FeatureData))
|
||||
val rdd = Array.empty[Vector].map(FeatureData).toSeq.toDF()
|
||||
val vectorIndexer = getIndexer
|
||||
intercept[IllegalArgumentException] {
|
||||
vectorIndexer.fit(rdd)
|
||||
|
|
|
@ -31,23 +31,22 @@ import org.apache.spark.sql.{DataFrame, Row}
|
|||
class AFTSurvivalRegressionSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
@transient var datasetUnivariate: DataFrame = _
|
||||
@transient var datasetMultivariate: DataFrame = _
|
||||
@transient var datasetUnivariateScaled: DataFrame = _
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
datasetUnivariate = spark.createDataFrame(
|
||||
sc.parallelize(generateAFTInput(
|
||||
1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0)))
|
||||
datasetMultivariate = spark.createDataFrame(
|
||||
sc.parallelize(generateAFTInput(
|
||||
2, Array(0.9, -1.3), Array(0.7, 1.2), 1000, 42, 1.5, 2.5, 2.0)))
|
||||
datasetUnivariateScaled = spark.createDataFrame(
|
||||
sc.parallelize(generateAFTInput(
|
||||
1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0)).map { x =>
|
||||
AFTPoint(Vectors.dense(x.features(0) * 1.0E3), x.label, x.censor)
|
||||
})
|
||||
datasetUnivariate = generateAFTInput(
|
||||
1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0).toDF()
|
||||
datasetMultivariate = generateAFTInput(
|
||||
2, Array(0.9, -1.3), Array(0.7, 1.2), 1000, 42, 1.5, 2.5, 2.0).toDF()
|
||||
datasetUnivariateScaled = sc.parallelize(
|
||||
generateAFTInput(1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0)).map { x =>
|
||||
AFTPoint(Vectors.dense(x.features(0) * 1.0E3), x.label, x.censor)
|
||||
}.toDF()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -396,9 +395,8 @@ class AFTSurvivalRegressionSuite
|
|||
// the parallelism is bigger than that. Because the issue was about `AFTAggregator`s
|
||||
// being merged incorrectly when it has an empty partition, running the code below
|
||||
// should not throw an exception.
|
||||
val dataset = spark.createDataFrame(
|
||||
sc.parallelize(generateAFTInput(
|
||||
1, Array(5.5), Array(0.8), 2, 42, 1.0, 2.0, 2.0), numSlices = 3))
|
||||
val dataset = sc.parallelize(generateAFTInput(
|
||||
1, Array(5.5), Array(0.8), 2, 42, 1.0, 2.0, 2.0), numSlices = 3).toDF()
|
||||
val trainer = new AFTSurvivalRegression()
|
||||
trainer.fit(dataset)
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
with DefaultReadWriteTest {
|
||||
|
||||
import GBTRegressorSuite.compareAPIs
|
||||
import testImplicits._
|
||||
|
||||
// Combinations for estimators, learning rates and subsamplingRate
|
||||
private val testCombinations =
|
||||
|
@ -76,14 +77,14 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
}
|
||||
|
||||
test("GBTRegressor behaves reasonably on toy data") {
|
||||
val df = spark.createDataFrame(Seq(
|
||||
val df = Seq(
|
||||
LabeledPoint(10, Vectors.dense(1, 2, 3, 4)),
|
||||
LabeledPoint(-5, Vectors.dense(6, 3, 2, 1)),
|
||||
LabeledPoint(11, Vectors.dense(2, 2, 3, 4)),
|
||||
LabeledPoint(-6, Vectors.dense(6, 4, 2, 1)),
|
||||
LabeledPoint(9, Vectors.dense(1, 2, 6, 4)),
|
||||
LabeledPoint(-4, Vectors.dense(6, 3, 2, 2))
|
||||
))
|
||||
).toDF()
|
||||
val gbt = new GBTRegressor()
|
||||
.setMaxDepth(2)
|
||||
.setMaxIter(2)
|
||||
|
@ -103,7 +104,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext
|
|||
val path = tempDir.toURI.toString
|
||||
sc.setCheckpointDir(path)
|
||||
|
||||
val df = spark.createDataFrame(data)
|
||||
val df = data.toDF()
|
||||
val gbt = new GBTRegressor()
|
||||
.setMaxDepth(2)
|
||||
.setMaxIter(5)
|
||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.spark.sql.functions._
|
|||
class GeneralizedLinearRegressionSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
private val seed: Int = 42
|
||||
@transient var datasetGaussianIdentity: DataFrame = _
|
||||
@transient var datasetGaussianLog: DataFrame = _
|
||||
|
@ -52,23 +54,20 @@ class GeneralizedLinearRegressionSuite
|
|||
|
||||
import GeneralizedLinearRegressionSuite._
|
||||
|
||||
datasetGaussianIdentity = spark.createDataFrame(
|
||||
sc.parallelize(generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gaussian", link = "identity"), 2))
|
||||
datasetGaussianIdentity = generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gaussian", link = "identity").toDF()
|
||||
|
||||
datasetGaussianLog = spark.createDataFrame(
|
||||
sc.parallelize(generateGeneralizedLinearRegressionInput(
|
||||
intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gaussian", link = "log"), 2))
|
||||
datasetGaussianLog = generateGeneralizedLinearRegressionInput(
|
||||
intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gaussian", link = "log").toDF()
|
||||
|
||||
datasetGaussianInverse = spark.createDataFrame(
|
||||
sc.parallelize(generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gaussian", link = "inverse"), 2))
|
||||
datasetGaussianInverse = generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gaussian", link = "inverse").toDF()
|
||||
|
||||
datasetBinomial = {
|
||||
val nPoints = 10000
|
||||
|
@ -80,44 +79,38 @@ class GeneralizedLinearRegressionSuite
|
|||
generateMultinomialLogisticInput(coefficients, xMean, xVariance,
|
||||
addIntercept = true, nPoints, seed)
|
||||
|
||||
spark.createDataFrame(sc.parallelize(testData, 2))
|
||||
testData.toDF()
|
||||
}
|
||||
|
||||
datasetPoissonLog = spark.createDataFrame(
|
||||
sc.parallelize(generateGeneralizedLinearRegressionInput(
|
||||
intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "poisson", link = "log"), 2))
|
||||
datasetPoissonLog = generateGeneralizedLinearRegressionInput(
|
||||
intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "poisson", link = "log").toDF()
|
||||
|
||||
datasetPoissonIdentity = spark.createDataFrame(
|
||||
sc.parallelize(generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "poisson", link = "identity"), 2))
|
||||
datasetPoissonIdentity = generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "poisson", link = "identity").toDF()
|
||||
|
||||
datasetPoissonSqrt = spark.createDataFrame(
|
||||
sc.parallelize(generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "poisson", link = "sqrt"), 2))
|
||||
datasetPoissonSqrt = generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "poisson", link = "sqrt").toDF()
|
||||
|
||||
datasetGammaInverse = spark.createDataFrame(
|
||||
sc.parallelize(generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gamma", link = "inverse"), 2))
|
||||
datasetGammaInverse = generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gamma", link = "inverse").toDF()
|
||||
|
||||
datasetGammaIdentity = spark.createDataFrame(
|
||||
sc.parallelize(generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gamma", link = "identity"), 2))
|
||||
datasetGammaIdentity = generateGeneralizedLinearRegressionInput(
|
||||
intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gamma", link = "identity").toDF()
|
||||
|
||||
datasetGammaLog = spark.createDataFrame(
|
||||
sc.parallelize(generateGeneralizedLinearRegressionInput(
|
||||
intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gamma", link = "log"), 2))
|
||||
datasetGammaLog = generateGeneralizedLinearRegressionInput(
|
||||
intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
|
||||
family = "gamma", link = "log").toDF()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -540,12 +533,12 @@ class GeneralizedLinearRegressionSuite
|
|||
w <- c(1, 2, 3, 4)
|
||||
df <- as.data.frame(cbind(A, b))
|
||||
*/
|
||||
val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq(
|
||||
val datasetWithWeight = Seq(
|
||||
Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)),
|
||||
Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)),
|
||||
Instance(29.0, 4.0, Vectors.dense(3.0, 13.0))
|
||||
), 2))
|
||||
).toDF()
|
||||
/*
|
||||
R code:
|
||||
|
||||
|
@ -668,12 +661,12 @@ class GeneralizedLinearRegressionSuite
|
|||
w <- c(1, 2, 3, 4)
|
||||
df <- as.data.frame(cbind(A, b))
|
||||
*/
|
||||
val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq(
|
||||
val datasetWithWeight = Seq(
|
||||
Instance(1.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(0.0, 2.0, Vectors.dense(1.0, 2.0)),
|
||||
Instance(1.0, 3.0, Vectors.dense(2.0, 1.0)),
|
||||
Instance(0.0, 4.0, Vectors.dense(3.0, 3.0))
|
||||
), 2))
|
||||
).toDF()
|
||||
/*
|
||||
R code:
|
||||
|
||||
|
@ -782,12 +775,12 @@ class GeneralizedLinearRegressionSuite
|
|||
w <- c(1, 2, 3, 4)
|
||||
df <- as.data.frame(cbind(A, b))
|
||||
*/
|
||||
val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq(
|
||||
val datasetWithWeight = Seq(
|
||||
Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)),
|
||||
Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)),
|
||||
Instance(9.0, 4.0, Vectors.dense(3.0, 13.0))
|
||||
), 2))
|
||||
).toDF()
|
||||
/*
|
||||
R code:
|
||||
|
||||
|
@ -899,12 +892,12 @@ class GeneralizedLinearRegressionSuite
|
|||
w <- c(1, 2, 3, 4)
|
||||
df <- as.data.frame(cbind(A, b))
|
||||
*/
|
||||
val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq(
|
||||
val datasetWithWeight = Seq(
|
||||
Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)),
|
||||
Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)),
|
||||
Instance(9.0, 4.0, Vectors.dense(3.0, 13.0))
|
||||
), 2))
|
||||
).toDF()
|
||||
/*
|
||||
R code:
|
||||
|
||||
|
@ -1054,12 +1047,12 @@ class GeneralizedLinearRegressionSuite
|
|||
[1] 12.92681
|
||||
[1] 13.32836
|
||||
*/
|
||||
val dataset = spark.createDataFrame(Seq(
|
||||
val dataset = Seq(
|
||||
LabeledPoint(1, Vectors.dense(5, 0)),
|
||||
LabeledPoint(0, Vectors.dense(2, 1)),
|
||||
LabeledPoint(1, Vectors.dense(1, 2)),
|
||||
LabeledPoint(0, Vectors.dense(3, 3))
|
||||
))
|
||||
).toDF()
|
||||
val expected = Seq(12.88188, 12.92681, 13.32836)
|
||||
|
||||
var idx = 0
|
||||
|
|
|
@ -27,15 +27,15 @@ import org.apache.spark.sql.{DataFrame, Row}
|
|||
class IsotonicRegressionSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
private def generateIsotonicInput(labels: Seq[Double]): DataFrame = {
|
||||
spark.createDataFrame(
|
||||
labels.zipWithIndex.map { case (label, i) => (label, i.toDouble, 1.0) }
|
||||
).toDF("label", "features", "weight")
|
||||
labels.zipWithIndex.map { case (label, i) => (label, i.toDouble, 1.0) }
|
||||
.toDF("label", "features", "weight")
|
||||
}
|
||||
|
||||
private def generatePredictionInput(features: Seq[Double]): DataFrame = {
|
||||
spark.createDataFrame(features.map(Tuple1.apply))
|
||||
.toDF("features")
|
||||
features.map(Tuple1.apply).toDF("features")
|
||||
}
|
||||
|
||||
test("isotonic regression predictions") {
|
||||
|
@ -145,10 +145,10 @@ class IsotonicRegressionSuite
|
|||
}
|
||||
|
||||
test("vector features column with feature index") {
|
||||
val dataset = spark.createDataFrame(Seq(
|
||||
val dataset = Seq(
|
||||
(4.0, Vectors.dense(0.0, 1.0)),
|
||||
(3.0, Vectors.dense(0.0, 2.0)),
|
||||
(5.0, Vectors.sparse(2, Array(1), Array(3.0))))
|
||||
(5.0, Vectors.sparse(2, Array(1), Array(3.0)))
|
||||
).toDF("label", "features")
|
||||
|
||||
val ir = new IsotonicRegression()
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.spark.sql.{DataFrame, Row}
|
|||
class LinearRegressionSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
private val seed: Int = 42
|
||||
@transient var datasetWithDenseFeature: DataFrame = _
|
||||
@transient var datasetWithDenseFeatureWithoutIntercept: DataFrame = _
|
||||
|
@ -42,29 +44,27 @@ class LinearRegressionSuite
|
|||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
datasetWithDenseFeature = spark.createDataFrame(
|
||||
sc.parallelize(LinearDataGenerator.generateLinearInput(
|
||||
intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML))
|
||||
datasetWithDenseFeature = sc.parallelize(LinearDataGenerator.generateLinearInput(
|
||||
intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML).toDF()
|
||||
/*
|
||||
datasetWithDenseFeatureWithoutIntercept is not needed for correctness testing
|
||||
but is useful for illustrating training model without intercept
|
||||
*/
|
||||
datasetWithDenseFeatureWithoutIntercept = spark.createDataFrame(
|
||||
sc.parallelize(LinearDataGenerator.generateLinearInput(
|
||||
datasetWithDenseFeatureWithoutIntercept = sc.parallelize(
|
||||
LinearDataGenerator.generateLinearInput(
|
||||
intercept = 0.0, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML))
|
||||
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML).toDF()
|
||||
|
||||
val r = new Random(seed)
|
||||
// When feature size is larger than 4096, normal optimizer is chosen
|
||||
// as the solver of linear regression in the case of "auto" mode.
|
||||
val featureSize = 4100
|
||||
datasetWithSparseFeature = spark.createDataFrame(
|
||||
sc.parallelize(LinearDataGenerator.generateLinearInput(
|
||||
datasetWithSparseFeature = sc.parallelize(LinearDataGenerator.generateLinearInput(
|
||||
intercept = 0.0, weights = Seq.fill(featureSize)(r.nextDouble()).toArray,
|
||||
xMean = Seq.fill(featureSize)(r.nextDouble()).toArray,
|
||||
xVariance = Seq.fill(featureSize)(r.nextDouble()).toArray, nPoints = 200,
|
||||
seed, eps = 0.1, sparsity = 0.7), 2).map(_.asML))
|
||||
seed, eps = 0.1, sparsity = 0.7), 2).map(_.asML).toDF()
|
||||
|
||||
/*
|
||||
R code:
|
||||
|
@ -74,13 +74,12 @@ class LinearRegressionSuite
|
|||
w <- c(1, 2, 3, 4)
|
||||
df <- as.data.frame(cbind(A, b))
|
||||
*/
|
||||
datasetWithWeight = spark.createDataFrame(
|
||||
sc.parallelize(Seq(
|
||||
Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)),
|
||||
Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)),
|
||||
Instance(29.0, 4.0, Vectors.dense(3.0, 13.0))
|
||||
), 2))
|
||||
datasetWithWeight = sc.parallelize(Seq(
|
||||
Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)),
|
||||
Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)),
|
||||
Instance(29.0, 4.0, Vectors.dense(3.0, 13.0))
|
||||
), 2).toDF()
|
||||
|
||||
/*
|
||||
R code:
|
||||
|
@ -90,20 +89,18 @@ class LinearRegressionSuite
|
|||
w <- c(1, 2, 3, 4)
|
||||
df.const.label <- as.data.frame(cbind(A, b.const))
|
||||
*/
|
||||
datasetWithWeightConstantLabel = spark.createDataFrame(
|
||||
sc.parallelize(Seq(
|
||||
Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(17.0, 2.0, Vectors.dense(1.0, 7.0)),
|
||||
Instance(17.0, 3.0, Vectors.dense(2.0, 11.0)),
|
||||
Instance(17.0, 4.0, Vectors.dense(3.0, 13.0))
|
||||
), 2))
|
||||
datasetWithWeightZeroLabel = spark.createDataFrame(
|
||||
sc.parallelize(Seq(
|
||||
Instance(0.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(0.0, 2.0, Vectors.dense(1.0, 7.0)),
|
||||
Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)),
|
||||
Instance(0.0, 4.0, Vectors.dense(3.0, 13.0))
|
||||
), 2))
|
||||
datasetWithWeightConstantLabel = sc.parallelize(Seq(
|
||||
Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(17.0, 2.0, Vectors.dense(1.0, 7.0)),
|
||||
Instance(17.0, 3.0, Vectors.dense(2.0, 11.0)),
|
||||
Instance(17.0, 4.0, Vectors.dense(3.0, 13.0))
|
||||
), 2).toDF()
|
||||
datasetWithWeightZeroLabel = sc.parallelize(Seq(
|
||||
Instance(0.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
|
||||
Instance(0.0, 2.0, Vectors.dense(1.0, 7.0)),
|
||||
Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)),
|
||||
Instance(0.0, 4.0, Vectors.dense(3.0, 13.0))
|
||||
), 2).toDF()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -839,8 +836,7 @@ class LinearRegressionSuite
|
|||
}
|
||||
val data2 = weightedSignedData ++ weightedNoiseData
|
||||
|
||||
(spark.createDataFrame(sc.parallelize(data1, 4)),
|
||||
spark.createDataFrame(sc.parallelize(data2, 4)))
|
||||
(sc.parallelize(data1, 4).toDF(), sc.parallelize(data2, 4).toDF())
|
||||
}
|
||||
|
||||
val trainer1a = (new LinearRegression).setFitIntercept(true)
|
||||
|
|
|
@ -32,13 +32,15 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
|
|||
*/
|
||||
class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext with Logging {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("runWithValidation stops early and performs better on a validation dataset") {
|
||||
// Set numIterations large enough so that it stops early.
|
||||
val numIterations = 20
|
||||
val trainRdd = sc.parallelize(OldGBTSuite.trainData, 2).map(_.asML)
|
||||
val validateRdd = sc.parallelize(OldGBTSuite.validateData, 2).map(_.asML)
|
||||
val trainDF = spark.createDataFrame(trainRdd)
|
||||
val validateDF = spark.createDataFrame(validateRdd)
|
||||
val trainDF = trainRdd.toDF()
|
||||
val validateDF = validateRdd.toDF()
|
||||
|
||||
val algos = Array(Regression, Regression, Classification)
|
||||
val losses = Array(SquaredError, AbsoluteError, LogLoss)
|
||||
|
|
|
@ -35,12 +35,13 @@ import org.apache.spark.sql.types.StructType
|
|||
class CrossValidatorSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
@transient var dataset: Dataset[_] = _
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
dataset = spark.createDataFrame(
|
||||
sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2))
|
||||
dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
|
||||
}
|
||||
|
||||
test("cross validation with logistic regression") {
|
||||
|
@ -67,9 +68,10 @@ class CrossValidatorSuite
|
|||
}
|
||||
|
||||
test("cross validation with linear regression") {
|
||||
val dataset = spark.createDataFrame(
|
||||
sc.parallelize(LinearDataGenerator.generateLinearInput(
|
||||
6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2).map(_.asML))
|
||||
val dataset = sc.parallelize(
|
||||
LinearDataGenerator.generateLinearInput(
|
||||
6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2)
|
||||
.map(_.asML).toDF()
|
||||
|
||||
val trainer = new LinearRegression().setSolver("l-bfgs")
|
||||
val lrParamMaps = new ParamGridBuilder()
|
||||
|
|
|
@ -33,9 +33,11 @@ import org.apache.spark.sql.types.StructType
|
|||
|
||||
class TrainValidationSplitSuite
|
||||
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("train validation with logistic regression") {
|
||||
val dataset = spark.createDataFrame(
|
||||
sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2))
|
||||
val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
|
||||
|
||||
val lr = new LogisticRegression
|
||||
val lrParamMaps = new ParamGridBuilder()
|
||||
|
@ -58,9 +60,10 @@ class TrainValidationSplitSuite
|
|||
}
|
||||
|
||||
test("train validation with linear regression") {
|
||||
val dataset = spark.createDataFrame(
|
||||
sc.parallelize(LinearDataGenerator.generateLinearInput(
|
||||
6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2).map(_.asML))
|
||||
val dataset = sc.parallelize(
|
||||
LinearDataGenerator.generateLinearInput(
|
||||
6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2)
|
||||
.map(_.asML).toDF()
|
||||
|
||||
val trainer = new LinearRegression().setSolver("l-bfgs")
|
||||
val lrParamMaps = new ParamGridBuilder()
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.apache.spark.util.Utils
|
|||
|
||||
class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
test("epsilon computation") {
|
||||
assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
|
||||
assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.")
|
||||
|
@ -255,9 +257,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
|
|||
val z = Vectors.dense(4.0)
|
||||
val p = (5.0, z)
|
||||
val w = Vectors.dense(6.0).asML
|
||||
val df = spark.createDataFrame(Seq(
|
||||
(0, x, y, p, w)
|
||||
)).toDF("id", "x", "y", "p", "w")
|
||||
val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w")
|
||||
.withColumn("x", col("x"), metadata)
|
||||
val newDF1 = convertVectorColumnsToML(df)
|
||||
assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.")
|
||||
|
@ -282,9 +282,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
|
|||
val z = Vectors.dense(4.0).asML
|
||||
val p = (5.0, z)
|
||||
val w = Vectors.dense(6.0)
|
||||
val df = spark.createDataFrame(Seq(
|
||||
(0, x, y, p, w)
|
||||
)).toDF("id", "x", "y", "p", "w")
|
||||
val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w")
|
||||
.withColumn("x", col("x"), metadata)
|
||||
val newDF1 = convertVectorColumnsFromML(df)
|
||||
assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.")
|
||||
|
@ -309,9 +307,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
|
|||
val z = Matrices.ones(1, 1)
|
||||
val p = (5.0, z)
|
||||
val w = Matrices.dense(1, 1, Array(4.5)).asML
|
||||
val df = spark.createDataFrame(Seq(
|
||||
(0, x, y, p, w)
|
||||
)).toDF("id", "x", "y", "p", "w")
|
||||
val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w")
|
||||
.withColumn("x", col("x"), metadata)
|
||||
val newDF1 = convertMatrixColumnsToML(df)
|
||||
assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.")
|
||||
|
@ -336,9 +332,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
|
|||
val z = Matrices.ones(1, 1).asML
|
||||
val p = (5.0, z)
|
||||
val w = Matrices.dense(1, 1, Array(4.5))
|
||||
val df = spark.createDataFrame(Seq(
|
||||
(0, x, y, p, w)
|
||||
)).toDF("id", "x", "y", "p", "w")
|
||||
val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w")
|
||||
.withColumn("x", col("x"), metadata)
|
||||
val newDF1 = convertMatrixColumnsFromML(df)
|
||||
assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.")
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.scalatest.Suite
|
|||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.ml.util.TempDirectory
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.{SparkSession, SQLContext, SQLImplicits}
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
trait MLlibTestSparkContext extends TempDirectory { self: Suite =>
|
||||
|
@ -55,4 +55,15 @@ trait MLlibTestSparkContext extends TempDirectory { self: Suite =>
|
|||
super.afterAll()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper object for importing SQL implicits.
|
||||
*
|
||||
* Note that the alternative of importing `spark.implicits._` is not possible here.
|
||||
* This is because we create the [[SQLContext]] immediately before the first test is run,
|
||||
* but the implicits import is needed in the constructor.
|
||||
*/
|
||||
protected object testImplicits extends SQLImplicits {
|
||||
protected override def _sqlContext: SQLContext = self.spark.sqlContext
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue