[SPARK-1808] Route bin/pyspark through Spark submit

**Problem.** Currently, the only way to specify Spark configuration properties for `bin/pyspark` is through `SPARK_JAVA_OPTS` in `conf/spark-env.sh`, and that mechanism is deprecated. Instead, `bin/pyspark` should pick up configurations explicitly specified in `conf/spark-defaults.conf`.
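
For illustration, the kind of file this refers to might look like the sketch below; the property values are placeholders and not part of this change.

```bash
# Hypothetical conf/spark-defaults.conf that bin/pyspark should now honor
# without going through SPARK_JAVA_OPTS. Values are examples only.
cat > conf/spark-defaults.conf <<'EOF'
spark.master            spark://master:7077
spark.executor.memory   2g
spark.eventLog.enabled  true
EOF
```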

**Solution.** Have `bin/pyspark` invoke `bin/spark-submit`, like its counterparts in Scala land (i.e. `bin/spark-shell` and `bin/run-example`). This has the additional benefit of making the invocation of all the user-facing Spark scripts consistent.

**Details.** `bin/pyspark` inherently handles two cases: (1) running Python applications and (2) running the Python shell. For (1), Spark submit already knows how to run Python applications, so when `bin/pyspark` is given a Python file we can simply pass the file directly to Spark submit and let it handle the rest.
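
As a rough sketch (the file name is made up), the first invocation below now prints a deprecation warning and hands everything to Spark submit, making it equivalent to the second:

```bash
# Case (1): a Python application file is forwarded straight to spark-submit.
./bin/pyspark my_app.py        # warns, then exec's bin/spark-submit my_app.py
./bin/spark-submit my_app.py   # the preferred, equivalent form
```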

For case (2), `bin/pyspark` starts a Python process as before, which launches the JVM as a subprocess. The existing code already provides a code path for this; the only change needed is to launch the JVM through `bin/spark-submit` instead of `spark-class`. This requires modifications to Spark submit to handle the PySpark shell as a special case.
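
Roughly, the shell path now boils down to the following sketch; the options are illustrative, and the second command is what the Python process effectively runs under the hood rather than something users type:

```bash
# Case (2): the interactive shell. bin/pyspark exports the user's arguments and
# starts Python; Python then launches the JVM through spark-submit using the
# special "pyspark-shell" primary resource.
export PYSPARK_SUBMIT_ARGS="--master local[4] --driver-memory 1g"
./bin/spark-submit pyspark-shell $PYSPARK_SUBMIT_ARGS
```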

This has been tested locally (OS X and Windows 7), on a standalone cluster, and on a YARN cluster. Running through IPython also works as before, except that it now accepts Spark submit arguments as well.
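
For example (the master URLs are illustrative, and the notebook option assumes IPython is installed):

```bash
# Plain PySpark shell with Spark submit arguments:
./bin/pyspark --master local[2]

# IPython, with the same Spark submit arguments picked up via PYSPARK_SUBMIT_ARGS:
IPYTHON=1 ./bin/pyspark --master local[2]
IPYTHON_OPTS="notebook" ./bin/pyspark --master local[2]
```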

Author: Andrew Or <andrewor14@gmail.com>

Closes #799 from andrewor14/pyspark-submit and squashes the following commits:

bf37e36 [Andrew Or] Minor changes
01066fa [Andrew Or] bin/pyspark for Windows
c8cb3bf [Andrew Or] Handle perverse app names (with escaped quotes)
1866f85 [Andrew Or] Windows is not cooperating
456d844 [Andrew Or] Guard against shlex hanging if PYSPARK_SUBMIT_ARGS is not set
7eebda8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit
b7ba0d8 [Andrew Or] Address a few comments (minor)
06eb138 [Andrew Or] Use shlex instead of writing our own parser
05879fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit
a823661 [Andrew Or] Fix --die-on-broken-pipe not propagated properly
6fba412 [Andrew Or] Deal with quotes + address various comments
fe4c8a7 [Andrew Or] Update --help for bin/pyspark
afe47bf [Andrew Or] Fix spark shell
f04aaa4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit
a371d26 [Andrew Or] Route bin/pyspark through Spark submit
Authored by Andrew Or on 2014-05-16 22:34:38 -07:00; committed by Patrick Wendell
commit 4b8ec6fcfd (parent c0ab85d732)
10 changed files with 107 additions and 34 deletions

**bin/pyspark**
@@ -25,6 +25,12 @@ export SPARK_HOME="$FWDIR"
 SCALA_VERSION=2.10
 
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  echo "Usage: ./bin/pyspark [options]"
+  ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  exit 0
+fi
+
 # Exit if the user hasn't compiled Spark
 if [ ! -f "$FWDIR/RELEASE" ]; then
   # Exit if the user hasn't compiled Spark

@@ -52,13 +58,34 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
 export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
 
+# If IPython options are specified, assume user wants to run IPython
 if [ -n "$IPYTHON_OPTS" ]; then
   IPYTHON=1
 fi
 
+# Build up arguments list manually to preserve quotes and backslashes.
+# We export Spark submit arguments as an environment variable because shell.py must run as a
+# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
+PYSPARK_SUBMIT_ARGS=""
+whitespace="[[:space:]]"
+for i in "$@"; do
+  if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
+  if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
+  PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i"
+done
+export PYSPARK_SUBMIT_ARGS
+
+# If a python file is provided, directly run spark-submit.
+if [[ "$1" =~ \.py$ ]]; then
+  echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2
+  echo -e "Use ./bin/spark-submit <python file>\n" 1>&2
+  exec $FWDIR/bin/spark-submit "$@"
+else
   # Only use ipython if no command line arguments were provided [SPARK-1134]
-  if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
+  if [[ "$IPYTHON" = "1" ]]; then
     exec ipython $IPYTHON_OPTS
   else
-    exec "$PYSPARK_PYTHON" "$@"
+    exec "$PYSPARK_PYTHON"
   fi
+fi
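
A minimal standalone sketch of the argument loop above (the `demo` function and sample arguments are made up): arguments containing whitespace are re-quoted so they survive the round trip through the `PYSPARK_SUBMIT_ARGS` environment variable and `shlex.split` on the Python side.

```bash
#!/usr/bin/env bash
# Illustrative only: mirrors the re-quoting logic in bin/pyspark above.
demo() {
  local args=""
  local whitespace="[[:space:]]"
  for i in "$@"; do
    # Escape embedded double quotes, then re-quote args containing whitespace.
    if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
    if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
    args="$args $i"
  done
  echo "$args"
}

demo --name "my app" --master local[2]
# prints: --name "my app" --master local[2]
```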

**bin/pyspark2.cmd**
@@ -31,7 +31,7 @@ set FOUND_JAR=0
 for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
   set FOUND_JAR=1
 )
-if "%FOUND_JAR%"=="0" (
+if [%FOUND_JAR%] == [0] (
   echo Failed to find Spark assembly JAR.
   echo You need to build Spark with sbt\sbt assembly before running this program.
   goto exit

@@ -42,15 +42,30 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
 if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 
 rem Figure out which Python to use.
-if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
+if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python
 
 set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
 set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
+set PYSPARK_SUBMIT_ARGS=%*
 
 echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%
 
-"%PYSPARK_PYTHON%" %*
+rem Check whether the argument is a file
+for /f %%i in ('echo %1^| findstr /R "\.py"') do (
+  set PYTHON_FILE=%%i
+)
+
+if [%PYTHON_FILE%] == [] (
+  %PYSPARK_PYTHON%
+) else (
+  echo.
+  echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0.
+  echo Use ./bin/spark-submit ^<python file^>
+  echo.
+  "%FWDIR%\bin\spark-submit.cmd" %PYSPARK_SUBMIT_ARGS%
+)
 
 :exit

**bin/spark-shell**
@@ -28,7 +28,7 @@ esac
 # Enter posix mode for bash
 set -o posix
 
-if [[ "$@" == *--help* ]]; then
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
   echo "Usage: ./bin/spark-shell [options]"
   ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
   exit 0

@@ -46,11 +46,11 @@ function main(){
     # (see https://github.com/sbt/sbt/issues/562).
     stty -icanon min 1 -echo > /dev/null 2>&1
     export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
-    $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
+    $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
     stty icanon echo > /dev/null 2>&1
   else
     export SPARK_SUBMIT_OPTS
-    $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
+    $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
   fi
 }

**bin/spark-shell.cmd**
@@ -19,4 +19,4 @@ rem
 
 set SPARK_HOME=%~dp0..
 
-cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-internal %* --class org.apache.spark.repl.Main
+cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main

**core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala**
@@ -42,7 +42,7 @@ object PythonRunner {
     // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
     // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
     val pathElements = new ArrayBuffer[String]
-    pathElements ++= pyFiles.split(",")
+    pathElements ++= Option(pyFiles).getOrElse("").split(",")
     pathElements += PythonUtils.sparkPythonPath
     pathElements += sys.env.getOrElse("PYTHONPATH", "")
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)

**core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala**
@@ -41,10 +41,10 @@ object SparkSubmit {
   private var clusterManager: Int = LOCAL
 
   /**
-   * A special jar name that indicates the class being run is inside of Spark itself,
-   * and therefore no user jar is needed.
+   * Special primary resource names that represent shells rather than application jars.
    */
-  private val RESERVED_JAR_NAME = "spark-internal"
+  private val SPARK_SHELL = "spark-shell"
+  private val PYSPARK_SHELL = "pyspark-shell"
 
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)

@@ -71,8 +71,8 @@ object SparkSubmit {
    * entries for the child, a list of system properties, a list of env vars
    * and the main class for the child
    */
-  private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
-      ArrayBuffer[String], Map[String, String], String) = {
+  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
+      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
     if (args.master.startsWith("local")) {
       clusterManager = LOCAL
     } else if (args.master.startsWith("yarn")) {

@@ -121,24 +121,30 @@ object SparkSubmit {
       printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
-    // If we're running a Python app, set the Java class to run to be our PythonRunner, add
-    // Python files to deployment list, and pass the main file and Python path to PythonRunner
+    // If we're running a python app, set the main class to our specific python runner
     if (isPython) {
       if (deployOnCluster) {
         printErrorAndExit("Cannot currently run Python driver programs on cluster")
       }
-      args.mainClass = "org.apache.spark.deploy.PythonRunner"
-      args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+      if (args.primaryResource == PYSPARK_SHELL) {
+        args.mainClass = "py4j.GatewayServer"
+        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+      } else {
+        // If a python file is provided, add it to the child arguments and list of files to deploy.
+        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
+        args.mainClass = "org.apache.spark.deploy.PythonRunner"
+        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+        args.files = mergeFileLists(args.files, args.primaryResource)
+      }
       val pyFiles = Option(args.pyFiles).getOrElse("")
-      args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
-      args.primaryResource = RESERVED_JAR_NAME
+      args.files = mergeFileLists(args.files, pyFiles)
       sysProps("spark.submit.pyFiles") = pyFiles
     }
 
     // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
     if (!deployOnCluster) {
       childMainClass = args.mainClass
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
     } else if (clusterManager == YARN) {

@@ -219,7 +225,7 @@ object SparkSubmit {
     // For python files, the primary resource is already distributed as a regular file
     if (!isYarnCluster && !isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))

@@ -293,7 +299,7 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(new URI(localJar).getPath())
+    val localJarFile = new File(new URI(localJar).getPath)
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }

@@ -302,6 +308,27 @@ object SparkSubmit {
     loader.addURL(url)
   }
 
+  /**
+   * Return whether the given primary resource represents a user jar.
+   */
+  private def isUserJar(primaryResource: String): Boolean = {
+    !isShell(primaryResource) && !isPython(primaryResource)
+  }
+
+  /**
+   * Return whether the given primary resource represents a shell.
+   */
+  private def isShell(primaryResource: String): Boolean = {
+    primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
+  }
+
+  /**
+   * Return whether the given primary resource requires running python.
+   */
+  private[spark] def isPython(primaryResource: String): Boolean = {
+    primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
+  }
+
   /**
    * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.
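
To tie the special resource names back to the scripts, the effective invocations look roughly like this sketch (normally issued by `bin/spark-shell` and `java_gateway.py` rather than typed by hand; the extra option is illustrative):

```bash
# How SparkSubmit now classifies primary resources (sketch):
#   "spark-shell"    -> isShell,                not a user jar
#   "pyspark-shell"  -> isShell and isPython,   not a user jar
#   "app.py"         -> isPython,               not a user jar
#   "app.jar"        -> isUserJar, added to the classpath / spark.jars
./bin/spark-submit spark-shell --class org.apache.spark.repl.Main   # from bin/spark-shell
./bin/spark-submit pyspark-shell --master local[2]                  # from java_gateway.py
```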

**core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala**
@@ -298,11 +298,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
           case v =>
             primaryResource = v
             inSparkOpts = false
-            isPython = v.endsWith(".py")
+            isPython = SparkSubmit.isPython(v)
             parse(tail)
         }
       } else {
-        childArgs += value
+        if (!value.isEmpty) {
+          childArgs += value
+        }
         parse(tail)
       }

**core/src/main/scala/org/apache/spark/util/Utils.scala**
@@ -1101,7 +1101,7 @@ private[spark] object Utils extends Logging {
   /**
    * Strip the directory from a path name
    */
   def stripDirectory(path: String): String = {
-    path.split(File.separator).last
+    new File(path).getName
   }
 
   /**

**python/pyspark/java_gateway.py**
@@ -18,12 +18,12 @@
 import os
 import sys
 import signal
+import shlex
 import platform
 from subprocess import Popen, PIPE
 from threading import Thread
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 
 def launch_gateway():
     SPARK_HOME = os.environ["SPARK_HOME"]

@@ -34,9 +34,11 @@ def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"
-    script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
-    command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
-               "--die-on-broken-pipe", "0"]
+    script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
+    submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
+    submit_args = submit_args if submit_args is not None else ""
+    submit_args = shlex.split(submit_args)
+    command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
     if not on_windows:
         # Don't send ctrl-c / SIGINT to the Java gateway:
         def preexec_func():
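
A quick illustration of why the empty-string fallback above matters, assuming a standard Python interpreter on the PATH:

```bash
# shlex splits the exported string like a shell would, preserving quoted arguments:
python -c 'import shlex; print(shlex.split("--name \"my app\" --master local[2]"))'
# -> ['--name', 'my app', '--master', 'local[2]']

# Without the fallback to "", an unset PYSPARK_SUBMIT_ARGS would mean shlex.split(None),
# which reads tokens from stdin and appears to hang, hence the guard.
```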

**python/pyspark/shell.py**
@@ -40,7 +40,7 @@ add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES"
 
 if os.environ.get("SPARK_EXECUTOR_URI"):
     SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
 
-sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
+sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
 
 print("""Welcome to
       ____              __