[SPARK-12019][SPARKR] Support character vector for sparkR.init(), check param and fix doc

Also add tests.
Spark submit expects a comma-separated list.
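
For context, a minimal usage sketch of the new character-vector form (all values are illustrative, taken from the docs and tests below where possible):

library(SparkR)

# jars and packages may now be passed as character vectors;
# sparkR.init() collapses them into the comma-separated lists spark-submit expects
sc <- sparkR.init(master = "local[2]", appName = "SparkR",
                  sparkJars = c("one.jar", "two.jar", "three.jar"),
                  sparkPackages = c("com.databricks:spark-avro_2.10:2.0.1",
                                    "com.databricks:spark-csv_2.10:1.3.0"))

# the old comma-separated string form still works but now emits a deprecation warning
# sc <- sparkR.init(sparkJars = "one.jar,two.jar,three.jar")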

Author: felixcheung <felixcheung_m@hotmail.com>

Closes #10034 from felixcheung/sparkrinitdoc.
felixcheung 2015-12-03 13:25:20 -08:00 committed by Shivaram Venkataraman
parent a02d472773
commit 2213441e5e
5 changed files with 79 additions and 21 deletions

@@ -44,12 +44,16 @@ determineSparkSubmitBin <- function() {
}
generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
jars <- paste0(jars, collapse = ",")
if (jars != "") {
jars <- paste("--jars", jars)
# construct the jars argument with a space between --jars and comma-separated values
jars <- paste0("--jars ", jars)
}
if (!identical(packages, "")) {
packages <- paste("--packages", packages)
packages <- paste0(packages, collapse = ",")
if (packages != "") {
# construct the packages argument with a space between --packages and comma-separated values
packages <- paste0("--packages ", packages)
}
combinedArgs <- paste(jars, packages, sparkSubmitOpts, args, sep = " ")
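
As a quick sanity check of the rewritten argument construction, this is roughly what the helper yields for vector inputs (values mirror the tests added further down):

args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
                                c("com.databricks:spark-avro_2.10:2.0.1",
                                  "com.databricks:spark-csv_2.10:1.3.0"))
# args contains "--jars one.jar,two.jar,three.jar" and
# "--packages com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.3.0"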

@@ -86,13 +86,13 @@ sparkR.stop <- function() {
#' and use SparkR, refer to SparkR programming guide at
#' \url{http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparkcontext-sqlcontext}.
#'
#' @param master The Spark master URL.
#' @param master The Spark master URL
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkPackages Character string vector of packages from spark-packages.org
#' @param sparkEnvir Named list of environment variables to set on worker nodes
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors
#' @param sparkJars Character vector of jar files to pass to the worker nodes
#' @param sparkPackages Character vector of packages from spark-packages.org
#' @export
#' @examples
#'\dontrun{
@@ -102,7 +102,9 @@ sparkR.stop <- function() {
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#' list(spark.executor.memory="4g"),
#' list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#' c("jarfile1.jar","jarfile2.jar"))
#' c("one.jar", "two.jar", "three.jar"),
#' c("com.databricks:spark-avro_2.10:2.0.1",
#' "com.databricks:spark-csv_2.10:1.3.0"))
#'}
sparkR.init <- function(
@@ -120,15 +122,8 @@ sparkR.init <- function(
return(get(".sparkRjsc", envir = .sparkREnv))
}
jars <- suppressWarnings(normalizePath(as.character(sparkJars)))
# Classpath separator is ";" on Windows
# URI needs four /// as from http://stackoverflow.com/a/18522792
if (.Platform$OS.type == "unix") {
uriSep <- "//"
} else {
uriSep <- "////"
}
jars <- processSparkJars(sparkJars)
packages <- processSparkPackages(sparkPackages)
sparkEnvirMap <- convertNamedListToEnv(sparkEnvir)
@@ -145,7 +140,7 @@ sparkR.init <- function(
sparkHome = sparkHome,
jars = jars,
sparkSubmitOpts = submitOps,
packages = sparkPackages)
packages = packages)
# wait atmost 100 seconds for JVM to launch
wait <- 0.1
for (i in 1:25) {
@@ -195,8 +190,14 @@ sparkR.init <- function(
paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
}
nonEmptyJars <- Filter(function(x) { x != "" }, jars)
localJarPaths <- lapply(nonEmptyJars,
# Classpath separator is ";" on Windows
# URI needs four /// as from http://stackoverflow.com/a/18522792
if (.Platform$OS.type == "unix") {
uriSep <- "//"
} else {
uriSep <- "////"
}
localJarPaths <- lapply(jars,
function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
# Set the start time to identify jobjs
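
To illustrate the file-URI construction above, a small hedged sketch (the jar path is a placeholder; the separator choice simply follows the comments in the code):

# "file:" + "//" + an absolute unix path gives the usual three-slash URI
uriSep <- if (.Platform$OS.type == "unix") "//" else "////"
utils::URLencode(paste("file:", uriSep, "/opt/libs/one.jar", sep = ""))
# [1] "file:///opt/libs/one.jar"   (on unix; Windows paths get the four-slash form)
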
@@ -366,3 +367,22 @@ getClientModeSparkSubmitOpts <- function(submitOps, sparkEnvirMap) {
# --option must be before the application class "sparkr-shell" in submitOps
paste0(paste0(envirToOps, collapse = ""), submitOps)
}
# Utility function that handles the sparkJars argument and normalizes paths
processSparkJars <- function(jars) {
splittedJars <- splitString(jars)
if (length(splittedJars) > length(jars)) {
warning("sparkJars as a comma-separated string is deprecated, use character vector instead")
}
normalized <- suppressWarnings(normalizePath(splittedJars))
normalized
}
# Utility function that handles sparkPackages argument
processSparkPackages <- function(packages) {
splittedPackages <- splitString(packages)
if (length(splittedPackages) > length(packages)) {
warning("sparkPackages as a comma-separated string is deprecated, use character vector instead")
}
splittedPackages
}
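
A brief usage sketch of the two new helpers, consistent with the tests added at the end of this change (inputs are illustrative; the jar paths are assumed not to exist, so normalizePath leaves them unchanged):

processSparkPackages(c("ghi", "lmn"))   # character vector: no warning
# [1] "ghi" "lmn"

suppressWarnings(processSparkJars(" a, b "))   # comma-separated string: deprecation warning
# [1] "a" "b"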

@@ -636,3 +636,8 @@ assignNewEnv <- function(data) {
}
env
}
# Utility function to split by ',' and whitespace, remove empty tokens
splitString <- function(input) {
Filter(nzchar, unlist(strsplit(input, ",|\\s")))
}
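
For illustration, splitString on mixed input (example values only):

splitString(" abc ,, def ")
# [1] "abc" "def"
splitString(c("a,b", "", "c d"))
# [1] "a" "b" "c" "d"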

@@ -34,3 +34,12 @@ test_that("no package specified doesn't add packages flag", {
test_that("multiple packages don't produce a warning", {
expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning()))
})
test_that("sparkJars sparkPackages as character vectors", {
args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
c("com.databricks:spark-avro_2.10:2.0.1",
"com.databricks:spark-csv_2.10:1.3.0"))
expect_match(args, "--jars one.jar,two.jar,three.jar")
expect_match(args,
"--packages com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.3.0")
})

@@ -92,3 +92,23 @@ test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whiteli
" --driver-memory 4g sparkr-shell2"))
# nolint end
})
test_that("sparkJars sparkPackages as comma-separated strings", {
expect_warning(processSparkJars(" a, b "))
jars <- suppressWarnings(processSparkJars(" a, b "))
expect_equal(jars, c("a", "b"))
jars <- suppressWarnings(processSparkJars(" abc ,, def "))
expect_equal(jars, c("abc", "def"))
jars <- suppressWarnings(processSparkJars(c(" abc ,, def ", "", "xyz", " ", "a,b")))
expect_equal(jars, c("abc", "def", "xyz", "a", "b"))
p <- processSparkPackages(c("ghi", "lmn"))
expect_equal(p, c("ghi", "lmn"))
# check normalizePath
f <- dir()[[1]]
expect_that(processSparkJars(f), not(gives_warning()))
expect_match(processSparkJars(f), f)
})