[SPARK-16505][YARN] Optionally propagate error during shuffle service startup.
This prevents the NM from starting when something is wrong, which would lead to later errors which are confusing and harder to debug. Added a unit test to verify startup fails if something is wrong. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #14162 from vanzin/SPARK-16505.
This commit is contained in:
parent
c4bc2ed844
commit
b7b5e17876
|
@ -70,6 +70,11 @@ public class YarnShuffleService extends AuxiliaryService {
|
||||||
|
|
||||||
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
|
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
|
||||||
|
|
||||||
|
// Whether failure during service initialization should stop the NM.
|
||||||
|
@VisibleForTesting
|
||||||
|
static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
|
||||||
|
private static final boolean DEFAULT_STOP_ON_FAILURE = false;
|
||||||
|
|
||||||
// An entity that manages the shuffle secret per application
|
// An entity that manages the shuffle secret per application
|
||||||
// This is used only if authentication is enabled
|
// This is used only if authentication is enabled
|
||||||
private ShuffleSecretManager secretManager;
|
private ShuffleSecretManager secretManager;
|
||||||
|
@ -119,9 +124,12 @@ public class YarnShuffleService extends AuxiliaryService {
|
||||||
* Start the shuffle server with the given configuration.
|
* Start the shuffle server with the given configuration.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
_conf = conf;
|
_conf = conf;
|
||||||
|
|
||||||
|
boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
|
||||||
|
|
||||||
|
try {
|
||||||
// In case this NM was killed while there were running spark applications, we need to restore
|
// In case this NM was killed while there were running spark applications, we need to restore
|
||||||
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
|
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
|
||||||
// If we don't find one, then we choose a file to use to save the state next time. Even if
|
// If we don't find one, then we choose a file to use to save the state next time. Even if
|
||||||
|
@ -131,15 +139,11 @@ public class YarnShuffleService extends AuxiliaryService {
|
||||||
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
|
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
|
||||||
|
|
||||||
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
|
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
|
||||||
|
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
|
||||||
|
|
||||||
// If authentication is enabled, set up the shuffle server to use a
|
// If authentication is enabled, set up the shuffle server to use a
|
||||||
// special RPC handler that filters out unauthenticated fetch requests
|
// special RPC handler that filters out unauthenticated fetch requests
|
||||||
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
|
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
|
||||||
try {
|
|
||||||
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Failed to initialize external shuffle service", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
|
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
|
||||||
if (authEnabled) {
|
if (authEnabled) {
|
||||||
secretManager = new ShuffleSecretManager();
|
secretManager = new ShuffleSecretManager();
|
||||||
|
@ -157,6 +161,13 @@ public class YarnShuffleService extends AuxiliaryService {
|
||||||
logger.info("Started YARN shuffle service for Spark on port {}. " +
|
logger.info("Started YARN shuffle service for Spark on port {}. " +
|
||||||
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
|
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
|
||||||
registeredExecutorFile);
|
registeredExecutorFile);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (stopOnFailure) {
|
||||||
|
throw e;
|
||||||
|
} else {
|
||||||
|
noteFailure(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -83,18 +83,7 @@ In Mesos coarse-grained mode, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.
|
||||||
slave nodes with `spark.shuffle.service.enabled` set to `true`. For instance, you may do so
|
slave nodes with `spark.shuffle.service.enabled` set to `true`. For instance, you may do so
|
||||||
through Marathon.
|
through Marathon.
|
||||||
|
|
||||||
In YARN mode, start the shuffle service on each `NodeManager` as follows:
|
In YARN mode, follow the instructions [here](running-on-yarn.html#configuring-the-external-shuffle-service).
|
||||||
|
|
||||||
1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
|
|
||||||
pre-packaged distribution.
|
|
||||||
2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
|
|
||||||
`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building Spark yourself, and under
|
|
||||||
`lib` if you are using a distribution.
|
|
||||||
2. Add this jar to the classpath of all `NodeManager`s in your cluster.
|
|
||||||
3. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
|
|
||||||
then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
|
|
||||||
`org.apache.spark.network.yarn.YarnShuffleService`.
|
|
||||||
4. Restart all `NodeManager`s in your cluster.
|
|
||||||
|
|
||||||
All other relevant configurations are optional and under the `spark.dynamicAllocation.*` and
|
All other relevant configurations are optional and under the `spark.dynamicAllocation.*` and
|
||||||
`spark.shuffle.service.*` namespaces. For more detail, see the
|
`spark.shuffle.service.*` namespaces. For more detail, see the
|
||||||
|
|
|
@ -539,6 +539,37 @@ launch time. This is done by listing them in the `spark.yarn.access.namenodes` p
|
||||||
spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
|
spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Configuring the External Shuffle Service
|
||||||
|
|
||||||
|
To start the Spark Shuffle Service on each `NodeManager` in your YARN cluster, follow these
|
||||||
|
instructions:
|
||||||
|
|
||||||
|
1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
|
||||||
|
pre-packaged distribution.
|
||||||
|
1. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
|
||||||
|
`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building Spark yourself, and under
|
||||||
|
`lib` if you are using a distribution.
|
||||||
|
1. Add this jar to the classpath of all `NodeManager`s in your cluster.
|
||||||
|
1. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
|
||||||
|
then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
|
||||||
|
`org.apache.spark.network.yarn.YarnShuffleService`.
|
||||||
|
1. Restart all `NodeManager`s in your cluster.
|
||||||
|
|
||||||
|
The following extra configuration options are available when the shuffle service is running on YARN:
|
||||||
|
|
||||||
|
<table class="table">
|
||||||
|
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
|
||||||
|
<tr>
|
||||||
|
<td><code>spark.yarn.shuffle.stopOnFailure</code></td>
|
||||||
|
<td><code>false</code></td>
|
||||||
|
<td>
|
||||||
|
Whether to stop the NodeManager when there's a failure in the Spark Shuffle Service's
|
||||||
|
initialization. This prevents application failures caused by running containers on
|
||||||
|
NodeManagers where the Spark Shuffle Service is not running.
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
</table>
|
||||||
|
|
||||||
## Launching your application with Apache Oozie
|
## Launching your application with Apache Oozie
|
||||||
|
|
||||||
Apache Oozie can launch Spark applications as part of a workflow.
|
Apache Oozie can launch Spark applications as part of a workflow.
|
||||||
|
|
|
@ -16,13 +16,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.spark.network.yarn
|
package org.apache.spark.network.yarn
|
||||||
|
|
||||||
import java.io.{DataOutputStream, File, FileOutputStream}
|
import java.io.{DataOutputStream, File, FileOutputStream, IOException}
|
||||||
|
import java.nio.file.Files
|
||||||
|
import java.nio.file.attribute.PosixFilePermission._
|
||||||
|
import java.util.EnumSet
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.language.postfixOps
|
import scala.language.postfixOps
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
|
import org.apache.hadoop.service.ServiceStateException
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId
|
import org.apache.hadoop.yarn.api.records.ApplicationId
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration
|
import org.apache.hadoop.yarn.conf.YarnConfiguration
|
||||||
import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
|
import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
|
||||||
|
@ -45,7 +49,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
||||||
classOf[YarnShuffleService].getCanonicalName)
|
classOf[YarnShuffleService].getCanonicalName)
|
||||||
yarnConfig.setInt("spark.shuffle.service.port", 0)
|
yarnConfig.setInt("spark.shuffle.service.port", 0)
|
||||||
val localDir = Utils.createTempDir()
|
val localDir = Utils.createTempDir()
|
||||||
yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
|
yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
|
||||||
}
|
}
|
||||||
|
|
||||||
var s1: YarnShuffleService = null
|
var s1: YarnShuffleService = null
|
||||||
|
@ -316,4 +320,28 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
||||||
|
|
||||||
s2.stop()
|
s2.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("service throws error if cannot start") {
|
||||||
|
// Create a different config with a read-only local dir.
|
||||||
|
val roConfig = new YarnConfiguration(yarnConfig)
|
||||||
|
val roDir = Utils.createTempDir()
|
||||||
|
Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
|
||||||
|
roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
|
||||||
|
roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
|
||||||
|
|
||||||
|
// Try to start the shuffle service, it should fail.
|
||||||
|
val service = new YarnShuffleService()
|
||||||
|
|
||||||
|
try {
|
||||||
|
val error = intercept[ServiceStateException] {
|
||||||
|
service.init(roConfig)
|
||||||
|
}
|
||||||
|
assert(error.getCause().isInstanceOf[IOException])
|
||||||
|
} finally {
|
||||||
|
service.stop()
|
||||||
|
Files.setPosixFilePermissions(roDir.toPath(),
|
||||||
|
EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue