[SPARK-3167] Handle special driver configs in Windows

This is an effort to bring the Windows scripts up to speed after the recent sweeping changes in #1845.

Author: Andrew Or <andrewor14@gmail.com>

Closes #2129 from andrewor14/windows-config and squashes the following commits:

881a8f0 [Andrew Or] Add reference to Windows taskkill
92e6047 [Andrew Or] Update a few comments (minor)
22b1acd [Andrew Or] Fix style again (minor)
afcffea [Andrew Or] Fix style (minor)
72004c2 [Andrew Or] Actually respect --driver-java-options
803218b [Andrew Or] Actually respect SPARK_*_CLASSPATH
eeb34a0 [Andrew Or] Update outdated comment (minor)
35caecc [Andrew Or] In Windows, actually kill Java processes on exit
f97daa2 [Andrew Or] Fix Windows spark shell stdin issue
83ebe60 [Andrew Or] Parse special driver configs in Windows (broken)
Andrew Or authored 2014-08-26 22:52:16 -07:00; committed by Patrick Wendell
parent bf719056b7
commit 7557c4cfef
6 changed files with 95 additions and 26 deletions

bin/compute-classpath.cmd

@@ -36,7 +36,8 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
rem Build up classpath
set CLASSPATH=%FWDIR%conf
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
if exist "%FWDIR%RELEASE" (
for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
set ASSEMBLY_JAR=%%d

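For reference, a minimal standalone sketch of the classpath idiom introduced above (all paths below are made up for illustration): when the user-facing variables are unset they simply expand to empty strings, so no existence checks are needed before joining them.

```
@echo off
set SPARK_CLASSPATH=C:\extra\foo.jar
set SPARK_SUBMIT_CLASSPATH=
set FWDIR=C:\spark\
rem Unset variables contribute empty entries between the semicolons
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
echo %CLASSPATH%
rem Prints: C:\extra\foo.jar;;C:\spark\conf
```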
bin/spark-class2.cmd Executable file → Normal file

@@ -17,6 +17,8 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
rem Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
setlocal enabledelayedexpansion
set SCALA_VERSION=2.10
@@ -38,7 +40,7 @@ if not "x%1"=="x" goto arg_given
if not "x%SPARK_MEM%"=="x" (
echo Warning: SPARK_MEM is deprecated, please use a more specific config option
echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
echo e.g., spark.executor.memory or spark.driver.memory.
)
rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
@@ -67,10 +69,18 @@ rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
) else if "%1"=="org.apache.spark.repl.Main" (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
rem The repl also uses SPARK_REPL_OPTS.
) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
)
if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
) else (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
@@ -80,9 +90,9 @@ rem Set JAVA_OPTS to be able to load native libraries and to set heap size
for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
if "%jversion%" geq "1.8.0" (
set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
set JAVA_OPTS=%OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
) else (
set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
)
rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
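The two `for /f` lines above implement a small version-parsing idiom. A hedged trace, assuming a typical banner of `java version "1.8.0_31"`:

```
rem tokens=3 grabs the third whitespace-separated token:   "1.8.0_31"
for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
rem %jversion:~1,-1% strips the surrounding quotes:         1.8.0_31
rem tokens=1 delims=_ keeps only the part before the "_":   1.8.0
for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
echo %jversion%
```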
@@ -115,5 +125,27 @@ rem Figure out where java is.
set RUNNER=java
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
rem the driver JVM itself. Instead of handling this complexity here, we launch a separate JVM
rem to prepare the launch environment of this driver JVM.
rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.
rem Leaving out the first argument is surprisingly difficult to do in Windows. Note that this must
rem be done here because the Windows "shift" command does not work in a conditional block.
set BOOTSTRAP_ARGS=
shift
:start_parse
if "%~1" == "" goto end_parse
set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
shift
goto start_parse
:end_parse
if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
set SPARK_CLASS=1
"%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper %BOOTSTRAP_ARGS%
) else (
"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
)
:exit
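As the comment above notes, dropping the first argument is awkward in cmd: `%*` always expands to the original argument list, even after `shift`, so the tail must be re-collected by hand. A minimal standalone sketch of the idiom (saved as, say, `drop-first.cmd`, a made-up name):

```
@echo off
set ARGS=
rem shift makes %1 take the value of the original %2, and so on
shift
:start_parse
if "%~1" == "" goto end_parse
set ARGS=%ARGS% %~1
shift
goto start_parse
:end_parse
echo Remaining arguments:%ARGS%
rem drop-first.cmd org.Foo --master local  ->  Remaining arguments: --master local
```

Note that `%~1` strips surrounding quotes, so arguments that contained spaces lose their quoting when re-collected; this is an inherent limitation of the approach.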

bin/spark-submit

@@ -17,7 +17,7 @@
# limitations under the License.
#
# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
ORIG_ARGS=("$@")

bin/spark-submit.cmd

@@ -17,23 +17,28 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
set SPARK_HOME=%~dp0..
set ORIG_ARGS=%*
rem Clear the values of all variables used
set DEPLOY_MODE=
set DRIVER_MEMORY=
rem Reset the values of all variables used
set SPARK_SUBMIT_DEPLOY_MODE=client
set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
set SPARK_SUBMIT_DRIVER_MEMORY=
set SPARK_SUBMIT_LIBRARY_PATH=
set SPARK_SUBMIT_CLASSPATH=
set SPARK_SUBMIT_OPTS=
set SPARK_DRIVER_MEMORY=
set SPARK_SUBMIT_BOOTSTRAP_DRIVER=
:loop
if [%1] == [] goto continue
if [%1] == [--deploy-mode] (
set DEPLOY_MODE=%2
set SPARK_SUBMIT_DEPLOY_MODE=%2
) else if [%1] == [--properties-file] (
set SPARK_SUBMIT_PROPERTIES_FILE=%2
) else if [%1] == [--driver-memory] (
set DRIVER_MEMORY=%2
set SPARK_SUBMIT_DRIVER_MEMORY=%2
) else if [%1] == [--driver-library-path] (
set SPARK_SUBMIT_LIBRARY_PATH=%2
) else if [%1] == [--driver-class-path] (
@@ -45,12 +50,19 @@ if [%1] == [] goto continue
goto loop
:continue
if [%DEPLOY_MODE%] == [] (
set DEPLOY_MODE=client
)
rem For client mode, the driver will be launched in the same JVM that launches
rem SparkSubmit, so we may need to read the properties file for any extra class
rem paths, library paths, java options and memory early on. Otherwise, it will
rem be too late by the time the driver JVM has started.
if not [%DRIVER_MEMORY%] == [] if [%DEPLOY_MODE%] == [client] (
set SPARK_DRIVER_MEMORY=%DRIVER_MEMORY%
if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
rem Parse the properties file only if the special configs exist
for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
%SPARK_SUBMIT_PROPERTIES_FILE%') do (
set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
)
)
)
cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
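A standalone sketch of the `findstr` probe above, with a made-up properties file for illustration: the `for /f` body runs once per matching line, so the flag ends up set if and only if at least one special driver config is present.

```
@echo off
rem Assume conf\spark-defaults.conf contains, e.g.:
rem   spark.driver.memory            2g
rem   spark.driver.extraJavaOptions  -XX:+UseCompressedOops
set PROPS=conf\spark-defaults.conf
set BOOTSTRAP=
if exist "%PROPS%" (
  for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" "%PROPS%"') do (
    set BOOTSTRAP=1
  )
)
rem "if defined" is evaluated at execution time, so no delayed expansion needed
if defined BOOTSTRAP echo Special driver configs found, bootstrapping required
```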

core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala

@@ -133,17 +133,24 @@ private[spark] object SparkSubmitDriverBootstrapper {
val process = builder.start()
// Redirect stdin, stdout, and stderr to/from the child JVM
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
stdinThread.start()
stdoutThread.start()
stderrThread.start()
// Terminate on broken pipe, which signals that the parent process has exited. This is
// important for the PySpark shell, where Spark submit itself is a python subprocess.
stdinThread.join()
process.destroy()
// In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
// a thread that contends with the subprocess in reading from System.in.
if (Utils.isWindows) {
// For the PySpark shell, the termination of this process is handled in java_gateway.py
process.waitFor()
} else {
// Terminate on broken pipe, which signals that the parent process has exited. This is
// important for the PySpark shell, where Spark submit itself is a python subprocess.
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
stdinThread.start()
stdinThread.join()
process.destroy()
}
}
}

python/pyspark/java_gateway.py

@@ -15,6 +15,7 @@
# limitations under the License.
#
import atexit
import os
import sys
import signal
@@ -69,6 +70,22 @@ def launch_gateway():
error_msg += "--------------------------------------------------------------\n"
raise Exception(error_msg)
# In Windows, ensure the Java child processes do not linger after Python has exited.
# In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
# the parent process' stdin sends an EOF). In Windows, however, this is not possible
# because java.lang.Process reads directly from the parent process' stdin, contending
# with any opportunity to read an EOF from the parent. Note that this is only best
# effort and will not take effect if the python process is violently terminated.
if on_windows:
# In Windows, the child process here is "spark-submit.cmd", not the JVM itself
# (because the UNIX "exec" command is not available). This means we cannot simply
# call proc.kill(), which kills only the "spark-submit.cmd" process but not the
# JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
# child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
def killChild():
Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
atexit.register(killChild)
# Create a thread to echo output from the GatewayServer, which is required
# for Java log output to show up:
class EchoOutputThread(Thread):
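The `taskkill` invocation registered above is the Windows counterpart of the broken-pipe trick used on UNIX. A standalone illustration of the flags (1234 is a made-up example PID):

```
rem /t kills the whole process tree (spark-submit.cmd plus the JVMs it
rem spawned), which is why it reaches processes proc.kill() would miss;
rem /f forces termination rather than merely requesting it.
taskkill /f /t /pid 1234
```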