[SPARK-11035][CORE] Add in-process Spark app launcher.

This change adds a new launcher that allows applications to be run
in a separate thread in the same process as the calling code. To
achieve that, some code from the child process implementation was
moved to abstract classes that implement the common functionality,
and the new launcher inherits from those.

The new launcher was added as a new class, instead of as a new option
on the existing SparkLauncher, to avoid ambiguous APIs. For example,
SparkLauncher has ways to set the child app's environment, modify
SPARK_HOME, or control the logging of the child process, none of which
apply to in-process apps.

The in-process launcher has limitations: it needs Spark in the
context class loader of the calling thread, and it's bound by
Spark's current limitation of a single client-mode application
per JVM. It also relies on the recently added SparkApplication
trait to make sure different apps don't mess up each other's
configuration, so config isolation is currently limited to cluster mode.

I also chose to keep the same socket-based communication for in-process
apps, even though it might be possible to avoid it in that mode, so that
both implementations share more code.
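
For reference, a minimal sketch of how a caller might use the new
in-process launcher (mirroring the usage exercised by the new unit
test; the master, deploy mode, app resource and main class below are
placeholders rather than part of this change):

  import org.apache.spark.launcher.InProcessLauncher;
  import org.apache.spark.launcher.SparkAppHandle;

  public class InProcessExample {
    public static void main(String[] args) throws Exception {
      // Runs the app on a thread inside this JVM; Spark classes must be
      // visible to the calling thread's context class loader.
      SparkAppHandle handle = new InProcessLauncher()
        .setMaster("yarn")                   // placeholder master
        .setDeployMode("cluster")            // in-process launch is recommended for cluster mode
        .setAppResource("/path/to/app.jar")  // placeholder app resource
        .setMainClass("com.example.MyApp")   // placeholder main class
        .startApplication();

      // The handle reports state changes over the same launcher socket protocol.
      while (!handle.getState().isFinal()) {
        Thread.sleep(500);
      }
      System.out.println("Final state: " + handle.getState());
    }
  }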

Tested with new and existing unit tests, and with a simple app that
uses the launcher; also made sure the app ran fine with an older
launcher jar to check binary compatibility.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19591 from vanzin/SPARK-11035.
Marcelo Vanzin 2017-12-28 17:00:49 -06:00 committed by Imran Rashid
parent 613b71a123
commit cfcd746689
21 changed files with 1139 additions and 505 deletions


@ -351,6 +351,14 @@
    <artifactId>spark-tags_${scala.binary.version}</artifactId>
  </dependency>
+  <dependency>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-launcher_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <classifier>tests</classifier>
+    <scope>test</scope>
+  </dependency>
  <!--
    This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
    them will yield errors.


@ -19,7 +19,7 @@ package org.apache.spark.launcher
import java.net.{InetAddress, Socket}
-import org.apache.spark.SPARK_VERSION
+import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.launcher.LauncherProtocol._
import org.apache.spark.util.{ThreadUtils, Utils}
@ -36,9 +36,14 @@ private[spark] abstract class LauncherBackend {
  private var lastState: SparkAppHandle.State = _
  @volatile private var _isConnected = false
+  protected def conf: SparkConf
  def connect(): Unit = {
-    val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
-    val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
+    val port = conf.getOption(LauncherProtocol.CONF_LAUNCHER_PORT)
+      .orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT))
+      .map(_.toInt)
+    val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
+      .orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
    if (port != None && secret != None) {
      val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
      connection = new BackendConnection(s)


@ -45,6 +45,7 @@ private[spark] class StandaloneSchedulerBackend(
  private var client: StandaloneAppClient = null
  private val stopping = new AtomicBoolean(false)
  private val launcherBackend = new LauncherBackend() {
+    override protected def conf: SparkConf = sc.conf
    override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
  }


@ -105,6 +105,7 @@ private[spark] class LocalSchedulerBackend(
  private val userClassPath = getUserClasspath(conf)
  private val listenerBus = scheduler.sc.listenerBus
  private val launcherBackend = new LauncherBackend() {
+    override def conf: SparkConf = LocalSchedulerBackend.this.conf
    override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
  }


@ -18,30 +18,26 @@
package org.apache.spark.launcher;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
+import static org.mockito.Mockito.*;
+import org.apache.spark.SparkContext;
import org.apache.spark.internal.config.package$;
import org.apache.spark.util.Utils;
/**
 * These tests require the Spark assembly to be built before they can be run.
 */
-public class SparkLauncherSuite {
+public class SparkLauncherSuite extends BaseSuite {
-  static {
-    SLF4JBridgeHandler.removeHandlersForRootLogger();
-    SLF4JBridgeHandler.install();
-  }
-  private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
  private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
  private final SparkLauncher launcher = new SparkLauncher();
@ -123,6 +119,50 @@ public class SparkLauncherSuite {
    assertEquals(0, app.waitFor());
  }
@Test
public void testInProcessLauncher() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
// system properties after this test runs.
Map<Object, Object> properties = new HashMap<>(System.getProperties());
try {
inProcessLauncherTestImpl();
} finally {
Properties p = new Properties();
for (Map.Entry<Object, Object> e : properties.entrySet()) {
p.put(e.getKey(), e.getValue());
}
System.setProperties(p);
}
}
private void inProcessLauncherTestImpl() throws Exception {
final List<SparkAppHandle.State> transitions = new ArrayList<>();
SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
doAnswer(invocation -> {
SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
transitions.add(h.getState());
return null;
}).when(listener).stateChanged(any(SparkAppHandle.class));
SparkAppHandle handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);
waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
}
  public static class SparkLauncherTestApp {
    public static void main(String[] args) throws Exception {
@ -134,4 +174,14 @@ public class SparkLauncherSuite {
  }
+  public static class InProcessTestApp {
+    public static void main(String[] args) throws Exception {
+      assertNotEquals(0, args.length);
+      assertEquals(args[0], "hello");
+      new SparkContext().stop();
+    }
+  }
}


@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.launcher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
abstract class AbstractAppHandle implements SparkAppHandle {
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
private final LauncherServer server;
private LauncherConnection connection;
private List<Listener> listeners;
private State state;
private String appId;
private boolean disposed;
protected AbstractAppHandle(LauncherServer server) {
this.server = server;
this.state = State.UNKNOWN;
}
@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(l);
}
@Override
public State getState() {
return state;
}
@Override
public String getAppId() {
return appId;
}
@Override
public void stop() {
CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
try {
connection.send(new LauncherProtocol.Stop());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override
public synchronized void disconnect() {
if (!disposed) {
disposed = true;
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);
}
}
void setConnection(LauncherConnection connection) {
this.connection = connection;
}
LauncherConnection getConnection() {
return connection;
}
boolean isDisposed() {
return disposed;
}
void setState(State s) {
setState(s, false);
}
synchronized void setState(State s, boolean force) {
if (force || !state.isFinal()) {
state = s;
fireEvent(false);
} else {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { state, s });
}
}
synchronized void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}
private void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
l.infoChanged(this);
} else {
l.stateChanged(this);
}
}
}
}
}


@ -0,0 +1,307 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.launcher;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
/**
* Base class for launcher implementations.
*
* @since Spark 2.3.0
*/
public abstract class AbstractLauncher<T extends AbstractLauncher> {
final SparkSubmitCommandBuilder builder;
AbstractLauncher() {
this.builder = new SparkSubmitCommandBuilder();
}
/**
* Set a custom properties file with Spark configuration for the application.
*
* @param path Path to custom properties file to use.
* @return This launcher.
*/
public T setPropertiesFile(String path) {
checkNotNull(path, "path");
builder.setPropertiesFile(path);
return self();
}
/**
* Set a single configuration value for the application.
*
* @param key Configuration key.
* @param value The value to use.
* @return This launcher.
*/
public T setConf(String key, String value) {
checkNotNull(key, "key");
checkNotNull(value, "value");
checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
builder.conf.put(key, value);
return self();
}
/**
* Set the application name.
*
* @param appName Application name.
* @return This launcher.
*/
public T setAppName(String appName) {
checkNotNull(appName, "appName");
builder.appName = appName;
return self();
}
/**
* Set the Spark master for the application.
*
* @param master Spark master.
* @return This launcher.
*/
public T setMaster(String master) {
checkNotNull(master, "master");
builder.master = master;
return self();
}
/**
* Set the deploy mode for the application.
*
* @param mode Deploy mode.
* @return This launcher.
*/
public T setDeployMode(String mode) {
checkNotNull(mode, "mode");
builder.deployMode = mode;
return self();
}
/**
* Set the main application resource. This should be the location of a jar file for Scala/Java
* applications, or a python script for PySpark applications.
*
* @param resource Path to the main application resource.
* @return This launcher.
*/
public T setAppResource(String resource) {
checkNotNull(resource, "resource");
builder.appResource = resource;
return self();
}
/**
* Sets the application class name for Java/Scala applications.
*
* @param mainClass Application's main class.
* @return This launcher.
*/
public T setMainClass(String mainClass) {
checkNotNull(mainClass, "mainClass");
builder.mainClass = mainClass;
return self();
}
/**
* Adds a no-value argument to the Spark invocation. If the argument is known, this method
* validates whether the argument is indeed a no-value argument, and throws an exception
* otherwise.
* <p>
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
* @since 1.5.0
* @param arg Argument to add.
* @return This launcher.
*/
public T addSparkArg(String arg) {
SparkSubmitOptionParser validator = new ArgumentValidator(false);
validator.parse(Arrays.asList(arg));
builder.sparkArgs.add(arg);
return self();
}
/**
* Adds an argument with a value to the Spark invocation. If the argument name corresponds to
* a known argument, the code validates that the argument actually expects a value, and throws
* an exception otherwise.
* <p>
* It is safe to add arguments modified by other methods in this class (such as
* {@link #setMaster(String)} - the last invocation will be the one to take effect.
* <p>
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
* @since 1.5.0
* @param name Name of argument to add.
* @param value Value of the argument.
* @return This launcher.
*/
public T addSparkArg(String name, String value) {
SparkSubmitOptionParser validator = new ArgumentValidator(true);
if (validator.MASTER.equals(name)) {
setMaster(value);
} else if (validator.PROPERTIES_FILE.equals(name)) {
setPropertiesFile(value);
} else if (validator.CONF.equals(name)) {
String[] vals = value.split("=", 2);
setConf(vals[0], vals[1]);
} else if (validator.CLASS.equals(name)) {
setMainClass(value);
} else if (validator.JARS.equals(name)) {
builder.jars.clear();
for (String jar : value.split(",")) {
addJar(jar);
}
} else if (validator.FILES.equals(name)) {
builder.files.clear();
for (String file : value.split(",")) {
addFile(file);
}
} else if (validator.PY_FILES.equals(name)) {
builder.pyFiles.clear();
for (String file : value.split(",")) {
addPyFile(file);
}
} else {
validator.parse(Arrays.asList(name, value));
builder.sparkArgs.add(name);
builder.sparkArgs.add(value);
}
return self();
}
/**
* Adds command line arguments for the application.
*
* @param args Arguments to pass to the application's main class.
* @return This launcher.
*/
public T addAppArgs(String... args) {
for (String arg : args) {
checkNotNull(arg, "arg");
builder.appArgs.add(arg);
}
return self();
}
/**
* Adds a jar file to be submitted with the application.
*
* @param jar Path to the jar file.
* @return This launcher.
*/
public T addJar(String jar) {
checkNotNull(jar, "jar");
builder.jars.add(jar);
return self();
}
/**
* Adds a file to be submitted with the application.
*
* @param file Path to the file.
* @return This launcher.
*/
public T addFile(String file) {
checkNotNull(file, "file");
builder.files.add(file);
return self();
}
/**
* Adds a python file / zip / egg to be submitted with the application.
*
* @param file Path to the file.
* @return This launcher.
*/
public T addPyFile(String file) {
checkNotNull(file, "file");
builder.pyFiles.add(file);
return self();
}
/**
* Enables verbose reporting for SparkSubmit.
*
* @param verbose Whether to enable verbose output.
* @return This launcher.
*/
public T setVerbose(boolean verbose) {
builder.verbose = verbose;
return self();
}
/**
* Starts a Spark application.
*
* <p>
* This method returns a handle that provides information about the running application and can
* be used to do basic interaction with it.
* <p>
* The returned handle assumes that the application will instantiate a single SparkContext
* during its lifetime. Once that context reports a final state (one that indicates the
* SparkContext has stopped), the handle will not perform new state transitions, so anything
* that happens after that cannot be monitored. If the underlying application is launched as
* a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
*
* @since 1.6.0
* @param listeners Listeners to add to the handle before the app is launched.
* @return A handle for the launched application.
*/
public abstract SparkAppHandle startApplication(SparkAppHandle.Listener... listeners)
throws IOException;
abstract T self();
private static class ArgumentValidator extends SparkSubmitOptionParser {
private final boolean hasValue;
ArgumentValidator(boolean hasValue) {
this.hasValue = hasValue;
}
@Override
protected boolean handle(String opt, String value) {
if (value == null && hasValue) {
throw new IllegalArgumentException(String.format("'%s' expects a value.", opt));
}
return true;
}
@Override
protected boolean handleUnknown(String opt) {
// Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
return true;
}
protected void handleExtraArgs(List<String> extra) {
// No op.
}
}
}


@ -17,77 +17,29 @@
package org.apache.spark.launcher;
-import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Handle implementation for monitoring apps started as a child process.
 */
-class ChildProcAppHandle implements SparkAppHandle {
+class ChildProcAppHandle extends AbstractAppHandle {
  private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
-  private final String secret;
-  private final LauncherServer server;
  private volatile Process childProc;
-  private boolean disposed;
-  private LauncherConnection connection;
-  private List<Listener> listeners;
-  private State state;
-  private String appId;
  private OutputRedirector redirector;
-  ChildProcAppHandle(String secret, LauncherServer server) {
-    this.secret = secret;
-    this.server = server;
-    this.state = State.UNKNOWN;
-  }
+  ChildProcAppHandle(LauncherServer server) {
+    super(server);
+  }
-  @Override
-  public synchronized void addListener(Listener l) {
-    if (listeners == null) {
-      listeners = new ArrayList<>();
-    }
-    listeners.add(l);
-  }
-  @Override
-  public State getState() {
-    return state;
-  }
-  @Override
-  public String getAppId() {
-    return appId;
-  }
-  @Override
-  public void stop() {
-    CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
-    try {
-      connection.send(new LauncherProtocol.Stop());
-    } catch (IOException ioe) {
-      throw new RuntimeException(ioe);
-    }
-  }
  @Override
  public synchronized void disconnect() {
-    if (!disposed) {
-      disposed = true;
-      if (connection != null) {
-        try {
-          connection.close();
-        } catch (IOException ioe) {
-          // no-op.
-        }
-      }
-      server.unregister(this);
+    try {
+      super.disconnect();
+    } finally {
      if (redirector != null) {
        redirector.stop();
      }
@ -106,10 +58,6 @@ class ChildProcAppHandle implements SparkAppHandle {
    setState(State.KILLED);
  }
-  String getSecret() {
-    return secret;
-  }
  void setChildProc(Process childProc, String loggerName, InputStream logStream) {
    this.childProc = childProc;
    if (logStream != null) {
@ -118,39 +66,10 @@ class ChildProcAppHandle implements SparkAppHandle {
    } else {
      // If there is no log redirection, spawn a thread that will wait for the child process
      // to finish.
-      Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild);
-      waiter.setDaemon(true);
-      waiter.start();
+      SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild).start();
    }
  }
void setConnection(LauncherConnection connection) {
this.connection = connection;
}
LauncherServer getServer() {
return server;
}
LauncherConnection getConnection() {
return connection;
}
synchronized void setState(State s) {
if (!state.isFinal()) {
state = s;
fireEvent(false);
} else {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { state, s });
}
}
synchronized void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}
  /**
   * Wait for the child process to exit and update the handle's state if necessary, accoding to
   * the exit code.
@ -171,7 +90,7 @@ class ChildProcAppHandle implements SparkAppHandle {
    }
    synchronized (this) {
-      if (disposed) {
+      if (isDisposed()) {
        return;
      }
@ -185,31 +104,19 @@
        ec = 1;
      }
+      State currState = getState();
      State newState = null;
      if (ec != 0) {
        // Override state with failure if the current state is not final, or is success.
-        if (!state.isFinal() || state == State.FINISHED) {
+        if (!currState.isFinal() || currState == State.FINISHED) {
          newState = State.FAILED;
        }
-      } else if (!state.isFinal()) {
+      } else if (!currState.isFinal()) {
        newState = State.LOST;
      }
      if (newState != null) {
-        state = newState;
-        fireEvent(false);
+        setState(newState, true);
      }
    }
  }
-  private void fireEvent(boolean isInfoChanged) {
-    if (listeners != null) {
-      for (Listener l : listeners) {
-        if (isInfoChanged) {
-          l.infoChanged(this);
-        } else {
-          l.stateChanged(this);
-        }
-      }
-    }
-  }
}


@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.launcher;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
class InProcessAppHandle extends AbstractAppHandle {
private static final String THREAD_NAME_FMT = "spark-app-%d: '%s'";
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
private static final AtomicLong THREAD_IDS = new AtomicLong();
// Avoid really long thread names.
private static final int MAX_APP_NAME_LEN = 16;
private Thread app;
InProcessAppHandle(LauncherServer server) {
super(server);
}
@Override
public synchronized void kill() {
LOG.warning("kill() may leave the underlying app running in in-process mode.");
disconnect();
// Interrupt the thread. This is not guaranteed to kill the app, though.
if (app != null) {
app.interrupt();
}
setState(State.KILLED);
}
synchronized void start(String appName, Method main, String[] args) {
CommandBuilderUtils.checkState(app == null, "Handle already started.");
if (appName.length() > MAX_APP_NAME_LEN) {
appName = "..." + appName.substring(appName.length() - MAX_APP_NAME_LEN);
}
app = new Thread(() -> {
try {
main.invoke(null, (Object) args);
} catch (Throwable t) {
LOG.log(Level.WARNING, "Application failed with exception.", t);
setState(State.FAILED);
}
synchronized (InProcessAppHandle.this) {
if (!isDisposed()) {
disconnect();
if (!getState().isFinal()) {
setState(State.LOST, true);
}
}
}
});
app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));
app.start();
}
}


@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.launcher;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.List;
import java.util.logging.Logger;
/**
* In-process launcher for Spark applications.
* <p>
* Use this class to start Spark applications programmatically. Applications launched using this
* class will run in the same process as the caller.
* <p>
* Because Spark only supports a single active instance of <code>SparkContext</code> per JVM, code
* that uses this class should be careful about which applications are launched. It's recommended
* that this launcher only be used to launch applications in cluster mode.
* <p>
* Also note that, when running applications in client mode, JVM-related configurations (like
* driver memory or configs which modify the driver's class path) do not take effect. Logging
* configuration is also inherited from the parent application.
*
* @since Spark 2.3.0
*/
public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {
private static final Logger LOG = Logger.getLogger(InProcessLauncher.class.getName());
/**
* Starts a Spark application.
*
* @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
* @param listeners Listeners to add to the handle before the app is launched.
* @return A handle for the launched application.
*/
@Override
public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
if (builder.isClientMode(builder.getEffectiveConfig())) {
LOG.warning("It's not recommended to run client-mode applications using InProcessLauncher.");
}
Method main = findSparkSubmit();
LauncherServer server = LauncherServer.getOrCreateServer();
InProcessAppHandle handle = new InProcessAppHandle(server);
for (SparkAppHandle.Listener l : listeners) {
handle.addListener(l);
}
String secret = server.registerHandle(handle);
setConf(LauncherProtocol.CONF_LAUNCHER_PORT, String.valueOf(server.getPort()));
setConf(LauncherProtocol.CONF_LAUNCHER_SECRET, secret);
List<String> sparkArgs = builder.buildSparkSubmitArgs();
String[] argv = sparkArgs.toArray(new String[sparkArgs.size()]);
String appName = CommandBuilderUtils.firstNonEmpty(builder.appName, builder.mainClass,
"<unknown>");
handle.start(appName, main, argv);
return handle;
}
@Override
InProcessLauncher self() {
return this;
}
// Visible for testing.
Method findSparkSubmit() throws IOException {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
cl = getClass().getClassLoader();
}
Class<?> sparkSubmit;
try {
sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
} catch (Exception e) {
throw new IOException("Cannot find SparkSubmit; make sure necessary jars are available.", e);
}
Method main;
try {
main = sparkSubmit.getMethod("main", String[].class);
} catch (Exception e) {
throw new IOException("Cannot find SparkSubmit main method.", e);
}
CommandBuilderUtils.checkState(Modifier.isStatic(main.getModifiers()),
"main method is not static.");
return main;
}
}


@ -32,6 +32,12 @@ final class LauncherProtocol {
  /** Environment variable where the secret for connecting back to the server is stored. */
  static final String ENV_LAUNCHER_SECRET = "_SPARK_LAUNCHER_SECRET";
+  /** Spark conf key used to propagate the server port for in-process launches. */
+  static final String CONF_LAUNCHER_PORT = "spark.launcher.port";
+  /** Spark conf key used to propagate the app secret for in-process launches. */
+  static final String CONF_LAUNCHER_SECRET = "spark.launcher.secret";
  static class Message implements Serializable {
  }


@ -26,6 +26,7 @@ import java.net.Socket;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
@ -88,31 +89,25 @@ class LauncherServer implements Closeable {
  private static volatile LauncherServer serverInstance;
-  /**
-   * Creates a handle for an app to be launched. This method will start a server if one hasn't been
-   * started yet. The server is shared for multiple handles, and once all handles are disposed of,
-   * the server is shut down.
-   */
-  static synchronized ChildProcAppHandle newAppHandle() throws IOException {
-    LauncherServer server = serverInstance != null ? serverInstance : new LauncherServer();
+  static synchronized LauncherServer getOrCreateServer() throws IOException {
+    LauncherServer server;
+    do {
+      server = serverInstance != null ? serverInstance : new LauncherServer();
+    } while (!server.running);
    server.ref();
    serverInstance = server;
-    String secret = server.createSecret();
-    while (server.pending.containsKey(secret)) {
-      secret = server.createSecret();
-    }
-    return server.newAppHandle(secret);
+    return server;
  }
-  static LauncherServer getServerInstance() {
+  // For testing.
+  static synchronized LauncherServer getServer() {
    return serverInstance;
  }
  private final AtomicLong refCount;
  private final AtomicLong threadIds;
-  private final ConcurrentMap<String, ChildProcAppHandle> pending;
+  private final ConcurrentMap<String, AbstractAppHandle> secretToPendingApps;
  private final List<ServerConnection> clients;
  private final ServerSocket server;
  private final Thread serverThread;
@ -132,7 +127,7 @@ class LauncherServer implements Closeable {
    this.clients = new ArrayList<>();
    this.threadIds = new AtomicLong();
    this.factory = new NamedThreadFactory(THREAD_NAME_FMT);
-    this.pending = new ConcurrentHashMap<>();
+    this.secretToPendingApps = new ConcurrentHashMap<>();
    this.timeoutTimer = new Timer("LauncherServer-TimeoutTimer", true);
    this.server = server;
    this.running = true;
@ -149,32 +144,38 @@ class LauncherServer implements Closeable {
  }
  /**
-   * Creates a new app handle. The handle will wait for an incoming connection for a configurable
-   * amount of time, and if one doesn't arrive, it will transition to an error state.
+   * Registers a handle with the server, and returns the secret the child app needs to connect
+   * back.
   */
-  ChildProcAppHandle newAppHandle(String secret) {
-    ChildProcAppHandle handle = new ChildProcAppHandle(secret, this);
-    ChildProcAppHandle existing = pending.putIfAbsent(secret, handle);
-    CommandBuilderUtils.checkState(existing == null, "Multiple handles with the same secret.");
-    return handle;
+  synchronized String registerHandle(AbstractAppHandle handle) {
+    String secret = createSecret();
+    secretToPendingApps.put(secret, handle);
+    return secret;
  }
  @Override
  public void close() throws IOException {
    synchronized (this) {
-      if (running) {
-        running = false;
-        timeoutTimer.cancel();
-        server.close();
-        synchronized (clients) {
-          List<ServerConnection> copy = new ArrayList<>(clients);
-          clients.clear();
-          for (ServerConnection client : copy) {
-            client.close();
-          }
-        }
-      }
+      if (!running) {
+        return;
+      }
+      running = false;
    }
+    synchronized(LauncherServer.class) {
+      serverInstance = null;
+    }
+    timeoutTimer.cancel();
+    server.close();
+    synchronized (clients) {
+      List<ServerConnection> copy = new ArrayList<>(clients);
+      clients.clear();
+      for (ServerConnection client : copy) {
+        client.close();
+      }
+    }
    if (serverThread != null) {
      try {
        serverThread.join();
@ -195,8 +196,6 @@ class LauncherServer implements Closeable {
        close();
      } catch (IOException ioe) {
        // no-op.
-      } finally {
-        serverInstance = null;
      }
    }
  }
@ -210,8 +209,14 @@ class LauncherServer implements Closeable {
   * Removes the client handle from the pending list (in case it's still there), and unrefs
   * the server.
   */
-  void unregister(ChildProcAppHandle handle) {
-    pending.remove(handle.getSecret());
+  void unregister(AbstractAppHandle handle) {
+    for (Map.Entry<String, AbstractAppHandle> e : secretToPendingApps.entrySet()) {
+      if (e.getValue().equals(handle)) {
+        String secret = e.getKey();
+        secretToPendingApps.remove(secret);
+        break;
+      }
+    }
    unref();
  }
@ -260,24 +265,30 @@ class LauncherServer implements Closeable {
  }
  private String createSecret() {
-    byte[] secret = new byte[128];
-    RND.nextBytes(secret);
+    while (true) {
+      byte[] secret = new byte[128];
+      RND.nextBytes(secret);
      StringBuilder sb = new StringBuilder();
      for (byte b : secret) {
        int ival = b >= 0 ? b : Byte.MAX_VALUE - b;
        if (ival < 0x10) {
          sb.append("0");
        }
        sb.append(Integer.toHexString(ival));
      }
-    return sb.toString();
+      String secretStr = sb.toString();
+      if (!secretToPendingApps.containsKey(secretStr)) {
+        return secretStr;
+      }
+    }
  }
  private class ServerConnection extends LauncherConnection {
    private TimerTask timeout;
-    private ChildProcAppHandle handle;
+    private AbstractAppHandle handle;
    ServerConnection(Socket socket, TimerTask timeout) throws IOException {
      super(socket);
@ -291,7 +302,7 @@
      timeout.cancel();
      timeout = null;
      Hello hello = (Hello) msg;
-      ChildProcAppHandle handle = pending.remove(hello.secret);
+      AbstractAppHandle handle = secretToPendingApps.remove(hello.secret);
      if (handle != null) {
        handle.setConnection(this);
        handle.setState(SparkAppHandle.State.CONNECTED);


@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -37,7 +36,7 @@ import static org.apache.spark.launcher.CommandBuilderUtils.*;
 * to allow clients to configure the Spark application and launch it as a child process.
 * </p>
 */
-public class SparkLauncher {
+public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
  /** The Spark master. */
  public static final String SPARK_MASTER = "spark.master";
@ -109,7 +108,6 @@ public class SparkLauncher {
  }
  // Visible for testing.
-  final SparkSubmitCommandBuilder builder;
  File workingDir;
  boolean redirectErrorStream;
  ProcessBuilder.Redirect errorStream;
@ -125,7 +123,6 @@ public class SparkLauncher {
   * @param env Environment variables to set.
   */
  public SparkLauncher(Map<String, String> env) {
-    this.builder = new SparkSubmitCommandBuilder();
    if (env != null) {
      this.builder.childEnv.putAll(env);
    }
@ -155,224 +152,6 @@ public class SparkLauncher {
    return this;
  }
/**
* Set a custom properties file with Spark configuration for the application.
*
* @param path Path to custom properties file to use.
* @return This launcher.
*/
public SparkLauncher setPropertiesFile(String path) {
checkNotNull(path, "path");
builder.setPropertiesFile(path);
return this;
}
/**
* Set a single configuration value for the application.
*
* @param key Configuration key.
* @param value The value to use.
* @return This launcher.
*/
public SparkLauncher setConf(String key, String value) {
checkNotNull(key, "key");
checkNotNull(value, "value");
checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
builder.conf.put(key, value);
return this;
}
/**
* Set the application name.
*
* @param appName Application name.
* @return This launcher.
*/
public SparkLauncher setAppName(String appName) {
checkNotNull(appName, "appName");
builder.appName = appName;
return this;
}
/**
* Set the Spark master for the application.
*
* @param master Spark master.
* @return This launcher.
*/
public SparkLauncher setMaster(String master) {
checkNotNull(master, "master");
builder.master = master;
return this;
}
/**
* Set the deploy mode for the application.
*
* @param mode Deploy mode.
* @return This launcher.
*/
public SparkLauncher setDeployMode(String mode) {
checkNotNull(mode, "mode");
builder.deployMode = mode;
return this;
}
/**
* Set the main application resource. This should be the location of a jar file for Scala/Java
* applications, or a python script for PySpark applications.
*
* @param resource Path to the main application resource.
* @return This launcher.
*/
public SparkLauncher setAppResource(String resource) {
checkNotNull(resource, "resource");
builder.appResource = resource;
return this;
}
/**
* Sets the application class name for Java/Scala applications.
*
* @param mainClass Application's main class.
* @return This launcher.
*/
public SparkLauncher setMainClass(String mainClass) {
checkNotNull(mainClass, "mainClass");
builder.mainClass = mainClass;
return this;
}
/**
* Adds a no-value argument to the Spark invocation. If the argument is known, this method
* validates whether the argument is indeed a no-value argument, and throws an exception
* otherwise.
* <p>
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
* @since 1.5.0
* @param arg Argument to add.
* @return This launcher.
*/
public SparkLauncher addSparkArg(String arg) {
SparkSubmitOptionParser validator = new ArgumentValidator(false);
validator.parse(Arrays.asList(arg));
builder.sparkArgs.add(arg);
return this;
}
/**
* Adds an argument with a value to the Spark invocation. If the argument name corresponds to
* a known argument, the code validates that the argument actually expects a value, and throws
* an exception otherwise.
* <p>
* It is safe to add arguments modified by other methods in this class (such as
* {@link #setMaster(String)} - the last invocation will be the one to take effect.
* <p>
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
* @since 1.5.0
* @param name Name of argument to add.
* @param value Value of the argument.
* @return This launcher.
*/
public SparkLauncher addSparkArg(String name, String value) {
SparkSubmitOptionParser validator = new ArgumentValidator(true);
if (validator.MASTER.equals(name)) {
setMaster(value);
} else if (validator.PROPERTIES_FILE.equals(name)) {
setPropertiesFile(value);
} else if (validator.CONF.equals(name)) {
String[] vals = value.split("=", 2);
setConf(vals[0], vals[1]);
} else if (validator.CLASS.equals(name)) {
setMainClass(value);
} else if (validator.JARS.equals(name)) {
builder.jars.clear();
for (String jar : value.split(",")) {
addJar(jar);
}
} else if (validator.FILES.equals(name)) {
builder.files.clear();
for (String file : value.split(",")) {
addFile(file);
}
} else if (validator.PY_FILES.equals(name)) {
builder.pyFiles.clear();
for (String file : value.split(",")) {
addPyFile(file);
}
} else {
validator.parse(Arrays.asList(name, value));
builder.sparkArgs.add(name);
builder.sparkArgs.add(value);
}
return this;
}
/**
* Adds command line arguments for the application.
*
* @param args Arguments to pass to the application's main class.
* @return This launcher.
*/
public SparkLauncher addAppArgs(String... args) {
for (String arg : args) {
checkNotNull(arg, "arg");
builder.appArgs.add(arg);
}
return this;
}
/**
* Adds a jar file to be submitted with the application.
*
* @param jar Path to the jar file.
* @return This launcher.
*/
public SparkLauncher addJar(String jar) {
checkNotNull(jar, "jar");
builder.jars.add(jar);
return this;
}
/**
* Adds a file to be submitted with the application.
*
* @param file Path to the file.
* @return This launcher.
*/
public SparkLauncher addFile(String file) {
checkNotNull(file, "file");
builder.files.add(file);
return this;
}
/**
* Adds a python file / zip / egg to be submitted with the application.
*
* @param file Path to the file.
* @return This launcher.
*/
public SparkLauncher addPyFile(String file) {
checkNotNull(file, "file");
builder.pyFiles.add(file);
return this;
}
/**
* Enables verbose reporting for SparkSubmit.
*
* @param verbose Whether to enable verbose output.
* @return This launcher.
*/
public SparkLauncher setVerbose(boolean verbose) {
builder.verbose = verbose;
return this;
}
  /**
   * Sets the working directory of spark-submit.
   *
@ -449,6 +228,79 @@
    return this;
  }
// The following methods just delegate to the parent class, but they are needed to keep
// binary compatibility with previous versions of this class.
@Override
public SparkLauncher setPropertiesFile(String path) {
return super.setPropertiesFile(path);
}
@Override
public SparkLauncher setConf(String key, String value) {
return super.setConf(key, value);
}
@Override
public SparkLauncher setAppName(String appName) {
return super.setAppName(appName);
}
@Override
public SparkLauncher setMaster(String master) {
return super.setMaster(master);
}
@Override
public SparkLauncher setDeployMode(String mode) {
return super.setDeployMode(mode);
}
@Override
public SparkLauncher setAppResource(String resource) {
return super.setAppResource(resource);
}
@Override
public SparkLauncher setMainClass(String mainClass) {
return super.setMainClass(mainClass);
}
@Override
public SparkLauncher addSparkArg(String arg) {
return super.addSparkArg(arg);
}
@Override
public SparkLauncher addSparkArg(String name, String value) {
return super.addSparkArg(name, value);
}
@Override
public SparkLauncher addAppArgs(String... args) {
return super.addAppArgs(args);
}
@Override
public SparkLauncher addJar(String jar) {
return super.addJar(jar);
}
@Override
public SparkLauncher addFile(String file) {
return super.addFile(file);
}
@Override
public SparkLauncher addPyFile(String file) {
return super.addPyFile(file);
}
@Override
public SparkLauncher setVerbose(boolean verbose) {
return super.setVerbose(verbose);
}
  /**
   * Launches a sub-process that will start the configured Spark application.
   * <p>
@ -479,17 +331,9 @@ public class SparkLauncher {
  /**
   * Starts a Spark application.
-   *
   * <p>
-   * This method returns a handle that provides information about the running application and can
-   * be used to do basic interaction with it.
-   * <p>
-   * The returned handle assumes that the application will instantiate a single SparkContext
-   * during its lifetime. Once that context reports a final state (one that indicates the
-   * SparkContext has stopped), the handle will not perform new state transitions, so anything
-   * that happens after that cannot be monitored. If the underlying application is launched as
-   * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
-   * <p>
-   * Currently, all applications are launched as child processes. The child's stdout and stderr
+   * Applications launched by this launcher run as child processes. The child's stdout and stderr
   * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
   * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
   * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
@ -499,15 +343,20 @@ public class SparkLauncher {
   * easily into the configuration of commonly-used logging systems.
   *
   * @since 1.6.0
+   * @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
   * @param listeners Listeners to add to the handle before the app is launched.
   * @return A handle for the launched application.
   */
+  @Override
  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
-    ChildProcAppHandle handle = LauncherServer.newAppHandle();
+    LauncherServer server = LauncherServer.getOrCreateServer();
+    ChildProcAppHandle handle = new ChildProcAppHandle(server);
    for (SparkAppHandle.Listener l : listeners) {
      handle.addListener(l);
    }
+    String secret = server.registerHandle(handle);
    String loggerName = getLoggerName();
    ProcessBuilder pb = createBuilder();
@ -540,9 +389,8 @@
      pb.redirectErrorStream(true);
    }
-    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
-      String.valueOf(LauncherServer.getServerInstance().getPort()));
-    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
+    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, String.valueOf(server.getPort()));
+    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, secret);
    try {
      Process child = pb.start();
      InputStream logStream = null;
@ -604,6 +452,11 @@ public class SparkLauncher {
    return pb;
  }
@Override
SparkLauncher self() {
return this;
}
  // Visible for testing.
  String findSparkSubmit() {
    String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
@ -614,32 +467,4 @@ public class SparkLauncher {
    return builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
  }
private static class ArgumentValidator extends SparkSubmitOptionParser {
private final boolean hasValue;
ArgumentValidator(boolean hasValue) {
this.hasValue = hasValue;
}
@Override
protected boolean handle(String opt, String value) {
if (value == null && hasValue) {
throw new IllegalArgumentException(String.format("'%s' expects a value.", opt));
}
return true;
}
@Override
protected boolean handleUnknown(String opt) {
// Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
return true;
}
protected void handleExtraArgs(List<String> extra) {
// No op.
}
}
}


@ -355,7 +355,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
    env.put(submitArgsEnvVariable, submitArgs.toString());
  }
-  private boolean isClientMode(Map<String, String> userProps) {
+  boolean isClientMode(Map<String, String> userProps) {
    String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
    String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
    // Default master is "local[*]", so assume client mode in that case


@ -16,17 +16,18 @@
 */
/**
- * Library for launching Spark applications.
+ * Library for launching Spark applications programmatically.
 *
 * <p>
- * This library allows applications to launch Spark programmatically. There's only one entry
- * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
+ * There are two ways to start applications with this library: as a child process, using
+ * {@link org.apache.spark.launcher.SparkLauncher}, or in-process, using
+ * {@link org.apache.spark.launcher.InProcessLauncher}.
 * </p>
 *
 * <p>
- * The {@link org.apache.spark.launcher.SparkLauncher#startApplication(
- * org.apache.spark.launcher.SparkAppHandle.Listener...)} can be used to start Spark and provide
- * a handle to monitor and control the running application:
+ * The {@link org.apache.spark.launcher.AbstractLauncher#startApplication(
+ * org.apache.spark.launcher.SparkAppHandle.Listener...)} method can be used to start Spark and
+ * provide a handle to monitor and control the running application:
 * </p>
 *
 * <pre>
@ -49,7 +50,20 @@
 * </pre>
 *
 * <p>
- * It's also possible to launch a raw child process, using the
+ * Launching applications as a child process requires a full Spark installation. The installation
+ * directory can be provided to the launcher explicitly in the launcher's configuration, or by
+ * setting the <i>SPARK_HOME</i> environment variable.
+ * </p>
+ *
+ * <p>
+ * Launching applications in-process is only recommended in cluster mode, since Spark cannot run
+ * multiple client-mode applications concurrently in the same process. The in-process launcher
+ * requires the necessary Spark dependencies (such as spark-core and cluster manager-specific
+ * modules) to be present in the caller thread's class loader.
+ * </p>
+ *
+ * <p>
+ * It's also possible to launch a raw child process, without the extra monitoring, using the
 * {@link org.apache.spark.launcher.SparkLauncher#launch()} method:
 * </p>
 *


@ -17,10 +17,14 @@
package org.apache.spark.launcher;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.slf4j.bridge.SLF4JBridgeHandler;
+import static org.junit.Assert.*;
/**
- * Handles configuring the JUL -> SLF4J bridge.
+ * Handles configuring the JUL -> SLF4J bridge, and provides some utility methods for tests.
 */
class BaseSuite {
@ -29,4 +33,33 @@ class BaseSuite {
    SLF4JBridgeHandler.install();
  }
@After
public void postChecks() {
LauncherServer server = LauncherServer.getServer();
if (server != null) {
// Shut down the server to clean things up for the next test.
try {
server.close();
} catch (Exception e) {
// Ignore.
}
}
assertNull(server);
}
protected void waitFor(SparkAppHandle handle) throws Exception {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
try {
while (!handle.getState().isFinal()) {
assertTrue("Timed out waiting for handle to transition to final state.",
System.nanoTime() < deadline);
TimeUnit.MILLISECONDS.sleep(10);
}
} finally {
if (!handle.getState().isFinal()) {
handle.kill();
}
}
}
}


@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.nio.file.attribute.PosixFilePermission.*; import static java.nio.file.attribute.PosixFilePermission.*;
@ -217,21 +216,6 @@ public class ChildProcAppHandleSuite extends BaseSuite {
assertEquals(SparkAppHandle.State.FAILED, handle.getState()); assertEquals(SparkAppHandle.State.FAILED, handle.getState());
} }
private void waitFor(SparkAppHandle handle) throws Exception {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
try {
while (!handle.getState().isFinal()) {
assertTrue("Timed out waiting for handle to transition to final state.",
System.nanoTime() < deadline);
TimeUnit.MILLISECONDS.sleep(10);
}
} finally {
if (!handle.getState().isFinal()) {
handle.kill();
}
}
}
private static class TestSparkLauncher extends SparkLauncher {
TestSparkLauncher() {

View file

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.launcher;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public class InProcessLauncherSuite extends BaseSuite {
// Arguments passed to the test class to identify the test being run.
private static final String TEST_SUCCESS = "success";
private static final String TEST_FAILURE = "failure";
private static final String TEST_KILL = "kill";
private static final String TEST_FAILURE_MESSAGE = "d'oh";
private static Throwable lastError;
@Before
public void testSetup() {
lastError = null;
}
@Test
public void testLauncher() throws Exception {
SparkAppHandle app = startTest(TEST_SUCCESS);
waitFor(app);
assertNull(lastError);
// Because the test doesn't implement the launcher protocol, the final state here will be
// LOST instead of FINISHED.
assertEquals(SparkAppHandle.State.LOST, app.getState());
}
@Test
public void testKill() throws Exception {
SparkAppHandle app = startTest(TEST_KILL);
app.kill();
waitFor(app);
assertNull(lastError);
assertEquals(SparkAppHandle.State.KILLED, app.getState());
}
@Test
public void testErrorPropagation() throws Exception {
SparkAppHandle app = startTest(TEST_FAILURE);
waitFor(app);
assertEquals(SparkAppHandle.State.FAILED, app.getState());
assertNotNull(lastError);
assertEquals(TEST_FAILURE_MESSAGE, lastError.getMessage());
}
private SparkAppHandle startTest(String test) throws Exception {
return new TestInProcessLauncher()
.addAppArgs(test)
.setAppResource(SparkLauncher.NO_RESOURCE)
.startApplication();
}
public static void runTest(String[] args) {
try {
assertTrue(args.length != 0);
// Make sure at least the launcher-provided config options are in the args array.
final AtomicReference<String> port = new AtomicReference<>();
final AtomicReference<String> secret = new AtomicReference<>();
SparkSubmitOptionParser parser = new SparkSubmitOptionParser() {
@Override
protected boolean handle(String opt, String value) {
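// The parser hands back the canonical option constant, so the identity comparison below
// is intentional.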
if (opt == CONF) {
String[] conf = value.split("=");
switch(conf[0]) {
case LauncherProtocol.CONF_LAUNCHER_PORT:
port.set(conf[1]);
break;
case LauncherProtocol.CONF_LAUNCHER_SECRET:
secret.set(conf[1]);
break;
default:
// no op
}
}
return true;
}
@Override
protected boolean handleUnknown(String opt) {
return true;
}
@Override
protected void handleExtraArgs(List<String> extra) {
// no op.
}
};
parser.parse(Arrays.asList(args));
assertNotNull("Launcher port not found.", port.get());
assertNotNull("Launcher secret not found.", secret.get());
String test = args[args.length - 1];
switch (test) {
case TEST_SUCCESS:
break;
case TEST_FAILURE:
throw new IllegalStateException(TEST_FAILURE_MESSAGE);
case TEST_KILL:
try {
// Wait a bounded amount of time so the test cannot hang forever if the kill fails,
// but long enough that the expected interrupt should arrive even on busy machines.
Thread.sleep(10000);
fail("Did not get expected interrupt after 10s.");
} catch (InterruptedException ie) {
// Expected.
}
break;
default:
fail("Unknown test " + test);
}
} catch (Throwable t) {
lastError = t;
throw new RuntimeException(t);
}
}
private static class TestInProcessLauncher extends InProcessLauncher {
@Override
Method findSparkSubmit() throws IOException {
try {
return InProcessLauncherSuite.class.getMethod("runTest", String[].class);
} catch (Exception e) {
throw new IOException(e);
}
}
}
}

View file

@ -39,39 +39,27 @@ public class LauncherServerSuite extends BaseSuite {
@Test
public void testLauncherServerReuse() throws Exception {
LauncherServer server1 = LauncherServer.getOrCreateServer();
ChildProcAppHandle handle = new ChildProcAppHandle(server1);
handle.kill();
LauncherServer server2 = LauncherServer.getOrCreateServer();
try {
assertNotSame(server1, server2);
} finally {
server2.unref();
}
}
@Test
public void testCommunication() throws Exception {
LauncherServer server = LauncherServer.getOrCreateServer();
ChildProcAppHandle handle = new ChildProcAppHandle(server);
String secret = server.registerHandle(handle);
TestClient client = null;
try {
Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
final Semaphore semaphore = new Semaphore(0);
handle.addListener(new SparkAppHandle.Listener() {
@ -86,7 +74,7 @@ public class LauncherServerSuite extends BaseSuite {
});
client = new TestClient(s);
client.send(new Hello(secret, "1.4.0"));
assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
// Make sure the server matched the client to the handle.
@ -104,7 +92,7 @@ public class LauncherServerSuite extends BaseSuite {
Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS);
assertTrue(stopMsg instanceof Stop);
} finally {
handle.kill();
close(client);
client.clientThread.join();
}
@ -112,34 +100,36 @@ public class LauncherServerSuite extends BaseSuite {
@Test
public void testTimeout() throws Exception {
LauncherServer server = LauncherServer.getOrCreateServer();
ChildProcAppHandle handle = new ChildProcAppHandle(server);
String secret = server.registerHandle(handle);
TestClient client = null;
try {
// LauncherServer will immediately close the server-side socket when the timeout is set
// to 0.
SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, "0");
Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
client = new TestClient(s);
waitForError(client, secret);
} finally {
SparkLauncher.launcherConfig.remove(SparkLauncher.CHILD_CONNECTION_TIMEOUT);
handle.kill();
close(client);
}
}
@Test
public void testSparkSubmitVmShutsDown() throws Exception {
LauncherServer server = LauncherServer.getOrCreateServer();
ChildProcAppHandle handle = new ChildProcAppHandle(server);
String secret = server.registerHandle(handle);
TestClient client = null;
final Semaphore semaphore = new Semaphore(0);
try {
Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
handle.addListener(new SparkAppHandle.Listener() {
public void stateChanged(SparkAppHandle handle) {
semaphore.release();
@ -149,7 +139,7 @@ public class LauncherServerSuite extends BaseSuite {
}
});
client = new TestClient(s);
client.send(new Hello(secret, "1.4.0"));
assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
// Make sure the server matched the client to the handle.
assertNotNull(handle.getConnection());
@ -157,7 +147,7 @@ public class LauncherServerSuite extends BaseSuite {
assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
assertEquals(SparkAppHandle.State.LOST, handle.getState());
} finally {
handle.kill();
close(client);
client.clientThread.join();
}
@ -165,11 +155,13 @@ public class LauncherServerSuite extends BaseSuite {
@Test
public void testStreamFiltering() throws Exception {
LauncherServer server = LauncherServer.getOrCreateServer();
ChildProcAppHandle handle = new ChildProcAppHandle(server);
String secret = server.registerHandle(handle);
TestClient client = null;
try {
Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
client = new TestClient(s);
@ -181,21 +173,15 @@ public class LauncherServerSuite extends BaseSuite {
// happening for other reasons).
}
waitForError(client, secret);
assertEquals(0, EvilPayload.EVIL_BIT);
} finally {
handle.kill();
close(client);
client.clientThread.join();
}
}
private void kill(SparkAppHandle handle) {
if (handle != null) {
handle.kill();
}
}
private void close(Closeable c) {
if (c != null) {
try {

View file

@ -92,6 +92,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private[this] var stopCalled: Boolean = false
private val launcherBackend = new LauncherBackend() {
override protected def conf: SparkConf = sc.conf
override protected def onStopRequest(): Unit = {
stopSchedulerBackend()
setState(SparkAppHandle.State.KILLED)

View file

@ -100,6 +100,8 @@ private[spark] class Client(
private var amKeytabFileName: String = null
private val launcherBackend = new LauncherBackend() {
override protected def conf: SparkConf = sparkConf
override def onStopRequest(): Unit = {
if (isClusterMode && appId != null) {
yarnClient.killApplication(appId)