[SPARK-21541][YARN] Spark Logs show incorrect job status for a job that does not create SparkContext

If you run a Spark job without creating a SparkSession or SparkContext, the Spark job logs say it succeeded, but YARN reports that it failed and retries it 3 times. Also, because the Application Master unregisters with the Resource Manager and exits successfully, it deletes the Spark staging directory; when YARN makes its subsequent retries it cannot find the staging directory, so those retries fail as well.
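
For illustration, a minimal reproducer might look like the sketch below (the class and jar names are hypothetical), submitted in cluster mode with something like `spark-submit --master yarn --deploy-mode cluster --class NoContextApp no-context-app.jar`:

```scala
// Hypothetical reproducer: a user application whose main() returns
// without ever creating a SparkSession or SparkContext. Before this
// fix, the AM still unregistered as SUCCEEDED and deleted the staging
// directory, which then broke YARN's retry attempts.
object NoContextApp {
  def main(args: Array[String]): Unit = {
    println("Exiting without initializing a SparkContext")
  }
}
```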

Added a flag that tracks whether the user has initialized a SparkContext. If the flag is true, the Application Master is allowed to unregister with the Resource Manager; otherwise the AM does not unregister with the RM.
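
In isolation, the gating amounts to the pattern sketched below; this is a simplified illustration, not the actual `ApplicationMaster` code (which follows in the diff):

```scala
// Simplified sketch of the guard added by this patch. markRegistered()
// stands in for the point where registerAM() succeeds.
object RegistrationGuard {
  @volatile private var registered = false
  @volatile private var unregistered = false

  // Called once the AM has actually registered with the RM, which can
  // only happen if the user app created a SparkContext.
  def markRegistered(): Unit = { registered = true }

  // Unregister only if we ever registered. If the user app never
  // initialized a SparkContext, skipping unregistration lets YARN
  // record the attempt as failed and keeps the staging directory
  // intact for subsequent retries.
  def unregister(): Unit = synchronized {
    if (registered && !unregistered) {
      unregistered = true
      // ... report the final application status to the RM here ...
    }
  }
}
```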

## How was this patch tested?
Manually tested the fix.
Before:
<img width="1253" alt="screen shot-before" src="https://user-images.githubusercontent.com/22228190/28647214-69bf81e2-722b-11e7-9ed0-d416d2bf23be.png">

After:
<img width="1319" alt="screen shot-after" src="https://user-images.githubusercontent.com/22228190/28647220-70f9eea2-722b-11e7-85c6-e56276b15614.png">

Author: pgandhi <pgandhi@yahoo-inc.com>
Author: pgandhi999 <parthkgandhi9@gmail.com>

Closes #18741 from pgandhi999/SPARK-21541.
Authored by pgandhi on 2017-07-28 09:23:08 -05:00; committed by Tom Graves
parent 784680903c
commit 69ab0e4bdd

The changes are in `ApplicationMaster.scala`:

```diff
@@ -90,6 +90,9 @@ private[spark] class ApplicationMaster(
   @volatile private var reporterThread: Thread = _
   @volatile private var allocator: YarnAllocator = _
 
+  // A flag to check whether user has initialized spark context
+  @volatile private var registered = false
+
   private val userClassLoader = {
     val classpath = Client.getUserClasspath(sparkConf)
     val urls = classpath.map { entry =>
@@ -319,7 +322,7 @@ private[spark] class ApplicationMaster(
    */
   final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = {
     synchronized {
-      if (!unregistered) {
+      if (registered && !unregistered) {
         logInfo(s"Unregistering ApplicationMaster with $status" +
           Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
         unregistered = true
@@ -332,10 +335,15 @@ private[spark] class ApplicationMaster(
     synchronized {
       if (!finished) {
         val inShutdown = ShutdownHookManager.inShutdown()
-        logInfo(s"Final app status: $status, exitCode: $code" +
+        if (registered) {
+          exitCode = code
+          finalStatus = status
+        } else {
+          finalStatus = FinalApplicationStatus.FAILED
+          exitCode = ApplicationMaster.EXIT_SC_NOT_INITED
+        }
+        logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" +
           Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
-        exitCode = code
-        finalStatus = status
         finalMsg = msg
         finished = true
         if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
@@ -439,12 +447,11 @@ private[spark] class ApplicationMaster(
           sc.getConf.get("spark.driver.port"),
           isClusterMode = true)
         registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
+        registered = true
       } else {
         // Sanity check; should never happen in normal operation, since sc should only be null
         // if the user app did not create a SparkContext.
-        if (!finished) {
-          throw new IllegalStateException("SparkContext is null but app is still running!")
-        }
+        throw new IllegalStateException("User did not initialize spark context!")
       }
       userClassThread.join()
     } catch {
```
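
With this change, an application that never initializes a SparkContext finishes with `FinalApplicationStatus.FAILED` and `ApplicationMaster.EXIT_SC_NOT_INITED`, so the `finish()` log line above reads something like `Final app status: FAILED, exitCode: 13` (assuming `EXIT_SC_NOT_INITED` maps to 13 in this version of the code). And since `unregister()` is now a no-op when `registered` is false, YARN records the attempt as failed and the staging directory survives for the subsequent retries.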