spark-instrumented-optimizer/R/pkg/inst/tests/testthat/test_mllib_classification.R
Yanbo Liang dbb06c689c [MINOR][ML] Fix some PySpark & SparkR flaky tests
## What changes were proposed in this pull request?
Some PySpark & SparkR tests run with tiny dataset and tiny ```maxIter```, which means they are not converged. I don’t think checking intermediate result during iteration make sense, and these intermediate result may vulnerable and not stable, so we should switch to check the converged result. We hit this issue at #17746 when we upgrade breeze to 0.13.1.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #17757 from yanboliang/flaky-test.
2017-04-26 21:34:18 +08:00

386 lines
14 KiB
R

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(testthat)
context("MLlib classification algorithms, except for tree-based algorithms")
# Tests for MLlib classification algorithms in SparkR
sparkSession <- sparkR.session(enableHiveSupport = FALSE)
absoluteSparkPath <- function(x) {
sparkHome <- sparkR.conf("spark.home")
file.path(sparkHome, x)
}
test_that("spark.svmLinear", {
df <- suppressWarnings(createDataFrame(iris))
training <- df[df$Species %in% c("versicolor", "virginica"), ]
model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10)
summary <- summary(model)
# test summary coefficients return matrix type
expect_true(class(summary$coefficients) == "matrix")
expect_true(class(summary$coefficients[, 1]) == "numeric")
coefs <- summary$coefficients[, "Estimate"]
expected_coefs <- c(-0.1563083, -0.460648, 0.2276626, 1.055085)
expect_true(all(abs(coefs - expected_coefs) < 0.1))
expect_equal(summary$intercept, -0.06004978, tolerance = 1e-2)
# Test prediction with string label
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
expected <- c("versicolor", "versicolor", "versicolor", "virginica", "virginica",
"virginica", "virginica", "virginica", "virginica", "virginica")
expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected)
# Test model save and load
modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
write.ml(model, modelPath, overwrite = TRUE)
model2 <- read.ml(modelPath)
coefs <- summary(model)$coefficients
coefs2 <- summary(model2)$coefficients
expect_equal(coefs, coefs2)
unlink(modelPath)
# Test prediction with numeric label
label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
data <- as.data.frame(cbind(label, feature))
df <- createDataFrame(data)
model <- spark.svmLinear(df, label ~ feature, regParam = 0.1)
prediction <- collect(select(predict(model, df), "prediction"))
expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))
})
test_that("spark.logit", {
# R code to reproduce the result.
# nolint start
#' library(glmnet)
#' iris.x = as.matrix(iris[, 1:4])
#' iris.y = as.factor(as.character(iris[, 5]))
#' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
#' coef(logit)
#
# $setosa
# 5 x 1 sparse Matrix of class "dgCMatrix"
# s0
# 1.0981324
# Sepal.Length -0.2909860
# Sepal.Width 0.5510907
# Petal.Length -0.1915217
# Petal.Width -0.4211946
#
# $versicolor
# 5 x 1 sparse Matrix of class "dgCMatrix"
# s0
# 1.520061e+00
# Sepal.Length 2.524501e-02
# Sepal.Width -5.310313e-01
# Petal.Length 3.656543e-02
# Petal.Width -3.144464e-05
#
# $virginica
# 5 x 1 sparse Matrix of class "dgCMatrix"
# s0
# -2.61819385
# Sepal.Length 0.26574097
# Sepal.Width -0.02005932
# Petal.Length 0.15495629
# Petal.Width 0.42122607
# nolint end
# Test multinomial logistic regression againt three classes
df <- suppressWarnings(createDataFrame(iris))
model <- spark.logit(df, Species ~ ., regParam = 0.5)
summary <- summary(model)
# test summary coefficients return matrix type
expect_true(class(summary$coefficients) == "matrix")
expect_true(class(summary$coefficients[, 1]) == "numeric")
versicolorCoefsR <- c(1.52, 0.03, -0.53, 0.04, 0.00)
virginicaCoefsR <- c(-2.62, 0.27, -0.02, 0.16, 0.42)
setosaCoefsR <- c(1.10, -0.29, 0.55, -0.19, -0.42)
versicolorCoefs <- summary$coefficients[, "versicolor"]
virginicaCoefs <- summary$coefficients[, "virginica"]
setosaCoefs <- summary$coefficients[, "setosa"]
expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
expect_true(all(abs(setosaCoefs - setosaCoefs) < 0.1))
# Test model save and load
modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
write.ml(model, modelPath, overwrite = TRUE)
model2 <- read.ml(modelPath)
coefs <- summary(model)$coefficients
coefs2 <- summary(model2)$coefficients
expect_equal(coefs, coefs2)
unlink(modelPath)
# R code to reproduce the result.
# nolint start
#' library(glmnet)
#' iris2 <- iris[iris$Species %in% c("versicolor", "virginica"), ]
#' iris.x = as.matrix(iris2[, 1:4])
#' iris.y = as.factor(as.character(iris2[, 5]))
#' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
#' coef(logit)
#
# $versicolor
# 5 x 1 sparse Matrix of class "dgCMatrix"
# s0
# 3.93844796
# Sepal.Length -0.13538675
# Sepal.Width -0.02386443
# Petal.Length -0.35076451
# Petal.Width -0.77971954
#
# $virginica
# 5 x 1 sparse Matrix of class "dgCMatrix"
# s0
# -3.93844796
# Sepal.Length 0.13538675
# Sepal.Width 0.02386443
# Petal.Length 0.35076451
# Petal.Width 0.77971954
#
#' logit = glmnet(iris.x, iris.y, family="binomial", alpha=0, lambda=0.5)
#' coef(logit)
#
# 5 x 1 sparse Matrix of class "dgCMatrix"
# s0
# (Intercept) -6.0824412
# Sepal.Length 0.2458260
# Sepal.Width 0.1642093
# Petal.Length 0.4759487
# Petal.Width 1.0383948
#
# nolint end
# Test multinomial logistic regression againt two classes
df <- suppressWarnings(createDataFrame(iris))
training <- df[df$Species %in% c("versicolor", "virginica"), ]
model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
summary <- summary(model)
versicolorCoefsR <- c(3.94, -0.16, -0.02, -0.35, -0.78)
virginicaCoefsR <- c(-3.94, 0.16, -0.02, 0.35, 0.78)
versicolorCoefs <- summary$coefficients[, "versicolor"]
virginicaCoefs <- summary$coefficients[, "virginica"]
expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
# Test binomial logistic regression againt two classes
model <- spark.logit(training, Species ~ ., regParam = 0.5)
summary <- summary(model)
coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
coefs <- summary$coefficients[, "Estimate"]
expect_true(all(abs(coefsR - coefs) < 0.1))
# Test prediction with string label
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
expected <- c("versicolor", "versicolor", "virginica", "versicolor", "versicolor",
"versicolor", "versicolor", "versicolor", "versicolor", "versicolor")
expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)
# Test prediction with numeric label
label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
data <- as.data.frame(cbind(label, feature))
df <- createDataFrame(data)
model <- spark.logit(df, label ~ feature)
prediction <- collect(select(predict(model, df), "prediction"))
expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))
# Test prediction with weightCol
weight <- c(2.0, 2.0, 2.0, 1.0, 1.0)
data2 <- as.data.frame(cbind(label, feature, weight))
df2 <- createDataFrame(data2)
model2 <- spark.logit(df2, label ~ feature, weightCol = "weight")
prediction2 <- collect(select(predict(model2, df2), "prediction"))
expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0"))
})
test_that("spark.mlp", {
df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
source = "libsvm")
model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3),
solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
# Test summary method
summary <- summary(model)
expect_equal(summary$numOfInputs, 4)
expect_equal(summary$numOfOutputs, 3)
expect_equal(summary$layers, c(4, 5, 4, 3))
expect_equal(length(summary$weights), 64)
expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
tolerance = 1e-6)
# Test predict method
mlpTestDF <- df
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
# Test model save/load
modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
write.ml(model, modelPath, overwrite = TRUE)
model2 <- read.ml(modelPath)
summary2 <- summary(model2)
expect_equal(summary2$numOfInputs, 4)
expect_equal(summary2$numOfOutputs, 3)
expect_equal(summary2$layers, c(4, 5, 4, 3))
expect_equal(length(summary2$weights), 64)
unlink(modelPath)
# Test default parameter
model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
# Test illegal parameter
expect_error(spark.mlp(df, label ~ features, layers = NULL),
"layers must be a integer vector with length > 1.")
expect_error(spark.mlp(df, label ~ features, layers = c()),
"layers must be a integer vector with length > 1.")
expect_error(spark.mlp(df, label ~ features, layers = c(3)),
"layers must be a integer vector with length > 1.")
# Test random seed
# default seed
model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10)
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
# seed equals 10
model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
# test initialWeights
model <- spark.mlp(df, label ~ features, layers = c(4, 3), initialWeights =
c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
# Test formula works well
df <- suppressWarnings(createDataFrame(iris))
model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
layers = c(4, 3))
summary <- summary(model)
expect_equal(summary$numOfInputs, 4)
expect_equal(summary$numOfOutputs, 3)
expect_equal(summary$layers, c(4, 3))
expect_equal(length(summary$weights), 15)
})
test_that("spark.naiveBayes", {
# R code to reproduce the result.
# We do not support instance weights yet. So we ignore the frequencies.
#
#' library(e1071)
#' t <- as.data.frame(Titanic)
#' t1 <- t[t$Freq > 0, -5]
#' m <- naiveBayes(Survived ~ ., data = t1)
#' m
#' predict(m, t1)
#
# -- output of 'm'
#
# A-priori probabilities:
# Y
# No Yes
# 0.4166667 0.5833333
#
# Conditional probabilities:
# Class
# Y 1st 2nd 3rd Crew
# No 0.2000000 0.2000000 0.4000000 0.2000000
# Yes 0.2857143 0.2857143 0.2857143 0.1428571
#
# Sex
# Y Male Female
# No 0.5 0.5
# Yes 0.5 0.5
#
# Age
# Y Child Adult
# No 0.2000000 0.8000000
# Yes 0.4285714 0.5714286
#
# -- output of 'predict(m, t1)'
#
# Yes Yes Yes Yes No No Yes Yes No No Yes Yes Yes Yes Yes Yes Yes Yes No No Yes Yes No No
#
t <- as.data.frame(Titanic)
t1 <- t[t$Freq > 0, -5]
df <- suppressWarnings(createDataFrame(t1))
m <- spark.naiveBayes(df, Survived ~ ., smoothing = 0.0)
s <- summary(m)
expect_equal(as.double(s$apriori[1, "Yes"]), 0.5833333, tolerance = 1e-6)
expect_equal(sum(s$apriori), 1)
expect_equal(as.double(s$tables["Yes", "Age_Adult"]), 0.5714286, tolerance = 1e-6)
p <- collect(select(predict(m, df), "prediction"))
expect_equal(p$prediction, c("Yes", "Yes", "Yes", "Yes", "No", "No", "Yes", "Yes", "No", "No",
"Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "No", "No",
"Yes", "Yes", "No", "No"))
# Test model save/load
modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp")
write.ml(m, modelPath)
expect_error(write.ml(m, modelPath))
write.ml(m, modelPath, overwrite = TRUE)
m2 <- read.ml(modelPath)
s2 <- summary(m2)
expect_equal(s$apriori, s2$apriori)
expect_equal(s$tables, s2$tables)
unlink(modelPath)
# Test e1071::naiveBayes
if (requireNamespace("e1071", quietly = TRUE)) {
expect_error(m <- e1071::naiveBayes(Survived ~ ., data = t1), NA)
expect_equal(as.character(predict(m, t1[1, ])), "Yes")
}
# Test numeric response variable
t1$NumericSurvived <- ifelse(t1$Survived == "No", 0, 1)
t2 <- t1[-4]
df <- suppressWarnings(createDataFrame(t2))
m <- spark.naiveBayes(df, NumericSurvived ~ ., smoothing = 0.0)
s <- summary(m)
expect_equal(as.double(s$apriori[1, 1]), 0.5833333, tolerance = 1e-6)
expect_equal(sum(s$apriori), 1)
expect_equal(as.double(s$tables[1, "Age_Adult"]), 0.5714286, tolerance = 1e-6)
})
sparkR.session.stop()