From bb5a2af034196620d869fc9b1a400e014e718b8c Mon Sep 17 00:00:00 2001 From: felixcheung Date: Fri, 30 Oct 2015 13:51:32 -0700 Subject: [PATCH] [SPARK-11340][SPARKR] Support setting driver properties when starting Spark from R programmatically or from RStudio Mapping spark.driver.memory from sparkEnvir to spark-submit commandline arguments. shivaram suggested that we possibly add other spark.driver.* properties - do we want to add all of those? I thought those could be set in SparkConf? sun-rui Author: felixcheung Closes #9290 from felixcheung/rdrivermem. --- R/pkg/R/sparkR.R | 45 +++++++++++++++++++++++++++++---- R/pkg/inst/tests/test_context.R | 27 ++++++++++++++++++++ docs/sparkr.md | 28 ++++++++++++++------ 3 files changed, 87 insertions(+), 13 deletions(-) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 043b0057bd..004d08e74e 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -77,7 +77,9 @@ sparkR.stop <- function() { #' Initialize a new Spark Context. #' -#' This function initializes a new SparkContext. +#' This function initializes a new SparkContext. For details on how to initialize +#' and use SparkR, refer to SparkR programming guide at +#' \url{http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparkcontext-sqlcontext}. #' #' @param master The Spark master URL. #' @param appName Application name to register with cluster manager @@ -93,7 +95,7 @@ sparkR.stop <- function() { #' sc <- sparkR.init("local[2]", "SparkR", "/home/spark", #' list(spark.executor.memory="1g")) #' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark", -#' list(spark.executor.memory="1g"), +#' list(spark.executor.memory="4g"), #' list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"), #' c("jarfile1.jar","jarfile2.jar")) #'} @@ -123,16 +125,21 @@ sparkR.init <- function( uriSep <- "////" } + sparkEnvirMap <- convertNamedListToEnv(sparkEnvir) + existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "") if (existingPort != "") { backendPort <- existingPort } else { path <- tempfile(pattern = "backend_port") + submitOps <- getClientModeSparkSubmitOpts( + Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"), + sparkEnvirMap) launchBackend( args = path, sparkHome = sparkHome, jars = jars, - sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"), + sparkSubmitOpts = submitOps, packages = sparkPackages) # wait atmost 100 seconds for JVM to launch wait <- 0.1 @@ -171,8 +178,6 @@ sparkR.init <- function( sparkHome <- suppressWarnings(normalizePath(sparkHome)) } - sparkEnvirMap <- convertNamedListToEnv(sparkEnvir) - sparkExecutorEnvMap <- convertNamedListToEnv(sparkExecutorEnv) if(is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) { sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <- @@ -320,3 +325,33 @@ clearJobGroup <- function(sc) { cancelJobGroup <- function(sc, groupId) { callJMethod(sc, "cancelJobGroup", groupId) } + +sparkConfToSubmitOps <- new.env() +sparkConfToSubmitOps[["spark.driver.memory"]] <- "--driver-memory" +sparkConfToSubmitOps[["spark.driver.extraClassPath"]] <- "--driver-class-path" +sparkConfToSubmitOps[["spark.driver.extraJavaOptions"]] <- "--driver-java-options" +sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- "--driver-library-path" + +# Utility function that returns Spark Submit arguments as a string +# +# A few Spark Application and Runtime environment properties cannot take effect after driver +# JVM has started, as documented in: +# http://spark.apache.org/docs/latest/configuration.html#application-properties +# When starting SparkR without using spark-submit, for example, from Rstudio, add them to +# spark-submit commandline if not already set in SPARKR_SUBMIT_ARGS so that they can be effective. +getClientModeSparkSubmitOpts <- function(submitOps, sparkEnvirMap) { + envirToOps <- lapply(ls(sparkConfToSubmitOps), function(conf) { + opsValue <- sparkEnvirMap[[conf]] + # process only if --option is not already specified + if (!is.null(opsValue) && + nchar(opsValue) > 1 && + !grepl(sparkConfToSubmitOps[[conf]], submitOps)) { + # put "" around value in case it has spaces + paste0(sparkConfToSubmitOps[[conf]], " \"", opsValue, "\" ") + } else { + "" + } + }) + # --option must be before the application class "sparkr-shell" in submitOps + paste0(paste0(envirToOps, collapse = ""), submitOps) +} diff --git a/R/pkg/inst/tests/test_context.R b/R/pkg/inst/tests/test_context.R index e99815ed15..80c1b89a4c 100644 --- a/R/pkg/inst/tests/test_context.R +++ b/R/pkg/inst/tests/test_context.R @@ -65,3 +65,30 @@ test_that("job group functions can be called", { cancelJobGroup(sc, "groupId") clearJobGroup(sc) }) + +test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", { + e <- new.env() + e[["spark.driver.memory"]] <- "512m" + ops <- getClientModeSparkSubmitOpts("sparkrmain", e) + expect_equal("--driver-memory \"512m\" sparkrmain", ops) + + e[["spark.driver.memory"]] <- "5g" + e[["spark.driver.extraClassPath"]] <- "/opt/class_path" # nolint + e[["spark.driver.extraJavaOptions"]] <- "-XX:+UseCompressedOops -XX:+UseCompressedStrings" + e[["spark.driver.extraLibraryPath"]] <- "/usr/local/hadoop/lib" # nolint + e[["random"]] <- "skipthis" + ops2 <- getClientModeSparkSubmitOpts("sparkr-shell", e) + # nolint start + expect_equal(ops2, paste0("--driver-class-path \"/opt/class_path\" --driver-java-options \"", + "-XX:+UseCompressedOops -XX:+UseCompressedStrings\" --driver-library-path \"", + "/usr/local/hadoop/lib\" --driver-memory \"5g\" sparkr-shell")) + # nolint end + + e[["spark.driver.extraClassPath"]] <- "/" # too short + ops3 <- getClientModeSparkSubmitOpts("--driver-memory 4g sparkr-shell2", e) + # nolint start + expect_equal(ops3, paste0("--driver-java-options \"-XX:+UseCompressedOops ", + "-XX:+UseCompressedStrings\" --driver-library-path \"/usr/local/hadoop/lib\"", + " --driver-memory 4g sparkr-shell2")) + # nolint end +}) diff --git a/docs/sparkr.md b/docs/sparkr.md index 7139d16b4a..497a276679 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -29,7 +29,7 @@ All of the examples on this page use sample data included in R or the Spark dist The entry point into SparkR is the `SparkContext` which connects your R program to a Spark cluster. You can create a `SparkContext` using `sparkR.init` and pass in options such as the application name , any spark packages depended on, etc. Further, to work with DataFrames we will need a `SQLContext`, -which can be created from the SparkContext. If you are working from the SparkR shell, the +which can be created from the SparkContext. If you are working from the `sparkR` shell, the `SQLContext` and `SparkContext` should already be created for you. {% highlight r %} @@ -37,17 +37,29 @@ sc <- sparkR.init() sqlContext <- sparkRSQL.init(sc) {% endhighlight %} +In the event you are creating `SparkContext` instead of using `sparkR` shell or `spark-submit`, you +could also specify certain Spark driver properties. Normally these +[Application properties](configuration.html#application-properties) and +[Runtime Environment](configuration.html#runtime-environment) cannot be set programmatically, as the +driver JVM process would have been started, in this case SparkR takes care of this for you. To set +them, pass them as you would other configuration properties in the `sparkEnvir` argument to +`sparkR.init()`. + +{% highlight r %} +sc <- sparkR.init("local[*]", "SparkR", "/home/spark", list(spark.driver.memory="2g")) +{% endhighlight %} + ## Creating DataFrames With a `SQLContext`, applications can create `DataFrame`s from a local R data frame, from a [Hive table](sql-programming-guide.html#hive-tables), or from other [data sources](sql-programming-guide.html#data-sources). ### From local data frames -The simplest way to create a data frame is to convert a local R data frame into a SparkR DataFrame. Specifically we can use `createDataFrame` and pass in the local R data frame to create a SparkR DataFrame. As an example, the following creates a `DataFrame` based using the `faithful` dataset from R. +The simplest way to create a data frame is to convert a local R data frame into a SparkR DataFrame. Specifically we can use `createDataFrame` and pass in the local R data frame to create a SparkR DataFrame. As an example, the following creates a `DataFrame` based using the `faithful` dataset from R.
{% highlight r %} -df <- createDataFrame(sqlContext, faithful) +df <- createDataFrame(sqlContext, faithful) # Displays the content of the DataFrame to stdout head(df) @@ -96,7 +108,7 @@ printSchema(people)
The data sources API can also be used to save out DataFrames into multiple file formats. For example we can save the DataFrame from the previous example -to a Parquet file using `write.df` +to a Parquet file using `write.df`
{% highlight r %} @@ -139,7 +151,7 @@ Here we include some basic examples and a complete list can be found in the [API
{% highlight r %} # Create the DataFrame -df <- createDataFrame(sqlContext, faithful) +df <- createDataFrame(sqlContext, faithful) # Get basic information about the DataFrame df @@ -152,7 +164,7 @@ head(select(df, df$eruptions)) ##2 1.800 ##3 3.333 -# You can also pass in column name as strings +# You can also pass in column name as strings head(select(df, "eruptions")) # Filter the DataFrame to only retain rows with wait times shorter than 50 mins @@ -166,7 +178,7 @@ head(filter(df, df$waiting < 50))
-### Grouping, Aggregation +### Grouping, Aggregation SparkR data frames support a number of commonly used functions to aggregate data after grouping. For example we can compute a histogram of the `waiting` time in the `faithful` dataset as shown below @@ -194,7 +206,7 @@ head(arrange(waiting_counts, desc(waiting_counts$count))) ### Operating on Columns -SparkR also provides a number of functions that can directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions. +SparkR also provides a number of functions that can directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
{% highlight r %}