[SPARK-3877][YARN] Throw an exception when application is not successful so that the exit code wil be set to 1

When an yarn application fails (yarn-cluster mode), the exit code of spark-submit is still 0. It's hard for people to write some automatic scripts to run spark jobs in yarn because the failure can not be detected in these scripts.

This PR added a status checking after `monitorApplication`. If an application is not successful, `run()` will throw an `SparkException`, so that Client.scala will exit with code 1. Therefore, people can use the exit code of `spark-submit` to write some automatic scripts.

Author: zsxwing <zsxwing@gmail.com>

Closes #2732 from zsxwing/SPARK-3877 and squashes the following commits:

1f89fa5 [zsxwing] Fix the unit test
a0498e1 [zsxwing] Update the docs and the error message
e1cb9ef [zsxwing] Fix the hacky way of calling Client
ff16fec [zsxwing] Remove System.exit in Client.scala and add a test
6a2c103 [zsxwing] [SPARK-3877] Throw an exception when application is not successful so that the exit code wil be set to 1
This commit is contained in:
zsxwing 2014-10-22 15:04:41 -07:00 committed by Andrew Or
parent 813effc701
commit 137d942353
5 changed files with 44 additions and 35 deletions

View file

@ -137,15 +137,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
try {
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
} catch {
case e: Exception =>
Console.err.println(e.getMessage)
System.exit(1)
}
System.exit(0)
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
}
}

View file

@ -417,17 +417,19 @@ private[spark] trait ClientBase extends Logging {
/**
* Report the state of an application until it has exited, either successfully or
* due to some failure, then return the application state.
* due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
* KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
* or KILLED).
*
* @param appId ID of the application to monitor.
* @param returnOnRunning Whether to also return the application state when it is RUNNING.
* @param logApplicationReport Whether to log details of the application report every iteration.
* @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
* @return A pair of the yarn application state and the final application state.
*/
def monitorApplication(
appId: ApplicationId,
returnOnRunning: Boolean = false,
logApplicationReport: Boolean = true): YarnApplicationState = {
logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
var lastState: YarnApplicationState = null
while (true) {
@ -468,11 +470,11 @@ private[spark] trait ClientBase extends Logging {
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
return state
return (state, report.getFinalApplicationStatus)
}
if (returnOnRunning && state == YarnApplicationState.RUNNING) {
return state
return (state, report.getFinalApplicationStatus)
}
lastState = state
@ -485,8 +487,23 @@ private[spark] trait ClientBase extends Logging {
/**
* Submit an application to the ResourceManager and monitor its state.
* This continues until the application has exited for any reason.
* If the application finishes with a failed, killed, or undefined status,
* throw an appropriate SparkException.
*/
def run(): Unit = monitorApplication(submitApplication())
def run(): Unit = {
val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
if (yarnApplicationState == YarnApplicationState.FAILED ||
finalApplicationStatus == FinalApplicationStatus.FAILED) {
throw new SparkException("Application finished with failed status")
}
if (yarnApplicationState == YarnApplicationState.KILLED ||
finalApplicationStatus == FinalApplicationStatus.KILLED) {
throw new SparkException("Application is killed")
}
if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
throw new SparkException("The final status of application is undefined")
}
}
/* --------------------------------------------------------------------------------------- *
| Methods that cannot be implemented here due to API differences across hadoop versions |

View file

@ -99,7 +99,7 @@ private[spark] class YarnClientSchedulerBackend(
*/
private def waitForApplication(): Unit = {
assert(client != null && appId != null, "Application has not been submitted yet!")
val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {

View file

@ -135,15 +135,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
try {
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
} catch {
case e: Exception =>
Console.err.println(e.getMessage)
System.exit(1)
}
System.exit(0)
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
}
}

View file

@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@ -123,21 +123,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
var result = File.createTempFile("result", null, tempDir)
// The Client object will call System.exit() after the job is done, and we don't want
// that because it messes up the scalatest monitoring. So replicate some of what main()
// does here.
val args = Array("--class", main,
"--jar", "file:" + fakeSparkJar.getAbsolutePath(),
"--arg", "yarn-cluster",
"--arg", result.getAbsolutePath(),
"--num-executors", "1")
val sparkConf = new SparkConf()
val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val clientArgs = new ClientArguments(args, sparkConf)
new Client(clientArgs, yarnConf, sparkConf).run()
Client.main(args)
checkResult(result)
}
test("run Spark in yarn-cluster mode unsuccessfully") {
val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
// Use only one argument so the driver will fail
val args = Array("--class", main,
"--jar", "file:" + fakeSparkJar.getAbsolutePath(),
"--arg", "yarn-cluster",
"--num-executors", "1")
val exception = intercept[SparkException] {
Client.main(args)
}
assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
}
/**
* This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
* any sort of error when the job process finishes successfully, but the job itself fails. So