[SPARK-4415] [PySpark] JVM should exit after Python exit

When the JVM is started from a Python process, it should exit once its stdin is closed.
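
A minimal sketch of that pattern in Python, for illustration only (in this patch the real logic lives on the JVM side, in SparkSubmitDriverBootstrapper):

```python
# Sketch of the mechanism (not Spark's code): the child blocks on its stdin;
# read() returning EOF means the parent closed its end of the pipe, typically
# because it exited, so the child terminates instead of lingering.
import os
import sys

def exit_when_parent_exits():
    sys.stdin.buffer.read()   # blocks until EOF, i.e. the parent is gone
    os._exit(0)               # exit immediately; there is nothing left to serve
```

On the JVM side the patch does the equivalent: when IS_SUBPROCESS is set, the stdin-redirect thread is joined and the spark-submit subprocess is destroyed once System.in reaches end of file.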

Test: add spark.driver.memory to conf/spark-defaults.conf, then check with jps that no JVM is left behind after the Python process exits:

```
daviesdm:~/work/spark$ cat conf/spark-defaults.conf
spark.driver.memory       8g
daviesdm:~/work/spark$ bin/pyspark
>>> quit
daviesdm:~/work/spark$ jps
4931 Jps
286
daviesdm:~/work/spark$ python wc.py
943738
0.719928026199
daviesdm:~/work/spark$ jps
286
4990 Jps
```

Author: Davies Liu <davies@databricks.com>

Closes #3274 from davies/exit and squashes the following commits:

df0e524 [Davies Liu] address comments
ce8599c [Davies Liu] address comments
050651f [Davies Liu] JVM should exit after Python exit
Authored by Davies Liu on 2014-11-14 20:13:46 -08:00; committed by Andrew Or
commit 7fe08b43c7 (parent 303a4e4d23)
4 changed files with 9 additions and 9 deletions


@@ -132,7 +132,5 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}"
 else
-  # PySpark shell requires special handling downstream
-  export PYSPARK_SHELL=1
   exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
 fi


@@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R "\.py"') do (
 )

 if [%PYTHON_FILE%] == [] (
-  set PYSPARK_SHELL=1
   if [%IPYTHON%] == [1] (
     ipython %IPYTHON_OPTS%
   ) else (


@@ -149,14 +149,15 @@ private[spark] object SparkSubmitDriverBootstrapper {
     // subprocess there already reads directly from our stdin, so we should avoid spawning a
     // thread that contends with the subprocess in reading from System.in.
     val isWindows = Utils.isWindows
-    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    val isSubprocess = sys.env.contains("IS_SUBPROCESS")
     if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
-      // should terminate on broken pipe, which signals that the parent process has exited. In
-      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
-      if (isPySparkShell) {
+      // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
+      // broken pipe, signaling that the parent process has exited. This is the case if the
+      // application is launched directly from python, as in the PySpark shell. In Windows,
+      // the termination logic is handled in java_gateway.py
+      if (isSubprocess) {
         stdinThread.join()
         process.destroy()
       }


@@ -45,7 +45,9 @@ def launch_gateway():
         # Don't send ctrl-c / SIGINT to the Java gateway:
         def preexec_func():
             signal.signal(signal.SIGINT, signal.SIG_IGN)
-        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+        env = dict(os.environ)
+        env["IS_SUBPROCESS"] = "1"  # tell JVM to exit after python exits
+        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env)
     else:
         # preexec_fn not supported on Windows
         proc = Popen(command, stdout=PIPE, stdin=PIPE)
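
The change in java_gateway.py only sets the flag; the shutdown itself happens in the JVM. A hypothetical way to exercise the pattern end to end (not part of the patch; a tiny Python child stands in for the gateway JVM):

```python
# Launch a child with IS_SUBPROCESS=1, close its stdin as an exiting parent
# would, and verify that the child goes away instead of lingering.
import os
import subprocess
import sys
import time

env = dict(os.environ)
env["IS_SUBPROCESS"] = "1"                      # same flag java_gateway.py sets
child = subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdin.buffer.read(); sys.exit(0)"],
    stdin=subprocess.PIPE,
    env=env,
)
child.stdin.close()                             # simulate the parent exiting
time.sleep(1)
assert child.poll() is not None, "child should exit once its stdin is closed"
```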