SPARK-929: Fully deprecate usage of SPARK_MEM

(Continued from old repo, prior discussion at https://github.com/apache/incubator-spark/pull/615)

This patch cements our deprecation of the SPARK_MEM environment variable by replacing it with three more specialized variables:
SPARK_DAEMON_MEMORY, SPARK_EXECUTOR_MEMORY, and SPARK_DRIVER_MEMORY.

The creation of the latter two variables means that we can safely set driver/job memory without accidentally setting the executor memory. Neither variable is part of the public interface.

SPARK_EXECUTOR_MEMORY is only used by the Mesos scheduler (and set within SparkContext). The proper way of configuring executor memory is through the "spark.executor.memory" property.
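As a rough sketch of the preferred route (the 4g value is arbitrary): since SparkConf picks up spark.* system properties, the property can be passed to any app launched through spark-class via SPARK_JAVA_OPTS:

    # Preferred: configure executor memory through the spark.executor.memory property.
    SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS -Dspark.executor.memory=4g" ./bin/spark-shell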

SPARK_DRIVER_MEMORY is the new way of specifying the amount of memory used by jobs launched via spark-class, without possibly affecting executor memory.
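For example (a sketch; the 2g value is arbitrary, and spark-class is normally invoked indirectly by the launch scripts):

    # Sets only the driver JVM's heap; executor memory is untouched.
    SPARK_DRIVER_MEMORY=2g ./bin/spark-class org.apache.spark.repl.Main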

Other memory considerations:
- The repl's memory can be set through the "--drivermem" command-line option, which really just sets SPARK_DRIVER_MEMORY.
- run-example doesn't use spark-class, so the only way to modify examples' memory is actually an unusual use of SPARK_JAVA_OPTS (which is normally overridden in all cases by spark-class). Both cases are sketched below.
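A sketch of the two cases above (memory values arbitrary; the example class and "local" master argument are for illustration only):

    # repl: --drivermem just exports SPARK_DRIVER_MEMORY before invoking spark-class.
    ./bin/spark-shell --drivermem 2g

    # run-example: SPARK_JAVA_OPTS is the only remaining hook for the examples' heap.
    SPARK_JAVA_OPTS="-Xmx2g" ./bin/run-example org.apache.spark.examples.SparkPi local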

This patch also fixes a lurking bug where spark-shell misused spark-class (the first argument is supposed to be the main class name, not java options), as well as a bug in the Windows spark-class2.cmd. I have not yet tested this patch on either Windows or Mesos, however.

Author: Aaron Davidson <aaron@databricks.com>

Closes #99 from aarondav/sparkmem and squashes the following commits:

9df4c68 [Aaron Davidson] SPARK-929: Fully deprecate usage of SPARK_MEM
Committed: 2014-03-09 11:08:39 -07:00
Commit: 52834d761b (parent e59a3b6c41)
7 changed files with 91 additions and 60 deletions

bin/spark-class

@@ -40,34 +40,46 @@ if [ -z "$1" ]; then
   exit 1
 fi
 
-# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable
-# values for that; it doesn't need a lot
-if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then
-  SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
-  SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
-  # Do not overwrite SPARK_JAVA_OPTS environment variable in this script
-  OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS"   # Empty by default
-else
-  OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
+if [ -n "$SPARK_MEM" ]; then
+  echo "Warning: SPARK_MEM is deprecated, please use a more specific config option"
+  echo "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)."
 fi
 
-# Add java opts for master, worker, executor. The opts maybe null
+# Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
+DEFAULT_MEM=${SPARK_MEM:-512m}
+
+SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
+
+# Add java opts and memory settings for master, worker, executors, and repl.
 case "$1" in
+  # Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
   'org.apache.spark.deploy.master.Master')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
+    OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
+    OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
     ;;
   'org.apache.spark.deploy.worker.Worker')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
+    OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
+    OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
     ;;
+
+  # Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
   'org.apache.spark.executor.CoarseGrainedExecutorBackend')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
     ;;
   'org.apache.spark.executor.MesosExecutorBackend')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
     ;;
+
+  # All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
   'org.apache.spark.repl.Main')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
+    OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
+    ;;
+  *)
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
+    OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
     ;;
 esac

@@ -83,14 +95,10 @@ else
   fi
 fi
 
-# Set SPARK_MEM if it isn't already set since we also use it for this process
-SPARK_MEM=${SPARK_MEM:-512m}
-export SPARK_MEM
-
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$OUR_JAVA_OPTS"
 JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
-JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
+JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e "$FWDIR/conf/java-opts" ] ; then
   JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"

bin/spark-class2.cmd

@@ -34,22 +34,45 @@ if not "x%1"=="x" goto arg_given
   goto exit
 :arg_given
 
-set RUNNING_DAEMON=0
-if "%1"=="spark.deploy.master.Master" set RUNNING_DAEMON=1
-if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1
-if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m
-set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
-if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY%
-rem Do not overwrite SPARK_JAVA_OPTS environment variable in this script
-if "%RUNNING_DAEMON%"=="0" set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
-if "%RUNNING_DAEMON%"=="1" set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%
+if not "x%SPARK_MEM%"=="x" (
+  echo Warning: SPARK_MEM is deprecated, please use a more specific config option
+  echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
+)
 
-rem Figure out how much memory to use per executor and set it as an environment
-rem variable so that our process sees it and can report it to Mesos
-if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m
+rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
+set OUR_JAVA_MEM=%SPARK_MEM%
+if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m
+
+set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
+
+rem Add java opts and memory settings for master, worker, executors, and repl.
+rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+if "%1"=="org.apache.spark.deploy.master.Master" (
+  set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
+  if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
+) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
+  set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
+  if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
+
+rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
+) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
+  if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
+) else if "%1"=="org.apache.spark.executor.MesosExecutorBackend" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
+  if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
+
+rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
+) else if "%1"=="org.apache.spark.repl.Main" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
+  if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+) else (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
+  if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+)
 
 rem Set JAVA_OPTS to be able to load native libraries and to set heap size
-set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
+set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
 rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala!
 
 rem Test whether the user has built Spark

bin/spark-shell

@@ -45,13 +45,11 @@ if [ "$1" = "--help" ] || [ "$1" = "-h" ]; then
   exit
 fi
 
-SPARK_SHELL_OPTS=""
-
 for o in "$@"; do
   if [ "$1" = "-c" -o "$1" = "--cores" ]; then
     shift
     if [[ "$1" =~ $CORE_PATTERN ]]; then
-      SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.cores.max=$1"
+      SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.cores.max=$1"
       shift
     else
       echo "ERROR: wrong format for -c/--cores"
@@ -61,7 +59,7 @@ for o in "$@"; do
   if [ "$1" = "-em" -o "$1" = "--execmem" ]; then
     shift
     if [[ $1 =~ $MEM_PATTERN ]]; then
-      SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.executor.memory=$1"
+      SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.executor.memory=$1"
       shift
     else
       echo "ERROR: wrong format for --execmem/-em"
@@ -71,7 +69,7 @@ for o in "$@"; do
   if [ "$1" = "-dm" -o "$1" = "--drivermem" ]; then
     shift
     if [[ $1 =~ $MEM_PATTERN ]]; then
-      export SPARK_MEM=$1
+      export SPARK_DRIVER_MEMORY=$1
       shift
     else
       echo "ERROR: wrong format for --drivermem/-dm"
@@ -125,16 +123,18 @@ if [[ ! $? ]]; then
 fi
 
 if $cygwin; then
     # Workaround for issue involving JLine and Cygwin
    # (see http://sourceforge.net/p/jline/bugs/40/).
    # If you're using the Mintty terminal emulator in Cygwin, may need to set the
    # "Backspace sends ^H" setting in "Keys" section of the Mintty options
    # (see https://github.com/sbt/sbt/issues/562).
     stty -icanon min 1 -echo > /dev/null 2>&1
-    $FWDIR/bin/spark-class -Djline.terminal=unix $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@"
+    export SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Djline.terminal=unix"
+    $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
     stty icanon echo > /dev/null 2>&1
 else
-    $FWDIR/bin/spark-class $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@"
+    export SPARK_REPL_OPTS
+    $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
 fi
 
 # record the exit status lest it be overwritten:
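Put together, the repl options now flow through SPARK_REPL_OPTS and SPARK_DRIVER_MEMORY instead of positional java options; a sketch with arbitrary values:

    # 4 cores max and 2g executors via system properties; 1g driver heap via SPARK_DRIVER_MEMORY.
    ./bin/spark-shell -c 4 -em 2g -dm 1g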

core/src/main/scala/org/apache/spark/SparkContext.scala

@@ -162,19 +162,20 @@ class SparkContext(
     jars.foreach(addJar)
   }
 
+  def warnSparkMem(value: String): String = {
+    logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
+      "deprecated, please use spark.executor.memory instead.")
+    value
+  }
+
   private[spark] val executorMemory = conf.getOption("spark.executor.memory")
-    .orElse(Option(System.getenv("SPARK_MEM")))
+    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
+    .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
     .map(Utils.memoryStringToMb)
     .getOrElse(512)
 
-  if (!conf.contains("spark.executor.memory") && sys.env.contains("SPARK_MEM")) {
-    logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
-      "deprecated, instead use spark.executor.memory")
-  }
-
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
-  // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
   for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS");
       value <- Option(System.getenv(key))) {
     executorEnvs(key) = value
@@ -185,8 +186,9 @@ class SparkContext(
       value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
     executorEnvs(envKey) = value
   }
-  // Since memory can be set with a system property too, use that
-  executorEnvs("SPARK_MEM") = executorMemory + "m"
+  // The Mesos scheduler backend relies on this environment variable to set executor memory.
+  // TODO: Set this only in the Mesos scheduler.
+  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
   executorEnvs ++= conf.getExecutorEnv
 
   // Set SPARK_USER for user who is running SparkContext.
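The resulting lookup order for executor memory can be exercised from the shell; a sketch (values arbitrary, and SPARK_EXECUTOR_MEMORY itself stays internal):

    ./bin/spark-shell                                                # nothing set -> 512m default
    SPARK_MEM=1g ./bin/spark-shell                                   # deprecated -> 1g, logs warnSparkMem
    SPARK_JAVA_OPTS="-Dspark.executor.memory=2g" ./bin/spark-shell   # property wins -> 2g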

core/src/main/scala/org/apache/spark/util/Utils.scala

@@ -532,8 +532,6 @@ private[spark] object Utils extends Logging {
   /**
    * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
-   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
-   * environment variable.
    */
   def memoryStringToMb(str: String): Int = {
     val lower = str.toLowerCase

docs/tuning.md

@@ -163,7 +163,7 @@ their work directories), *not* on your driver program.
 **Cache Size Tuning**
 
 One important configuration parameter for GC is the amount of memory that should be used for caching RDDs.
-By default, Spark uses 60% of the configured executor memory (`spark.executor.memory` or `SPARK_MEM`) to
+By default, Spark uses 60% of the configured executor memory (`spark.executor.memory`) to
 cache RDDs. This means that 40% of memory is available for any objects created during task execution.
 
 In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
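To make the 60/40 split concrete (an illustrative figure, not from the patch): with spark.executor.memory=4g, roughly 0.6 x 4g = 2.4g is reserved for cached RDDs, leaving about 1.6g for objects created during task execution.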

python/pyspark/java_gateway.py

@@ -29,7 +29,7 @@ SPARK_HOME = os.environ["SPARK_HOME"]
 def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
-    # proper classpath and SPARK_MEM settings from spark-env.sh
+    # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"
     script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
     command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",