[SPARK-6890] [core] Fix launcher lib to work with SPARK_PREPEND_CLASSES.
The fix for SPARK-6406 broke the case where sub-processes are launched when SPARK_PREPEND_CLASSES is set, because the code would now add only the launcher's build directory to the sub-process's classpath instead of the complete assembly. This patch fixes the problem by having the launch scripts stash the assembly's location in an environment variable. This is not the prettiest solution, but it avoids having to plumb that location all the way through the Worker code that launches executors. The env variable is always set by the launch scripts, so users cannot override it. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #5504 from vanzin/SPARK-6890 and squashes the following commits: 7aec921 [Marcelo Vanzin] Fix tests. ff87a60 [Marcelo Vanzin] Merge branch 'master' into SPARK-6890 31d3ce8 [Marcelo Vanzin] [SPARK-6890] [core] Fix launcher lib work with SPARK_PREPEND_CLASSES.
This commit is contained in:
parent
6de282e2de
commit
9717389365
|
@ -82,13 +82,22 @@ if [ $(command -v "$JAR_CMD") ] ; then
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
|
||||||
|
|
||||||
|
# Add the launcher build dir to the classpath if requested.
|
||||||
|
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
|
||||||
|
LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
|
||||||
|
fi
|
||||||
|
|
||||||
|
export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
|
||||||
|
|
||||||
# The launcher library will print arguments separated by a NULL character, to allow arguments with
|
# The launcher library will print arguments separated by a NULL character, to allow arguments with
|
||||||
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
|
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
|
||||||
# an array that will be used to exec the final command.
|
# an array that will be used to exec the final command.
|
||||||
CMD=()
|
CMD=()
|
||||||
while IFS= read -d '' -r ARG; do
|
while IFS= read -d '' -r ARG; do
|
||||||
CMD+=("$ARG")
|
CMD+=("$ARG")
|
||||||
done < <("$RUNNER" -cp "$SPARK_ASSEMBLY_JAR" org.apache.spark.launcher.Main "$@")
|
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
|
||||||
|
|
||||||
if [ "${CMD[0]}" = "usage" ]; then
|
if [ "${CMD[0]}" = "usage" ]; then
|
||||||
"${CMD[@]}"
|
"${CMD[@]}"
|
||||||
|
|
|
@ -46,13 +46,22 @@ if "%SPARK_ASSEMBLY_JAR%"=="0" (
|
||||||
exit /b 1
|
exit /b 1
|
||||||
)
|
)
|
||||||
|
|
||||||
|
set LAUNCH_CLASSPATH=%SPARK_ASSEMBLY_JAR%
|
||||||
|
|
||||||
|
rem Add the launcher build dir to the classpath if requested.
|
||||||
|
if not "x%SPARK_PREPEND_CLASSES%"=="x" (
|
||||||
|
set LAUNCH_CLASSPATH=%SPARK_HOME%\launcher\target\scala-%SPARK_SCALA_VERSION%\classes;%LAUNCH_CLASSPATH%
|
||||||
|
)
|
||||||
|
|
||||||
|
set _SPARK_ASSEMBLY=%SPARK_ASSEMBLY_JAR%
|
||||||
|
|
||||||
rem Figure out where java is.
|
rem Figure out where java is.
|
||||||
set RUNNER=java
|
set RUNNER=java
|
||||||
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
|
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
|
||||||
|
|
||||||
rem The launcher library prints the command to be executed in a single line suitable for being
|
rem The launcher library prints the command to be executed in a single line suitable for being
|
||||||
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
|
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
|
||||||
for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %SPARK_ASSEMBLY_JAR% org.apache.spark.launcher.Main %*"') do (
|
for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
|
||||||
set SPARK_CMD=%%i
|
set SPARK_CMD=%%i
|
||||||
)
|
)
|
||||||
%SPARK_CMD%
|
%SPARK_CMD%
|
||||||
|
|
|
@ -186,8 +186,20 @@ abstract class AbstractCommandBuilder {
|
||||||
addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
|
addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
|
||||||
}
|
}
|
||||||
|
|
||||||
final String assembly = AbstractCommandBuilder.class.getProtectionDomain().getCodeSource().
|
// We can't rely on the ENV_SPARK_ASSEMBLY variable to be set. Certain situations, such as
|
||||||
getLocation().getPath();
|
// when running unit tests, or user code that embeds Spark and creates a SparkContext
|
||||||
|
// with a local or local-cluster master, will cause this code to be called from an
|
||||||
|
// environment where that env variable is not guaranteed to exist.
|
||||||
|
//
|
||||||
|
// For the testing case, we rely on the test code to set and propagate the test classpath
|
||||||
|
// appropriately.
|
||||||
|
//
|
||||||
|
// For the user code case, we fall back to looking for the Spark assembly under SPARK_HOME.
|
||||||
|
// That duplicates some of the code in the shell scripts that look for the assembly, though.
|
||||||
|
String assembly = getenv(ENV_SPARK_ASSEMBLY);
|
||||||
|
if (assembly == null && isEmpty(getenv("SPARK_TESTING"))) {
|
||||||
|
assembly = findAssembly();
|
||||||
|
}
|
||||||
addToClassPath(cp, assembly);
|
addToClassPath(cp, assembly);
|
||||||
|
|
||||||
// Datanucleus jars must be included on the classpath. Datanucleus jars do not work if only
|
// Datanucleus jars must be included on the classpath. Datanucleus jars do not work if only
|
||||||
|
@ -299,6 +311,30 @@ abstract class AbstractCommandBuilder {
|
||||||
return firstNonEmpty(childEnv.get(key), System.getenv(key));
|
return firstNonEmpty(childEnv.get(key), System.getenv(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String findAssembly() {
|
||||||
|
String sparkHome = getSparkHome();
|
||||||
|
File libdir;
|
||||||
|
if (new File(sparkHome, "RELEASE").isFile()) {
|
||||||
|
libdir = new File(sparkHome, "lib");
|
||||||
|
checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
|
||||||
|
libdir.getAbsolutePath());
|
||||||
|
} else {
|
||||||
|
libdir = new File(sparkHome, String.format("assembly/target/scala-%s", getScalaVersion()));
|
||||||
|
}
|
||||||
|
|
||||||
|
final Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar");
|
||||||
|
FileFilter filter = new FileFilter() {
|
||||||
|
@Override
|
||||||
|
public boolean accept(File file) {
|
||||||
|
return file.isFile() && re.matcher(file.getName()).matches();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
File[] assemblies = libdir.listFiles(filter);
|
||||||
|
checkState(assemblies != null && assemblies.length > 0, "No assemblies found in '%s'.", libdir);
|
||||||
|
checkState(assemblies.length == 1, "Multiple assemblies found in '%s'.", libdir);
|
||||||
|
return assemblies[0].getAbsolutePath();
|
||||||
|
}
|
||||||
|
|
||||||
private String getConfDir() {
|
private String getConfDir() {
|
||||||
String confDir = getenv("SPARK_CONF_DIR");
|
String confDir = getenv("SPARK_CONF_DIR");
|
||||||
return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");
|
return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");
|
||||||
|
|
|
@ -30,6 +30,7 @@ class CommandBuilderUtils {
|
||||||
static final String DEFAULT_MEM = "512m";
|
static final String DEFAULT_MEM = "512m";
|
||||||
static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf";
|
static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf";
|
||||||
static final String ENV_SPARK_HOME = "SPARK_HOME";
|
static final String ENV_SPARK_HOME = "SPARK_HOME";
|
||||||
|
static final String ENV_SPARK_ASSEMBLY = "_SPARK_ASSEMBLY";
|
||||||
|
|
||||||
/** Returns whether the given string is null or empty. */
|
/** Returns whether the given string is null or empty. */
|
||||||
static boolean isEmpty(String s) {
|
static boolean isEmpty(String s) {
|
||||||
|
|
|
@ -98,7 +98,7 @@ public class SparkSubmitCommandBuilderSuite {
|
||||||
parser.NAME,
|
parser.NAME,
|
||||||
"appName");
|
"appName");
|
||||||
|
|
||||||
List<String> args = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
|
List<String> args = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
|
||||||
List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
|
List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
|
||||||
assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
|
assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
|
||||||
}
|
}
|
||||||
|
@ -110,7 +110,7 @@ public class SparkSubmitCommandBuilderSuite {
|
||||||
parser.MASTER + "=foo",
|
parser.MASTER + "=foo",
|
||||||
parser.DEPLOY_MODE + "=bar");
|
parser.DEPLOY_MODE + "=bar");
|
||||||
|
|
||||||
List<String> cmd = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
|
List<String> cmd = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
|
||||||
assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
|
assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
|
||||||
assertEquals("foo", findArgValue(cmd, parser.MASTER));
|
assertEquals("foo", findArgValue(cmd, parser.MASTER));
|
||||||
assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
|
assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
|
||||||
|
@ -153,7 +153,7 @@ public class SparkSubmitCommandBuilderSuite {
|
||||||
String deployMode = isDriver ? "client" : "cluster";
|
String deployMode = isDriver ? "client" : "cluster";
|
||||||
|
|
||||||
SparkSubmitCommandBuilder launcher =
|
SparkSubmitCommandBuilder launcher =
|
||||||
new SparkSubmitCommandBuilder(Collections.<String>emptyList());
|
newCommandBuilder(Collections.<String>emptyList());
|
||||||
launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
|
launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
|
||||||
System.getProperty("spark.test.home"));
|
System.getProperty("spark.test.home"));
|
||||||
launcher.master = "yarn";
|
launcher.master = "yarn";
|
||||||
|
@ -273,10 +273,15 @@ public class SparkSubmitCommandBuilderSuite {
|
||||||
return contains(needle, list.split(sep));
|
return contains(needle, list.split(sep));
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
|
private SparkSubmitCommandBuilder newCommandBuilder(List<String> args) {
|
||||||
SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
|
SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
|
||||||
builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
|
builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
|
||||||
return builder.buildCommand(env);
|
builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_ASSEMBLY, "dummy");
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
|
||||||
|
return newCommandBuilder(args).buildCommand(env);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue