[SPARK-6324] [CORE] Centralize handling of script usage messages.
Reorganize code so that the launcher library handles most of the work of
printing usage messages, instead of having an awkward protocol between the
library and the scripts for that.

This mostly applies to SparkSubmit, since the launcher lib does not do
command line parsing for classes invoked in other ways, and thus cannot
handle failures for those. Most scripts end up going through SparkSubmit,
though, so it all works.

The change adds a new, internal command line switch, "--usage-error", which
prints the usage message and exits with a non-zero status. Scripts can
override the command printed in the usage message by setting an environment
variable - this avoids having to grep the output of SparkSubmit to remove
references to the "spark-submit" script.

The only sub-optimal part of the change is the special handling for the
spark-sql usage, which is now done in SparkSubmitArguments.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5841 from vanzin/SPARK-6324 and squashes the following commits:

2821481 [Marcelo Vanzin] Merge branch 'master' into SPARK-6324
bf139b5 [Marcelo Vanzin] Filter output of Spark SQL CLI help.
c6609bf [Marcelo Vanzin] Fix exit code never being used when printing usage messages.
6bc1b41 [Marcelo Vanzin] [SPARK-6324] [core] Centralize handling of script usage messages.
parent 019dc9f558
commit 700312e12f
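To make the new protocol concrete, here is a minimal sketch of the shape a
wrapper script takes after this change, modeled on the bin/pyspark and
bin/sparkR diffs below; the script name and usage string are placeholders,
not part of this commit:

#!/usr/bin/env bash
# Minimal sketch of a post-change wrapper (placeholder names throughout).
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

# Overrides the command shown in the usage message printed by
# SparkSubmitArguments, so the output no longer has to be grepped to hide
# references to the "spark-submit" script.
export _SPARK_CMD_USAGE="Usage: ./bin/my-wrapper [options]"

# No local usage() function or --help special-casing: on a parse error the
# launcher library re-invokes SparkSubmit with the internal --usage-error
# switch, which prints the message above and exits with a non-zero status.
exec "$SPARK_HOME"/bin/spark-submit "$@"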
bin/pyspark
@@ -17,24 +17,10 @@
 # limitations under the License.
 #
 
-# Figure out where Spark is installed
 export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 
 source "$SPARK_HOME"/bin/load-spark-env.sh
+export _SPARK_CMD_USAGE="Usage: ./bin/pyspark [options]"
 
-function usage() {
-  if [ -n "$1" ]; then
-    echo $1
-  fi
-  echo "Usage: ./bin/pyspark [options]" 1>&2
-  "$SPARK_HOME"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
-  exit $2
-}
-export -f usage
-
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  usage
-fi
-
 # In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
 # executable, while the worker would still be launched using PYSPARK_PYTHON.
bin/pyspark2.cmd
@@ -21,6 +21,7 @@ rem Figure out where the Spark framework is installed
 set SPARK_HOME=%~dp0..
 
 call %SPARK_HOME%\bin\load-spark-env.cmd
+set _SPARK_CMD_USAGE=Usage: bin\pyspark.cmd [options]
 
 rem Figure out which Python to use.
 if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
bin/spark-class
@@ -16,18 +16,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-set -e
 
 # Figure out where Spark is installed
 export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 
 . "$SPARK_HOME"/bin/load-spark-env.sh
 
-if [ -z "$1" ]; then
-  echo "Usage: spark-class <class> [<args>]" 1>&2
-  exit 1
-fi
-
 # Find the java binary
 if [ -n "${JAVA_HOME}" ]; then
   RUNNER="${JAVA_HOME}/bin/java"
@@ -98,9 +92,4 @@ CMD=()
 while IFS= read -d '' -r ARG; do
   CMD+=("$ARG")
 done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
-
-if [ "${CMD[0]}" = "usage" ]; then
-  "${CMD[@]}"
-else
-  exec "${CMD[@]}"
-fi
+exec "${CMD[@]}"
bin/spark-shell
@@ -29,20 +29,7 @@ esac
 set -o posix
 
 export FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
-
-usage() {
-  if [ -n "$1" ]; then
-    echo "$1"
-  fi
-  echo "Usage: ./bin/spark-shell [options]"
-  "$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
-  exit "$2"
-}
-export -f usage
-
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  usage "" 0
-fi
+export _SPARK_CMD_USAGE="Usage: ./bin/spark-shell [options]"
 
 # SPARK-4161: scala does not assume use of the java classpath,
 # so we need to add the "-Dscala.usejavacp=true" flag manually. We
bin/spark-shell2.cmd
@@ -18,12 +18,7 @@ rem limitations under the License.
 rem
 
 set SPARK_HOME=%~dp0..
+set _SPARK_CMD_USAGE=Usage: .\bin\spark-shell.cmd [options]
 
-echo "%*" | findstr " \<--help\> \<-h\>" >nul
-if %ERRORLEVEL% equ 0 (
-  call :usage
-  exit /b 0
-)
-
 rem SPARK-4161: scala does not assume use of the java classpath,
 rem so we need to add the "-Dscala.usejavacp=true" flag manually. We
@@ -37,16 +32,4 @@ if "x%SPARK_SUBMIT_OPTS%"=="x" (
 set SPARK_SUBMIT_OPTS="%SPARK_SUBMIT_OPTS% -Dscala.usejavacp=true"
 
 :run_shell
-call %SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main %*
-set SPARK_ERROR_LEVEL=%ERRORLEVEL%
-if not "x%SPARK_LAUNCHER_USAGE_ERROR%"=="x" (
-  call :usage
-  exit /b 1
-)
-exit /b %SPARK_ERROR_LEVEL%
-
-:usage
-echo %SPARK_LAUNCHER_USAGE_ERROR%
-echo "Usage: .\bin\spark-shell.cmd [options]" >&2
-call %SPARK_HOME%\bin\spark-submit2.cmd --help 2>&1 | findstr /V "Usage" 1>&2
-goto :eof
+%SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main %*
bin/spark-sql
@@ -17,41 +17,6 @@
 # limitations under the License.
 #
 
-#
-# Shell script for starting the Spark SQL CLI
-
-# Enter posix mode for bash
-set -o posix
-
-# NOTE: This exact class name is matched downstream by SparkSubmit.
-# Any changes need to be reflected there.
-export CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-
-# Figure out where Spark is installed
 export FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
-
-function usage {
-  if [ -n "$1" ]; then
-    echo "$1"
-  fi
-  echo "Usage: ./bin/spark-sql [options] [cli option]"
-  pattern="usage"
-  pattern+="\|Spark assembly has been built with Hive"
-  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
-  pattern+="\|Spark Command: "
-  pattern+="\|--help"
-  pattern+="\|======="
-
-  "$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
-  echo
-  echo "CLI options:"
-  "$FWDIR"/bin/spark-class "$CLASS" --help 2>&1 | grep -v "$pattern" 1>&2
-  exit "$2"
-}
-export -f usage
-
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  usage "" 0
-fi
-
-exec "$FWDIR"/bin/spark-submit --class "$CLASS" "$@"
+export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
+exec "$FWDIR"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
bin/spark-submit
@@ -22,16 +22,4 @@ SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 # disable randomized hash for string in Python 3.3+
 export PYTHONHASHSEED=0
 
-# Only define a usage function if an upstream script hasn't done so.
-if ! type -t usage >/dev/null 2>&1; then
-  usage() {
-    if [ -n "$1" ]; then
-      echo "$1"
-    fi
-    "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit --help
-    exit "$2"
-  }
-  export -f usage
-fi
-
 exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
bin/spark-submit2.cmd
@@ -24,15 +24,4 @@ rem disable randomized hash for string in Python 3.3+
 set PYTHONHASHSEED=0
 
 set CLASS=org.apache.spark.deploy.SparkSubmit
-call %~dp0spark-class2.cmd %CLASS% %*
-set SPARK_ERROR_LEVEL=%ERRORLEVEL%
-if not "x%SPARK_LAUNCHER_USAGE_ERROR%"=="x" (
-  call :usage
-  exit /b 1
-)
-exit /b %SPARK_ERROR_LEVEL%
-
-:usage
-echo %SPARK_LAUNCHER_USAGE_ERROR%
-call %SPARK_HOME%\bin\spark-class2.cmd %CLASS% --help
-goto :eof
+%~dp0spark-class2.cmd %CLASS% %*
bin/sparkR
@@ -17,23 +17,7 @@
 # limitations under the License.
 #
 
-# Figure out where Spark is installed
 export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 
 source "$SPARK_HOME"/bin/load-spark-env.sh
+export _SPARK_CMD_USAGE="Usage: ./bin/sparkR [options]"
 
-function usage() {
-  if [ -n "$1" ]; then
-    echo $1
-  fi
-  echo "Usage: ./bin/sparkR [options]" 1>&2
-  "$SPARK_HOME"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
-  exit $2
-}
-export -f usage
-
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  usage
-fi
-
 exec "$SPARK_HOME"/bin/spark-submit sparkr-shell-main "$@"
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -82,13 +82,13 @@ object SparkSubmit {
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
   // Exposed for testing
-  private[spark] var exitFn: () => Unit = () => System.exit(1)
+  private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
   private[spark] var printStream: PrintStream = System.err
   private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
   private[spark] def printErrorAndExit(str: String): Unit = {
     printStream.println("Error: " + str)
     printStream.println("Run with --help for usage help or --verbose for debug output")
-    exitFn()
+    exitFn(1)
   }
   private[spark] def printVersionAndExit(): Unit = {
     printStream.println("""Welcome to
@@ -99,7 +99,7 @@ object SparkSubmit {
       /_/
                         """.format(SPARK_VERSION))
     printStream.println("Type --help for more information.")
-    exitFn()
+    exitFn(0)
   }
 
   def main(args: Array[String]): Unit = {
@@ -160,7 +160,7 @@ object SparkSubmit {
         // detect exceptions with empty stack traces here, and treat them differently.
         if (e.getStackTrace().length == 0) {
           printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
-          exitFn()
+          exitFn(1)
         } else {
           throw e
         }
@@ -700,7 +700,7 @@ object SparkSubmit {
   /**
   * Return whether the given main class represents a sql shell.
   */
-  private def isSqlShell(mainClass: String): Boolean = {
+  private[deploy] def isSqlShell(mainClass: String): Boolean = {
     mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
   }
 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.deploy
 
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.lang.reflect.InvocationTargetException
 import java.net.URI
 import java.util.{List => JList}
 import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.io.Source
 
 import org.apache.spark.deploy.SparkSubmitAction._
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
@@ -412,6 +415,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case VERSION =>
         SparkSubmit.printVersionAndExit()
 
+      case USAGE_ERROR =>
+        printUsageAndExit(1)
+
       case _ =>
         throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
     }
@@ -449,11 +455,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     if (unknownParam != null) {
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
-    outStream.println(
+    val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
       """Usage: spark-submit [options] <app jar | python file> [app arguments]
        |Usage: spark-submit --kill [submission ID] --master [spark://...]
-        |Usage: spark-submit --status [submission ID] --master [spark://...]
-        |
+        |Usage: spark-submit --status [submission ID] --master [spark://...]""".stripMargin)
+    outStream.println(command)
+
+    outStream.println(
+      """
        |Options:
        |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
        |  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
@@ -525,6 +534,65 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |                              delegation tokens periodically.
      """.stripMargin
    )
-    SparkSubmit.exitFn()
+
+    if (SparkSubmit.isSqlShell(mainClass)) {
+      outStream.println("CLI options:")
+      outStream.println(getSqlShellOptions())
+    }
+
+    SparkSubmit.exitFn(exitCode)
  }
+
+  /**
+   * Run the Spark SQL CLI main class with the "--help" option and catch its output. Then filter
+   * the results to remove unwanted lines.
+   *
+   * Since the CLI will call `System.exit()`, we install a security manager to prevent that call
+   * from working, and restore the original one afterwards.
+   */
+  private def getSqlShellOptions(): String = {
+    val currentOut = System.out
+    val currentErr = System.err
+    val currentSm = System.getSecurityManager()
+    try {
+      val out = new ByteArrayOutputStream()
+      val stream = new PrintStream(out)
+      System.setOut(stream)
+      System.setErr(stream)
+
+      val sm = new SecurityManager() {
+        override def checkExit(status: Int): Unit = {
+          throw new SecurityException()
+        }
+
+        override def checkPermission(perm: java.security.Permission): Unit = {}
+      }
+      System.setSecurityManager(sm)
+
+      try {
+        Class.forName(mainClass).getMethod("main", classOf[Array[String]])
+          .invoke(null, Array(HELP))
+      } catch {
+        case e: InvocationTargetException =>
+          // Ignore SecurityException, since we throw it above.
+          if (!e.getCause().isInstanceOf[SecurityException]) {
+            throw e
+          }
+      }
+
+      stream.flush()
+
+      // Get the output and discard any unnecessary lines from it.
+      Source.fromString(new String(out.toByteArray())).getLines
+        .filter { line =>
+          !line.startsWith("log4j") && !line.startsWith("usage")
+        }
+        .mkString("\n")
+    } finally {
+      System.setSecurityManager(currentSm)
+      System.setOut(currentOut)
+      System.setErr(currentErr)
+    }
+  }
+
 }
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -62,7 +62,7 @@ class SparkSubmitSuite
     SparkSubmit.printStream = printStream
 
     @volatile var exitedCleanly = false
-    SparkSubmit.exitFn = () => exitedCleanly = true
+    SparkSubmit.exitFn = (_) => exitedCleanly = true
 
     val thread = new Thread {
       override def run() = try {
launcher/src/main/java/org/apache/spark/launcher/Main.java
@@ -53,21 +53,33 @@ class Main {
     List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
     String className = args.remove(0);
 
-    boolean printLaunchCommand;
-    boolean printUsage;
+    boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
     AbstractCommandBuilder builder;
-    try {
-      if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
+    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
+      try {
         builder = new SparkSubmitCommandBuilder(args);
-      } else {
-        builder = new SparkClassCommandBuilder(className, args);
-      }
-      printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
-      printUsage = false;
-    } catch (IllegalArgumentException e) {
-      builder = new UsageCommandBuilder(e.getMessage());
-      printLaunchCommand = false;
-      printUsage = true;
+      } catch (IllegalArgumentException e) {
+        printLaunchCommand = false;
+        System.err.println("Error: " + e.getMessage());
+        System.err.println();
+
+        MainClassOptionParser parser = new MainClassOptionParser();
+        try {
+          parser.parse(args);
+        } catch (Exception ignored) {
+          // Ignore parsing exceptions.
+        }
+
+        List<String> help = new ArrayList<String>();
+        if (parser.className != null) {
+          help.add(parser.CLASS);
+          help.add(parser.className);
+        }
+        help.add(parser.USAGE_ERROR);
+        builder = new SparkSubmitCommandBuilder(help);
+      }
+    } else {
+      builder = new SparkClassCommandBuilder(className, args);
     }
 
     Map<String, String> env = new HashMap<String, String>();
@@ -78,13 +90,7 @@ class Main {
     }
 
     if (isWindows()) {
-      // When printing the usage message, we can't use "cmd /v" since that prevents the env
-      // variable from being seen in the caller script. So do not call prepareWindowsCommand().
-      if (printUsage) {
-        System.out.println(join(" ", cmd));
-      } else {
-        System.out.println(prepareWindowsCommand(cmd, env));
-      }
+      System.out.println(prepareWindowsCommand(cmd, env));
     } else {
       // In bash, use NULL as the arg separator since it cannot be used in an argument.
       List<String> bashCmd = prepareBashCommand(cmd, env);
@@ -135,33 +141,30 @@ class Main {
   }
 
   /**
-   * Internal builder used when command line parsing fails. This will behave differently depending
-   * on the platform:
-   *
-   * - On Unix-like systems, it will print a call to the "usage" function with two arguments: the
-   *   the error string, and the exit code to use. The function is expected to print the command's
-   *   usage and exit with the provided exit code. The script should use "export -f usage" after
-   *   declaring a function called "usage", so that the function is available to downstream scripts.
-   *
-   * - On Windows it will set the variable "SPARK_LAUNCHER_USAGE_ERROR" to the usage error message.
-   *   The batch script should check for this variable and print its usage, since batch scripts
-   *   don't really support the "export -f" functionality used in bash.
+   * A parser used when command line parsing fails for spark-submit. It's used as a best-effort
+   * at trying to identify the class the user wanted to invoke, since that may require special
+   * usage strings (handled by SparkSubmitArguments).
    */
-  private static class UsageCommandBuilder extends AbstractCommandBuilder {
-
-    private final String message;
+  private static class MainClassOptionParser extends SparkSubmitOptionParser {
 
-    UsageCommandBuilder(String message) {
-      this.message = message;
-    }
+    String className;
 
     @Override
-    public List<String> buildCommand(Map<String, String> env) {
-      if (isWindows()) {
-        return Arrays.asList("set", "SPARK_LAUNCHER_USAGE_ERROR=" + message);
-      } else {
-        return Arrays.asList("usage", message, "1");
+    protected boolean handle(String opt, String value) {
+      if (opt == CLASS) {
+        className = value;
       }
+      return false;
+    }
+
+    @Override
+    protected boolean handleUnknown(String opt) {
+      return false;
+    }
+
+    @Override
+    protected void handleExtraArgs(List<String> extra) {
+
     }
 
   }
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -77,6 +77,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
   }
 
   private final List<String> sparkArgs;
+  private final boolean printHelp;
 
   /**
    * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed
@@ -87,10 +88,11 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
 
   SparkSubmitCommandBuilder() {
     this.sparkArgs = new ArrayList<String>();
+    this.printHelp = false;
   }
 
   SparkSubmitCommandBuilder(List<String> args) {
-    this();
+    this.sparkArgs = new ArrayList<String>();
     List<String> submitArgs = args;
     if (args.size() > 0 && args.get(0).equals(PYSPARK_SHELL)) {
       this.allowsMixedArguments = true;
@@ -104,14 +106,16 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
       this.allowsMixedArguments = false;
     }
 
-    new OptionParser().parse(submitArgs);
+    OptionParser parser = new OptionParser();
+    parser.parse(submitArgs);
+    this.printHelp = parser.helpRequested;
   }
 
   @Override
   public List<String> buildCommand(Map<String, String> env) throws IOException {
-    if (PYSPARK_SHELL_RESOURCE.equals(appResource)) {
+    if (PYSPARK_SHELL_RESOURCE.equals(appResource) && !printHelp) {
       return buildPySparkShellCommand(env);
-    } else if (SPARKR_SHELL_RESOURCE.equals(appResource)) {
+    } else if (SPARKR_SHELL_RESOURCE.equals(appResource) && !printHelp) {
       return buildSparkRCommand(env);
     } else {
       return buildSparkSubmitCommand(env);
@@ -311,6 +315,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
 
   private class OptionParser extends SparkSubmitOptionParser {
 
+    boolean helpRequested = false;
+
     @Override
     protected boolean handle(String opt, String value) {
       if (opt.equals(MASTER)) {
@@ -341,6 +347,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
           allowsMixedArguments = true;
           appResource = specialClasses.get(value);
         }
+      } else if (opt.equals(HELP) || opt.equals(USAGE_ERROR)) {
+        helpRequested = true;
+        sparkArgs.add(opt);
       } else {
         sparkArgs.add(opt);
         if (value != null) {
@@ -360,6 +369,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
         appArgs.add(opt);
         return true;
       } else {
+        checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
         sparkArgs.add(opt);
         return false;
       }
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -61,6 +61,7 @@ class SparkSubmitOptionParser {
   // Options that do not take arguments.
   protected final String HELP = "--help";
   protected final String SUPERVISE = "--supervise";
+  protected final String USAGE_ERROR = "--usage-error";
   protected final String VERBOSE = "--verbose";
   protected final String VERSION = "--version";
 
@@ -120,6 +121,7 @@ class SparkSubmitOptionParser {
   final String[][] switches = {
     { HELP, "-h" },
     { SUPERVISE },
+    { USAGE_ERROR },
     { VERBOSE, "-v" },
     { VERSION },
   };
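As a quick manual check of the new switch, one could invoke it by hand; this
is a sketch assuming a built Spark tree with SPARK_HOME set, and the usage
text printed depends on whether _SPARK_CMD_USAGE is set in the environment:

# --usage-error is the internal switch added above; SparkSubmitArguments
# handles it by printing the usage message and exiting with status 1.
"$SPARK_HOME"/bin/spark-submit --usage-error
echo "exit code: $?"   # expected: non-zero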