[SPARK-3591][YARN]fire and forget for YARN cluster mode
https://issues.apache.org/jira/browse/SPARK-3591 The output after this patch: >doggie153:/opt/oss/spark-1.3.0-bin-hadoop2.4/bin # ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster ../lib/spark-examples*.jar 15/03/31 21:15:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/31 21:15:25 INFO RMProxy: Connecting to ResourceManager at doggie153/10.177.112.153:8032 15/03/31 21:15:25 INFO Client: Requesting a new application from cluster with 4 NodeManagers 15/03/31 21:15:25 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container) 15/03/31 21:15:25 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 15/03/31 21:15:25 INFO Client: Setting up container launch context for our AM 15/03/31 21:15:25 INFO Client: Preparing resources for our AM container 15/03/31 21:15:26 INFO Client: Uploading resource file:/opt/oss/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.4.1.jar -> hdfs://doggie153:9000/user/root/.sparkStaging/application_1427257505534_0016/spark-assembly-1.4.0-SNAPSHOT-hadoop2.4.1.jar 15/03/31 21:15:27 INFO Client: Uploading resource file:/opt/oss/spark-1.3.0-bin-hadoop2.4/lib/spark-examples-1.3.0-hadoop2.4.0.jar -> hdfs://doggie153:9000/user/root/.sparkStaging/application_1427257505534_0016/spark-examples-1.3.0-hadoop2.4.0.jar 15/03/31 21:15:28 INFO Client: Setting up the launch environment for our AM container 15/03/31 21:15:28 INFO SecurityManager: Changing view acls to: root 15/03/31 21:15:28 INFO SecurityManager: Changing modify acls to: root 15/03/31 21:15:28 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root) 15/03/31 21:15:28 INFO Client: Submitting application 16 to ResourceManager 15/03/31 21:15:28 INFO YarnClientImpl: Submitted application application_1427257505534_0016 15/03/31 21:15:28 INFO Client: ... waiting before polling ResourceManager for application state 15/03/31 21:15:33 INFO Client: ... polling ResourceManager for application state 15/03/31 21:15:33 INFO Client: Application report for application_1427257505534_0016 (state: RUNNING) 15/03/31 21:15:33 INFO Client: client token: N/A diagnostics: N/A ApplicationMaster host: doggie157 ApplicationMaster RPC port: 0 queue: default start time: 1427807728307 final status: UNDEFINED tracking URL: http://doggie153:8088/proxy/application_1427257505534_0016/ user: root /cc andrewor14 Author: WangTaoTheTonic <wangtao111@huawei.com> Closes #5297 from WangTaoTheTonic/SPARK-3591 and squashes the following commits: c76d232 [WangTaoTheTonic] wrap lines 16c90a8 [WangTaoTheTonic] move up lines to avoid duplicate fea390d [WangTaoTheTonic] log failed/killed report, style and comment be1cc2e [WangTaoTheTonic] reword f0bc54f [WangTaoTheTonic] minor: expose appid in excepiton messages ba9b22b [WangTaoTheTonic] wrong config name e1a4013 [WangTaoTheTonic] revert to the old version and do some robust 19706c0 [WangTaoTheTonic] add a config to control whether to forget 0cbdce8 [WangTaoTheTonic] fire and forget for YARN cluster mode
This commit is contained in:
parent
ae980eb41c
commit
b65bad65c3
|
@ -89,7 +89,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
|
|||
|
||||
/* Find out driver status then exit the JVM */
|
||||
def pollAndReportStatus(driverId: String) {
|
||||
println(s"... waiting before polling master for driver state")
|
||||
println("... waiting before polling master for driver state")
|
||||
Thread.sleep(5000)
|
||||
println("... polling master for driver state")
|
||||
val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
|
||||
|
|
|
@ -245,7 +245,7 @@ private[deploy] class StandaloneRestClient extends Logging {
|
|||
}
|
||||
} else {
|
||||
val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
|
||||
logError("Application submission failed" + failMessage)
|
||||
logError(s"Application submission failed$failMessage")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -196,6 +196,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
|
|||
It should be no larger than the global number of max attempts in the YARN configuration.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.yarn.submit.waitAppCompletion</code></td>
|
||||
<td>true</td>
|
||||
<td>
|
||||
In YARN cluster mode, controls whether the client waits to exit until the application completes.
|
||||
If set to true, the client process will stay alive reporting the application's status.
|
||||
Otherwise, the client process will exit after submission.
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
# Launching Spark on YARN
|
||||
|
|
|
@ -66,6 +66,8 @@ private[spark] class Client(
|
|||
private val executorMemoryOverhead = args.executorMemoryOverhead // MB
|
||||
private val distCacheMgr = new ClientDistributedCacheManager()
|
||||
private val isClusterMode = args.isClusterMode
|
||||
private val fireAndForget = isClusterMode &&
|
||||
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
|
||||
|
||||
|
||||
def stop(): Unit = yarnClient.stop()
|
||||
|
@ -564,31 +566,13 @@ private[spark] class Client(
|
|||
|
||||
if (logApplicationReport) {
|
||||
logInfo(s"Application report for $appId (state: $state)")
|
||||
val details = Seq[(String, String)](
|
||||
("client token", getClientToken(report)),
|
||||
("diagnostics", report.getDiagnostics),
|
||||
("ApplicationMaster host", report.getHost),
|
||||
("ApplicationMaster RPC port", report.getRpcPort.toString),
|
||||
("queue", report.getQueue),
|
||||
("start time", report.getStartTime.toString),
|
||||
("final status", report.getFinalApplicationStatus.toString),
|
||||
("tracking URL", report.getTrackingUrl),
|
||||
("user", report.getUser)
|
||||
)
|
||||
|
||||
// Use more loggable format if value is null or empty
|
||||
val formattedDetails = details
|
||||
.map { case (k, v) =>
|
||||
val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
|
||||
s"\n\t $k: $newValue" }
|
||||
.mkString("")
|
||||
|
||||
// If DEBUG is enabled, log report details every iteration
|
||||
// Otherwise, log them every time the application changes state
|
||||
if (log.isDebugEnabled) {
|
||||
logDebug(formattedDetails)
|
||||
logDebug(formatReportDetails(report))
|
||||
} else if (lastState != state) {
|
||||
logInfo(formattedDetails)
|
||||
logInfo(formatReportDetails(report))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -609,24 +593,57 @@ private[spark] class Client(
|
|||
throw new SparkException("While loop is depleted! This should never happen...")
|
||||
}
|
||||
|
||||
private def formatReportDetails(report: ApplicationReport): String = {
|
||||
val details = Seq[(String, String)](
|
||||
("client token", getClientToken(report)),
|
||||
("diagnostics", report.getDiagnostics),
|
||||
("ApplicationMaster host", report.getHost),
|
||||
("ApplicationMaster RPC port", report.getRpcPort.toString),
|
||||
("queue", report.getQueue),
|
||||
("start time", report.getStartTime.toString),
|
||||
("final status", report.getFinalApplicationStatus.toString),
|
||||
("tracking URL", report.getTrackingUrl),
|
||||
("user", report.getUser)
|
||||
)
|
||||
|
||||
// Use more loggable format if value is null or empty
|
||||
details.map { case (k, v) =>
|
||||
val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
|
||||
s"\n\t $k: $newValue"
|
||||
}.mkString("")
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit an application to the ResourceManager and monitor its state.
|
||||
* This continues until the application has exited for any reason.
|
||||
* Submit an application to the ResourceManager.
|
||||
* If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
|
||||
* reporting the application's status until the application has exited for any reason.
|
||||
* Otherwise, the client process will exit after submission.
|
||||
* If the application finishes with a failed, killed, or undefined status,
|
||||
* throw an appropriate SparkException.
|
||||
*/
|
||||
def run(): Unit = {
|
||||
val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
|
||||
if (yarnApplicationState == YarnApplicationState.FAILED ||
|
||||
finalApplicationStatus == FinalApplicationStatus.FAILED) {
|
||||
throw new SparkException("Application finished with failed status")
|
||||
}
|
||||
if (yarnApplicationState == YarnApplicationState.KILLED ||
|
||||
finalApplicationStatus == FinalApplicationStatus.KILLED) {
|
||||
throw new SparkException("Application is killed")
|
||||
}
|
||||
if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
|
||||
throw new SparkException("The final status of application is undefined")
|
||||
val appId = submitApplication()
|
||||
if (fireAndForget) {
|
||||
val report = getApplicationReport(appId)
|
||||
val state = report.getYarnApplicationState
|
||||
logInfo(s"Application report for $appId (state: $state)")
|
||||
logInfo(formatReportDetails(report))
|
||||
if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
|
||||
throw new SparkException(s"Application $appId finished with status: $state")
|
||||
}
|
||||
} else {
|
||||
val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
|
||||
if (yarnApplicationState == YarnApplicationState.FAILED ||
|
||||
finalApplicationStatus == FinalApplicationStatus.FAILED) {
|
||||
throw new SparkException(s"Application $appId finished with failed status")
|
||||
}
|
||||
if (yarnApplicationState == YarnApplicationState.KILLED ||
|
||||
finalApplicationStatus == FinalApplicationStatus.KILLED) {
|
||||
throw new SparkException(s"Application $appId is killed")
|
||||
}
|
||||
if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
|
||||
throw new SparkException(s"The final status of application $appId is undefined")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue