11d2b07b74
### What changes were proposed in this pull request?
This PR proposes to ignore S4 generic methods under SparkR namespace in closure cleaning to support R 4.0.0+.
Currently, when you run the codes that runs R native codes, it fails as below with R 4.0.0:
```r
df <- createDataFrame(lapply(seq(100), function (e) list(value=e)))
count(dapply(df, function(x) as.data.frame(x[x$value < 50,]), schema(df)))
```
```
org.apache.spark.SparkException: R unexpectedly exited.
R worker produced errors: Error in lapply(part, FUN) : attempt to bind a variable to R_UnboundValue
```
The root cause seems to be related to when an S4 generic method is manually included into the closure's environment via `SparkR:::cleanClosure`. For example, when an RRDD is created via `createDataFrame` with calling `lapply` to convert, `lapply` itself:
f53d8c63e8/R/pkg/R/RDD.R (L484)
is added into the environment of the cleaned closure - because this is not an exposed namespace; however, this is broken in R 4.0.0+ for an unknown reason with an error message such as "attempt to bind a variable to R_UnboundValue".
Actually, we don't need to add the `lapply` into the environment of the closure because it is not supposed to be called in worker side. In fact, there is no private generic methods supposed to be called in worker side in SparkR at all from my understanding.
Therefore, this PR takes a simpler path to work around just by explicitly excluding the S4 generic methods under SparkR namespace to support R 4.0.0. in SparkR.
### Why are the changes needed?
To support R 4.0.0+ with SparkR, and unblock the releases on CRAN. CRAN requires the tests pass with the latest R.
### Does this PR introduce _any_ user-facing change?
Yes, it will support R 4.0.0 to end-users.
### How was this patch tested?
Manually tested. Both CRAN and tests with R 4.0.1:
```
══ testthat results ═══════════════════════════════════════════════════════════
[ OK: 13 | SKIPPED: 0 | WARNINGS: 0 | FAILED: 0 ]
✔ | OK F W S | Context
✔ | 11 | binary functions [2.5 s]
✔ | 4 | functions on binary files [2.1 s]
✔ | 2 | broadcast variables [0.5 s]
✔ | 5 | functions in client.R
✔ | 46 | test functions in sparkR.R [6.3 s]
✔ | 2 | include R packages [0.3 s]
✔ | 2 | JVM API [0.2 s]
✔ | 75 | MLlib classification algorithms, except for tree-based algorithms [86.3 s]
✔ | 70 | MLlib clustering algorithms [44.5 s]
✔ | 6 | MLlib frequent pattern mining [3.0 s]
✔ | 8 | MLlib recommendation algorithms [9.6 s]
✔ | 136 | MLlib regression algorithms, except for tree-based algorithms [76.0 s]
✔ | 8 | MLlib statistics algorithms [0.6 s]
✔ | 94 | MLlib tree-based algorithms [85.2 s]
✔ | 29 | parallelize() and collect() [0.5 s]
✔ | 428 | basic RDD functions [25.3 s]
✔ | 39 | SerDe functionality [2.2 s]
✔ | 20 | partitionBy, groupByKey, reduceByKey etc. [3.9 s]
✔ | 4 | functions in sparkR.R
✔ | 16 | SparkSQL Arrow optimization [19.2 s]
✔ | 6 | test show SparkDataFrame when eager execution is enabled. [1.1 s]
✔ | 1175 | SparkSQL functions [134.8 s]
✔ | 42 | Structured Streaming [478.2 s]
✔ | 16 | tests RDD function take() [1.1 s]
✔ | 14 | the textFile() function [2.9 s]
✔ | 46 | functions in utils.R [0.7 s]
✔ | 0 1 | Windows-specific tests
────────────────────────────────────────────────────────────────────────────────
test_Windows.R:22: skip: sparkJars tag in SparkContext
Reason: This test is only for Windows, skipped
────────────────────────────────────────────────────────────────────────────────
══ Results ═════════════════════════════════════════════════════════════════════
Duration: 987.3 s
OK: 2304
Failed: 0
Warnings: 0
Skipped: 1
...
Status: OK
+ popd
Tests passed.
```
Note that I tested to build SparkR in R 4.0.0, and run the tests with R 3.6.3. It all passed. See also [the comment in the JIRA](https://issues.apache.org/jira/browse/SPARK-31918?focusedCommentId=17142837&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17142837).
Closes #28907 from HyukjinKwon/SPARK-31918.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
614 lines
24 KiB
R
614 lines
24 KiB
R
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
library(testthat)
|
|
|
|
context("MLlib regression algorithms, except for tree-based algorithms")
|
|
|
|
# Tests for MLlib regression algorithms in SparkR
|
|
sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
|
|
|
|
test_that("formula of spark.glm", {
|
|
training <- suppressWarnings(createDataFrame(iris))
|
|
# directly calling the spark API
|
|
# dot minus and intercept vs native glm
|
|
model <- spark.glm(training, Sepal_Width ~ . - Species + 0)
|
|
vals <- collect(select(predict(model, training), "prediction"))
|
|
rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
|
|
# feature interaction vs native glm
|
|
model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length)
|
|
vals <- collect(select(predict(model, training), "prediction"))
|
|
rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
|
|
# glm should work with long formula
|
|
training <- suppressWarnings(createDataFrame(iris))
|
|
training$LongLongLongLongLongName <- training$Sepal_Width
|
|
training$VeryLongLongLongLonLongName <- training$Sepal_Length
|
|
training$AnotherLongLongLongLongName <- training$Species
|
|
model <- spark.glm(training, LongLongLongLongLongName ~ VeryLongLongLongLonLongName +
|
|
AnotherLongLongLongLongName)
|
|
vals <- collect(select(predict(model, training), "prediction"))
|
|
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
})
|
|
|
|
test_that("spark.glm and predict", {
|
|
training <- suppressWarnings(createDataFrame(iris))
|
|
# gaussian family
|
|
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
|
|
prediction <- predict(model, training)
|
|
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
|
|
vals <- collect(select(prediction, "prediction"))
|
|
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
|
|
# poisson family
|
|
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
|
|
family = poisson(link = identity))
|
|
prediction <- predict(model, training)
|
|
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
|
|
vals <- collect(select(prediction, "prediction"))
|
|
rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
|
|
data = iris, family = poisson(link = identity)), iris))
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
|
|
# Gamma family
|
|
x <- runif(100, -1, 1)
|
|
y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10)
|
|
df <- as.DataFrame(as.data.frame(list(x = x, y = y)))
|
|
model <- glm(y ~ x, family = Gamma, df)
|
|
out <- capture.output(print(summary(model)))
|
|
expect_true(any(grepl("Dispersion parameter for gamma family", out)))
|
|
|
|
# tweedie family
|
|
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
|
|
family = "tweedie", var.power = 1.2, link.power = 0.0)
|
|
prediction <- predict(model, training)
|
|
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
|
|
vals <- collect(select(prediction, "prediction"))
|
|
|
|
# manual calculation of the R predicted values to avoid dependence on statmod
|
|
#' library(statmod)
|
|
#' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
|
|
#' family = tweedie(var.power = 1.2, link.power = 0.0))
|
|
#' print(coef(rModel))
|
|
|
|
rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
|
|
rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
|
|
data = iris) %*% rCoef))
|
|
expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)
|
|
|
|
# Test stats::predict is working
|
|
x <- rnorm(15)
|
|
y <- x + rnorm(15)
|
|
expect_equal(length(predict(lm(y ~ x))), 15)
|
|
})
|
|
|
|
test_that("spark.glm summary", {
|
|
# prepare dataset
|
|
Sepal.Length <- c(2.0, 1.5, 1.8, 3.4, 5.1, 1.8, 1.0, 2.3)
|
|
Sepal.Width <- c(2.1, 2.3, 5.4, 4.7, 3.1, 2.1, 3.1, 5.5)
|
|
Petal.Length <- c(1.8, 2.1, 7.1, 2.5, 3.7, 6.3, 2.2, 7.2)
|
|
Species <- c("setosa", "versicolor", "versicolor", "versicolor", "virginica", "virginica",
|
|
"versicolor", "virginica")
|
|
dataset <- data.frame(Sepal.Length, Sepal.Width, Petal.Length, Species, stringsAsFactors = TRUE)
|
|
|
|
# gaussian family
|
|
training <- suppressWarnings(createDataFrame(dataset))
|
|
stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species))
|
|
rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset))
|
|
|
|
# test summary coefficients return matrix type
|
|
expect_true(any(class(stats$coefficients) == "matrix"))
|
|
expect_true(class(stats$coefficients[, 1]) == "numeric")
|
|
|
|
coefs <- stats$coefficients
|
|
rCoefs <- rStats$coefficients
|
|
expect_true(all(abs(rCoefs - coefs) < 1e-4))
|
|
expect_true(all(
|
|
rownames(stats$coefficients) ==
|
|
c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
|
|
expect_equal(stats$dispersion, rStats$dispersion)
|
|
expect_equal(stats$null.deviance, rStats$null.deviance)
|
|
expect_equal(stats$deviance, rStats$deviance)
|
|
expect_equal(stats$df.null, rStats$df.null)
|
|
expect_equal(stats$df.residual, rStats$df.residual)
|
|
expect_equal(stats$aic, rStats$aic)
|
|
|
|
out <- capture.output(print(stats))
|
|
expect_match(out[2], "Deviance Residuals:")
|
|
expect_true(any(grepl("AIC: 35.84", out)))
|
|
|
|
# binomial family
|
|
df <- suppressWarnings(createDataFrame(dataset))
|
|
training <- df[df$Species %in% c("versicolor", "virginica"), ]
|
|
stats <- summary(spark.glm(training, Species ~ Sepal_Length + Sepal_Width,
|
|
family = binomial(link = "logit")))
|
|
|
|
rTraining <- dataset[dataset$Species %in% c("versicolor", "virginica"), ]
|
|
rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
|
|
family = binomial(link = "logit")))
|
|
|
|
coefs <- stats$coefficients
|
|
rCoefs <- rStats$coefficients
|
|
expect_true(all(abs(rCoefs - coefs) < 1e-4))
|
|
expect_true(all(
|
|
rownames(stats$coefficients) ==
|
|
c("(Intercept)", "Sepal_Length", "Sepal_Width")))
|
|
expect_equal(stats$dispersion, rStats$dispersion)
|
|
expect_equal(stats$null.deviance, rStats$null.deviance)
|
|
expect_equal(stats$deviance, rStats$deviance)
|
|
expect_equal(stats$df.null, rStats$df.null)
|
|
expect_equal(stats$df.residual, rStats$df.residual)
|
|
expect_equal(stats$aic, rStats$aic)
|
|
|
|
# Test spark.glm works with weighted dataset
|
|
a1 <- c(0, 1, 2, 3)
|
|
a2 <- c(5, 2, 1, 3)
|
|
w <- c(1, 2, 3, 4)
|
|
b <- c(1, 0, 1, 0)
|
|
data <- as.data.frame(cbind(a1, a2, w, b))
|
|
df <- createDataFrame(data)
|
|
|
|
stats <- summary(spark.glm(df, b ~ a1 + a2, family = "binomial", weightCol = "w"))
|
|
rStats <- summary(glm(b ~ a1 + a2, family = "binomial", data = data, weights = w))
|
|
|
|
coefs <- stats$coefficients
|
|
rCoefs <- rStats$coefficients
|
|
expect_true(all(abs(rCoefs - coefs) < 1e-3))
|
|
expect_true(all(rownames(stats$coefficients) == c("(Intercept)", "a1", "a2")))
|
|
expect_equal(stats$dispersion, rStats$dispersion)
|
|
expect_equal(stats$null.deviance, rStats$null.deviance)
|
|
expect_equal(stats$deviance, rStats$deviance)
|
|
expect_equal(stats$df.null, rStats$df.null)
|
|
expect_equal(stats$df.residual, rStats$df.residual)
|
|
expect_equal(stats$aic, rStats$aic)
|
|
|
|
# Test spark.glm works with offset
|
|
training <- suppressWarnings(createDataFrame(dataset))
|
|
stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
|
|
family = poisson(), offsetCol = "Petal_Length"))
|
|
rStats <- suppressWarnings(summary(glm(Sepal.Width ~ Sepal.Length + Species,
|
|
data = dataset, family = poisson(), offset = dataset$Petal.Length)))
|
|
expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3))
|
|
|
|
# Test summary works on base GLM models
|
|
baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = dataset)
|
|
baseSummary <- summary(baseModel)
|
|
expect_true(abs(baseSummary$deviance - 11.84013) < 1e-4)
|
|
|
|
# Test spark.glm works with regularization parameter
|
|
data <- as.data.frame(cbind(a1, a2, b))
|
|
df <- suppressWarnings(createDataFrame(data))
|
|
regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0))
|
|
expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result
|
|
|
|
# Test spark.glm works on collinear data
|
|
A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2)
|
|
b <- c(1, 2, 3, 4)
|
|
data <- as.data.frame(cbind(A, b))
|
|
df <- createDataFrame(data)
|
|
stats <- summary(spark.glm(df, b ~ . - 1))
|
|
coefs <- stats$coefficients
|
|
expect_true(all(abs(c(0.5, 0.25) - coefs) < 1e-4))
|
|
})
|
|
|
|
test_that("spark.glm save/load", {
|
|
training <- suppressWarnings(createDataFrame(iris))
|
|
m <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
|
|
s <- summary(m)
|
|
|
|
modelPath <- tempfile(pattern = "spark-glm", fileext = ".tmp")
|
|
write.ml(m, modelPath)
|
|
expect_error(write.ml(m, modelPath))
|
|
write.ml(m, modelPath, overwrite = TRUE)
|
|
m2 <- read.ml(modelPath)
|
|
s2 <- summary(m2)
|
|
|
|
expect_equal(s$coefficients, s2$coefficients)
|
|
expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
|
|
expect_equal(s$dispersion, s2$dispersion)
|
|
expect_equal(s$null.deviance, s2$null.deviance)
|
|
expect_equal(s$deviance, s2$deviance)
|
|
expect_equal(s$df.null, s2$df.null)
|
|
expect_equal(s$df.residual, s2$df.residual)
|
|
expect_equal(s$aic, s2$aic)
|
|
expect_equal(s$iter, s2$iter)
|
|
expect_true(!s$is.loaded)
|
|
expect_true(s2$is.loaded)
|
|
|
|
unlink(modelPath)
|
|
})
|
|
|
|
test_that("formula of glm", {
|
|
training <- suppressWarnings(createDataFrame(iris))
|
|
# dot minus and intercept vs native glm
|
|
model <- glm(Sepal_Width ~ . - Species + 0, data = training)
|
|
vals <- collect(select(predict(model, training), "prediction"))
|
|
rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
|
|
# feature interaction vs native glm
|
|
model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
|
|
vals <- collect(select(predict(model, training), "prediction"))
|
|
rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
|
|
# glm should work with long formula
|
|
training <- suppressWarnings(createDataFrame(iris))
|
|
training$LongLongLongLongLongName <- training$Sepal_Width
|
|
training$VeryLongLongLongLonLongName <- training$Sepal_Length
|
|
training$AnotherLongLongLongLongName <- training$Species
|
|
model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName,
|
|
data = training)
|
|
vals <- collect(select(predict(model, training), "prediction"))
|
|
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
})
|
|
|
|
test_that("glm and predict", {
|
|
training <- suppressWarnings(createDataFrame(iris))
|
|
# gaussian family
|
|
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
|
|
prediction <- predict(model, training)
|
|
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
|
|
vals <- collect(select(prediction, "prediction"))
|
|
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
|
|
# poisson family
|
|
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
|
|
family = poisson(link = identity))
|
|
prediction <- predict(model, training)
|
|
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
|
|
vals <- collect(select(prediction, "prediction"))
|
|
rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
|
|
data = iris, family = poisson(link = identity)), iris))
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
|
|
# tweedie family
|
|
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
|
|
family = "tweedie", var.power = 1.2, link.power = 0.0)
|
|
prediction <- predict(model, training)
|
|
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
|
|
vals <- collect(select(prediction, "prediction"))
|
|
|
|
# manual calculation of the R predicted values to avoid dependence on statmod
|
|
#' library(statmod)
|
|
#' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
|
|
#' family = tweedie(var.power = 1.2, link.power = 0.0))
|
|
#' print(coef(rModel))
|
|
|
|
rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
|
|
rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
|
|
data = iris) %*% rCoef))
|
|
expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)
|
|
|
|
# Test stats::predict is working
|
|
x <- rnorm(15)
|
|
y <- x + rnorm(15)
|
|
expect_equal(length(predict(lm(y ~ x))), 15)
|
|
})
|
|
|
|
test_that("glm summary", {
|
|
# prepare dataset
|
|
Sepal.Length <- c(2.0, 1.5, 1.8, 3.4, 5.1, 1.8, 1.0, 2.3)
|
|
Sepal.Width <- c(2.1, 2.3, 5.4, 4.7, 3.1, 2.1, 3.1, 5.5)
|
|
Petal.Length <- c(1.8, 2.1, 7.1, 2.5, 3.7, 6.3, 2.2, 7.2)
|
|
Species <- c("setosa", "versicolor", "versicolor", "versicolor", "virginica", "virginica",
|
|
"versicolor", "virginica")
|
|
dataset <- data.frame(Sepal.Length, Sepal.Width, Petal.Length, Species, stringsAsFactors = TRUE)
|
|
|
|
# gaussian family
|
|
training <- suppressWarnings(createDataFrame(dataset))
|
|
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))
|
|
|
|
rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset))
|
|
|
|
coefs <- stats$coefficients
|
|
rCoefs <- rStats$coefficients
|
|
expect_true(all(abs(rCoefs - coefs) < 1e-4))
|
|
expect_true(all(
|
|
rownames(stats$coefficients) ==
|
|
c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
|
|
expect_equal(stats$dispersion, rStats$dispersion)
|
|
expect_equal(stats$null.deviance, rStats$null.deviance)
|
|
expect_equal(stats$deviance, rStats$deviance)
|
|
expect_equal(stats$df.null, rStats$df.null)
|
|
expect_equal(stats$df.residual, rStats$df.residual)
|
|
expect_equal(stats$aic, rStats$aic)
|
|
|
|
# binomial family
|
|
df <- suppressWarnings(createDataFrame(dataset))
|
|
training <- df[df$Species %in% c("versicolor", "virginica"), ]
|
|
stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
|
|
family = binomial(link = "logit")))
|
|
|
|
rTraining <- dataset[dataset$Species %in% c("versicolor", "virginica"), ]
|
|
rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
|
|
family = binomial(link = "logit")))
|
|
|
|
coefs <- stats$coefficients
|
|
rCoefs <- rStats$coefficients
|
|
expect_true(all(abs(rCoefs - coefs) < 1e-4))
|
|
expect_true(all(
|
|
rownames(stats$coefficients) ==
|
|
c("(Intercept)", "Sepal_Length", "Sepal_Width")))
|
|
expect_equal(stats$dispersion, rStats$dispersion)
|
|
expect_equal(stats$null.deviance, rStats$null.deviance)
|
|
expect_equal(stats$deviance, rStats$deviance)
|
|
expect_equal(stats$df.null, rStats$df.null)
|
|
expect_equal(stats$df.residual, rStats$df.residual)
|
|
expect_equal(stats$aic, rStats$aic)
|
|
|
|
# Test summary works on base GLM models
|
|
baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
|
|
baseSummary <- summary(baseModel)
|
|
expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
|
|
})
|
|
|
|
test_that("glm save/load", {
|
|
training <- suppressWarnings(createDataFrame(iris))
|
|
m <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
|
|
s <- summary(m)
|
|
|
|
modelPath <- tempfile(pattern = "glm", fileext = ".tmp")
|
|
write.ml(m, modelPath)
|
|
expect_error(write.ml(m, modelPath))
|
|
write.ml(m, modelPath, overwrite = TRUE)
|
|
m2 <- read.ml(modelPath)
|
|
s2 <- summary(m2)
|
|
|
|
expect_equal(s$coefficients, s2$coefficients)
|
|
expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
|
|
expect_equal(s$dispersion, s2$dispersion)
|
|
expect_equal(s$null.deviance, s2$null.deviance)
|
|
expect_equal(s$deviance, s2$deviance)
|
|
expect_equal(s$df.null, s2$df.null)
|
|
expect_equal(s$df.residual, s2$df.residual)
|
|
expect_equal(s$aic, s2$aic)
|
|
expect_equal(s$iter, s2$iter)
|
|
expect_true(!s$is.loaded)
|
|
expect_true(s2$is.loaded)
|
|
|
|
unlink(modelPath)
|
|
})
|
|
|
|
test_that("spark.glm and glm with string encoding", {
|
|
t <- as.data.frame(Titanic, stringsAsFactors = FALSE)
|
|
df <- createDataFrame(t)
|
|
|
|
# base R
|
|
rm <- stats::glm(Freq ~ Sex + Age, family = "gaussian", data = t)
|
|
# spark.glm with default stringIndexerOrderType = "frequencyDesc"
|
|
sm0 <- spark.glm(df, Freq ~ Sex + Age, family = "gaussian")
|
|
# spark.glm with stringIndexerOrderType = "alphabetDesc"
|
|
sm1 <- spark.glm(df, Freq ~ Sex + Age, family = "gaussian",
|
|
stringIndexerOrderType = "alphabetDesc")
|
|
# glm with stringIndexerOrderType = "alphabetDesc"
|
|
sm2 <- glm(Freq ~ Sex + Age, family = "gaussian", data = df,
|
|
stringIndexerOrderType = "alphabetDesc")
|
|
|
|
rStats <- summary(rm)
|
|
rCoefs <- rStats$coefficients
|
|
sStats <- lapply(list(sm0, sm1, sm2), summary)
|
|
# order by coefficient size since column rendering may be different
|
|
o <- order(rCoefs[, 1])
|
|
|
|
# default encoding does not produce same results as R
|
|
expect_false(all(abs(rCoefs[o, ] - sStats[[1]]$coefficients[o, ]) < 1e-4))
|
|
|
|
# all estimates should be the same as R with stringIndexerOrderType = "alphabetDesc"
|
|
test <- lapply(sStats[2:3], function(stats) {
|
|
expect_true(all(abs(rCoefs[o, ] - stats$coefficients[o, ]) < 1e-4))
|
|
expect_equal(stats$dispersion, rStats$dispersion)
|
|
expect_equal(stats$null.deviance, rStats$null.deviance)
|
|
expect_equal(stats$deviance, rStats$deviance)
|
|
expect_equal(stats$df.null, rStats$df.null)
|
|
expect_equal(stats$df.residual, rStats$df.residual)
|
|
expect_equal(stats$aic, rStats$aic)
|
|
})
|
|
|
|
# fitted values should be equal regardless of string encoding
|
|
rVals <- predict(rm, t)
|
|
test <- lapply(list(sm0, sm1, sm2), function(sm) {
|
|
vals <- collect(select(predict(sm, df), "prediction"))
|
|
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
|
|
})
|
|
})
|
|
|
|
test_that("spark.isoreg", {
|
|
label <- c(7.0, 5.0, 3.0, 5.0, 1.0)
|
|
feature <- c(0.0, 1.0, 2.0, 3.0, 4.0)
|
|
weight <- c(1.0, 1.0, 1.0, 1.0, 1.0)
|
|
data <- as.data.frame(cbind(label, feature, weight))
|
|
df <- createDataFrame(data)
|
|
|
|
model <- spark.isoreg(df, label ~ feature, isotonic = FALSE,
|
|
weightCol = "weight")
|
|
# only allow one variable on the right hand side of the formula
|
|
expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE))
|
|
result <- summary(model)
|
|
expect_equal(result$predictions, list(7, 5, 4, 4, 1))
|
|
|
|
# Test model prediction
|
|
predict_data <- list(list(-2.0), list(-1.0), list(0.5),
|
|
list(0.75), list(1.0), list(2.0), list(9.0))
|
|
predict_df <- createDataFrame(predict_data, c("feature"))
|
|
predict_result <- collect(select(predict(model, predict_df), "prediction"))
|
|
expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0))
|
|
|
|
# Test model save/load
|
|
if (windows_with_hadoop()) {
|
|
modelPath <- tempfile(pattern = "spark-isoreg", fileext = ".tmp")
|
|
write.ml(model, modelPath)
|
|
expect_error(write.ml(model, modelPath))
|
|
write.ml(model, modelPath, overwrite = TRUE)
|
|
model2 <- read.ml(modelPath)
|
|
expect_equal(result, summary(model2))
|
|
|
|
unlink(modelPath)
|
|
}
|
|
})
|
|
|
|
test_that("spark.survreg", {
|
|
# R code to reproduce the result.
|
|
#
|
|
#' rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
|
|
#' x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
|
|
#' library(survival)
|
|
#' model <- survreg(Surv(time, status) ~ x + sex, rData)
|
|
#' summary(model)
|
|
#' predict(model, data)
|
|
#
|
|
# -- output of 'summary(model)'
|
|
#
|
|
# Value Std. Error z p
|
|
# (Intercept) 1.315 0.270 4.88 1.07e-06
|
|
# x -0.190 0.173 -1.10 2.72e-01
|
|
# sex -0.253 0.329 -0.77 4.42e-01
|
|
# Log(scale) -1.160 0.396 -2.93 3.41e-03
|
|
#
|
|
# -- output of 'predict(model, data)'
|
|
#
|
|
# 1 2 3 4 5 6 7
|
|
# 3.724591 2.545368 3.079035 3.079035 2.390146 2.891269 2.891269
|
|
#
|
|
data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0),
|
|
list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1))
|
|
df <- createDataFrame(data, c("time", "status", "x", "sex"))
|
|
model <- spark.survreg(df, Surv(time, status) ~ x + sex)
|
|
stats <- summary(model)
|
|
coefs <- as.vector(stats$coefficients[, 1])
|
|
rCoefs <- c(1.3149571, -0.1903409, -0.2532618, -1.1599800)
|
|
expect_equal(coefs, rCoefs, tolerance = 1e-4)
|
|
expect_true(all(
|
|
rownames(stats$coefficients) ==
|
|
c("(Intercept)", "x", "sex", "Log(scale)")))
|
|
p <- collect(select(predict(model, df), "prediction"))
|
|
expect_equal(p$prediction, c(3.724591, 2.545368, 3.079035, 3.079035,
|
|
2.390146, 2.891269, 2.891269), tolerance = 1e-4)
|
|
|
|
# Test model save/load
|
|
if (windows_with_hadoop()) {
|
|
modelPath <- tempfile(pattern = "spark-survreg", fileext = ".tmp")
|
|
write.ml(model, modelPath)
|
|
expect_error(write.ml(model, modelPath))
|
|
write.ml(model, modelPath, overwrite = TRUE)
|
|
model2 <- read.ml(modelPath)
|
|
stats2 <- summary(model2)
|
|
coefs2 <- as.vector(stats2$coefficients[, 1])
|
|
expect_equal(coefs, coefs2)
|
|
expect_equal(rownames(stats$coefficients), rownames(stats2$coefficients))
|
|
|
|
unlink(modelPath)
|
|
}
|
|
|
|
# Test survival::survreg
|
|
if (requireNamespace("survival", quietly = TRUE)) {
|
|
rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
|
|
x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
|
|
expect_error(
|
|
model <- survival::survreg(formula = survival::Surv(time, status) ~ x + sex, data = rData),
|
|
NA)
|
|
expect_equal(predict(model, rData)[[1]], 3.724591, tolerance = 1e-4)
|
|
|
|
# Test stringIndexerOrderType
|
|
rData <- as.data.frame(rData)
|
|
rData$sex2 <- c("female", "male")[rData$sex + 1]
|
|
df <- createDataFrame(rData)
|
|
expect_error(
|
|
rModel <- survival::survreg(survival::Surv(time, status) ~ x + sex2, rData), NA)
|
|
rCoefs <- as.numeric(summary(rModel)$table[, 1])
|
|
model <- spark.survreg(df, Surv(time, status) ~ x + sex2)
|
|
coefs <- as.vector(summary(model)$coefficients[, 1])
|
|
o <- order(rCoefs)
|
|
# stringIndexerOrderType = "frequencyDesc" produces different estimates from R
|
|
expect_false(all(abs(rCoefs[o] - coefs[o]) < 1e-4))
|
|
|
|
# stringIndexerOrderType = "alphabetDesc" produces the same estimates as R
|
|
model <- spark.survreg(df, Surv(time, status) ~ x + sex2,
|
|
stringIndexerOrderType = "alphabetDesc")
|
|
coefs <- as.vector(summary(model)$coefficients[, 1])
|
|
expect_true(all(abs(rCoefs[o] - coefs[o]) < 1e-4))
|
|
}
|
|
|
|
test_that("spark.lm", {
|
|
df <- suppressWarnings(createDataFrame(iris))
|
|
|
|
model <- spark.lm(
|
|
df, Sepal_Width ~ .,
|
|
regParam = 0.01, maxIter = 10
|
|
)
|
|
|
|
prediction1 <- predict(model, df)
|
|
expect_is(prediction1, "SparkDataFrame")
|
|
|
|
# Test model save/load
|
|
if (windows_with_hadoop()) {
|
|
modelPath <- tempfile(pattern = "spark-lm", fileext = ".tmp")
|
|
write.ml(model, modelPath)
|
|
model2 <- read.ml(modelPath)
|
|
|
|
expect_is(model2, "LinearRegressionModel")
|
|
expect_equal(summary(model), summary(model2))
|
|
|
|
prediction2 <- predict(model2, df)
|
|
expect_equal(
|
|
collect(prediction1),
|
|
collect(prediction2)
|
|
)
|
|
unlink(modelPath)
|
|
}
|
|
})
|
|
})
|
|
|
|
|
|
test_that("spark.fmRegressor", {
|
|
df <- suppressWarnings(createDataFrame(iris))
|
|
|
|
model <- spark.fmRegressor(
|
|
df, Sepal_Width ~ .,
|
|
regParam = 0.01, maxIter = 10, fitLinear = TRUE
|
|
)
|
|
|
|
prediction1 <- predict(model, df)
|
|
expect_is(prediction1, "SparkDataFrame")
|
|
|
|
# Test model save/load
|
|
if (windows_with_hadoop()) {
|
|
modelPath <- tempfile(pattern = "spark-fmregressor", fileext = ".tmp")
|
|
write.ml(model, modelPath)
|
|
model2 <- read.ml(modelPath)
|
|
|
|
expect_is(model2, "FMRegressionModel")
|
|
expect_equal(summary(model), summary(model2))
|
|
|
|
prediction2 <- predict(model2, df)
|
|
expect_equal(
|
|
collect(prediction1),
|
|
collect(prediction2)
|
|
)
|
|
unlink(modelPath)
|
|
}
|
|
})
|
|
|
|
sparkR.session.stop()
|