[SPARK-11340][SPARKR] Support setting driver properties when starting Spark from R programmatically or from RStudio
Maps spark.driver.memory from sparkEnvir to spark-submit command-line arguments.

shivaram suggested that we possibly add other spark.driver.* properties - do we want to add all of those? I thought those could be set in SparkConf? sun-rui

Author: felixcheung <felixcheung_m@hotmail.com>

Closes #9290 from felixcheung/rdrivermem.
parent 729f983e66
commit bb5a2af034
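For orientation, this is the usage the change enables — a minimal sketch, assuming SparkR is on the library path; the memory value is illustrative (the docs change at the end of this diff uses the same pattern):

    # Start Spark from R programmatically (e.g. from RStudio). Driver properties
    # passed in sparkEnvir are forwarded to the spark-submit command line, so
    # they take effect before the driver JVM is launched.
    library(SparkR)
    sc <- sparkR.init(master = "local[*]", appName = "SparkR",
                      sparkEnvir = list(spark.driver.memory = "2g"))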
@@ -77,7 +77,9 @@ sparkR.stop <- function() {
 
 #' Initialize a new Spark Context.
 #'
-#' This function initializes a new SparkContext.
+#' This function initializes a new SparkContext. For details on how to initialize
+#' and use SparkR, refer to SparkR programming guide at
+#' \url{http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparkcontext-sqlcontext}.
 #'
 #' @param master The Spark master URL.
 #' @param appName Application name to register with cluster manager
@@ -93,7 +95,7 @@ sparkR.stop <- function() {
 #' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
 #'                  list(spark.executor.memory="1g"))
 #' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
-#'                  list(spark.executor.memory="1g"),
+#'                  list(spark.executor.memory="4g"),
 #'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
 #'                  c("jarfile1.jar","jarfile2.jar"))
 #'}
@@ -123,16 +125,21 @@ sparkR.init <- function(
     uriSep <- "////"
   }
 
+  sparkEnvirMap <- convertNamedListToEnv(sparkEnvir)
+
   existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
   if (existingPort != "") {
     backendPort <- existingPort
   } else {
     path <- tempfile(pattern = "backend_port")
+    submitOps <- getClientModeSparkSubmitOpts(
+        Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
+        sparkEnvirMap)
     launchBackend(
         args = path,
         sparkHome = sparkHome,
         jars = jars,
-        sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
+        sparkSubmitOpts = submitOps,
         packages = sparkPackages)
     # wait atmost 100 seconds for JVM to launch
     wait <- 0.1
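Worth noting about the ordering above: getClientModeSparkSubmitOpts() only adds an option when it is not already present in SPARKR_SUBMIT_ARGS, so explicit submit args win over sparkEnvir. A small illustrative sketch (values hypothetical; the third case of the new test further down exercises this):

    # --driver-memory is already in SPARKR_SUBMIT_ARGS, so the
    # spark.driver.memory entry in sparkEnvir is skipped and the
    # driver is launched with 4g, not 2g.
    Sys.setenv(SPARKR_SUBMIT_ARGS = "--driver-memory 4g sparkr-shell")
    sc <- sparkR.init(sparkEnvir = list(spark.driver.memory = "2g"))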
@@ -171,8 +178,6 @@ sparkR.init <- function(
     sparkHome <- suppressWarnings(normalizePath(sparkHome))
   }
 
-  sparkEnvirMap <- convertNamedListToEnv(sparkEnvir)
-
   sparkExecutorEnvMap <- convertNamedListToEnv(sparkExecutorEnv)
   if(is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) {
     sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
@@ -320,3 +325,33 @@ clearJobGroup <- function(sc) {
 cancelJobGroup <- function(sc, groupId) {
   callJMethod(sc, "cancelJobGroup", groupId)
 }
+
+sparkConfToSubmitOps <- new.env()
+sparkConfToSubmitOps[["spark.driver.memory"]]           <- "--driver-memory"
+sparkConfToSubmitOps[["spark.driver.extraClassPath"]]   <- "--driver-class-path"
+sparkConfToSubmitOps[["spark.driver.extraJavaOptions"]] <- "--driver-java-options"
+sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- "--driver-library-path"
+
+# Utility function that returns Spark Submit arguments as a string
+#
+# A few Spark Application and Runtime environment properties cannot take effect after the driver
+# JVM has started, as documented in:
+# http://spark.apache.org/docs/latest/configuration.html#application-properties
+# When starting SparkR without spark-submit, for example from RStudio, add them to the
+# spark-submit command line if not already set in SPARKR_SUBMIT_ARGS so that they take effect.
+getClientModeSparkSubmitOpts <- function(submitOps, sparkEnvirMap) {
+  envirToOps <- lapply(ls(sparkConfToSubmitOps), function(conf) {
+    opsValue <- sparkEnvirMap[[conf]]
+    # process only if --option is not already specified
+    if (!is.null(opsValue) &&
+        nchar(opsValue) > 1 &&
+        !grepl(sparkConfToSubmitOps[[conf]], submitOps)) {
+      # put "" around value in case it has spaces
+      paste0(sparkConfToSubmitOps[[conf]], " \"", opsValue, "\" ")
+    } else {
+      ""
+    }
+  })
+  # --option must come before the application class "sparkr-shell" in submitOps
+  paste0(paste0(envirToOps, collapse = ""), submitOps)
+}
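To make the helper's output concrete, a quick REPL-style sketch mirroring the first case of the new test below:

    e <- new.env()
    e[["spark.driver.memory"]] <- "512m"
    getClientModeSparkSubmitOpts("sparkr-shell", e)
    # [1] "--driver-memory \"512m\" sparkr-shell"
    # whitelisted options are prepended before the application class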
@@ -65,3 +65,30 @@ test_that("job group functions can be called", {
   cancelJobGroup(sc, "groupId")
   clearJobGroup(sc)
 })
+
+test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
+  e <- new.env()
+  e[["spark.driver.memory"]] <- "512m"
+  ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
+  expect_equal("--driver-memory \"512m\" sparkrmain", ops)
+
+  e[["spark.driver.memory"]] <- "5g"
+  e[["spark.driver.extraClassPath"]] <- "/opt/class_path" # nolint
+  e[["spark.driver.extraJavaOptions"]] <- "-XX:+UseCompressedOops -XX:+UseCompressedStrings"
+  e[["spark.driver.extraLibraryPath"]] <- "/usr/local/hadoop/lib" # nolint
+  e[["random"]] <- "skipthis"
+  ops2 <- getClientModeSparkSubmitOpts("sparkr-shell", e)
+  # nolint start
+  expect_equal(ops2, paste0("--driver-class-path \"/opt/class_path\" --driver-java-options \"",
+                            "-XX:+UseCompressedOops -XX:+UseCompressedStrings\" --driver-library-path \"",
+                            "/usr/local/hadoop/lib\" --driver-memory \"5g\" sparkr-shell"))
+  # nolint end
+
+  e[["spark.driver.extraClassPath"]] <- "/" # too short
+  ops3 <- getClientModeSparkSubmitOpts("--driver-memory 4g sparkr-shell2", e)
+  # nolint start
+  expect_equal(ops3, paste0("--driver-java-options \"-XX:+UseCompressedOops ",
+                            "-XX:+UseCompressedStrings\" --driver-library-path \"/usr/local/hadoop/lib\"",
+                            " --driver-memory 4g sparkr-shell2"))
+  # nolint end
+})
@@ -29,7 +29,7 @@ All of the examples on this page use sample data included in R or the Spark dist
 The entry point into SparkR is the `SparkContext` which connects your R program to a Spark cluster.
 You can create a `SparkContext` using `sparkR.init` and pass in options such as the application name
 , any spark packages depended on, etc. Further, to work with DataFrames we will need a `SQLContext`,
-which can be created from the SparkContext. If you are working from the SparkR shell, the
+which can be created from the SparkContext. If you are working from the `sparkR` shell, the
 `SQLContext` and `SparkContext` should already be created for you.
 
 {% highlight r %}
@@ -37,6 +37,18 @@ sc <- sparkR.init()
 sqlContext <- sparkRSQL.init(sc)
 {% endhighlight %}
 
+If you are creating a `SparkContext` yourself instead of using the `sparkR` shell or `spark-submit`,
+you can also specify certain Spark driver properties. Normally these
+[Application properties](configuration.html#application-properties) and
+[Runtime Environment](configuration.html#runtime-environment) settings cannot be set programmatically,
+because the driver JVM process has already started; in this case, SparkR takes care of this for you.
+To set them, pass them as you would other configuration properties in the `sparkEnvir` argument to
+`sparkR.init()`.
+
+{% highlight r %}
+sc <- sparkR.init("local[*]", "SparkR", "/home/spark", list(spark.driver.memory="2g"))
+{% endhighlight %}
+
 </div>
 
 ## Creating DataFrames