diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d21a739347..7e023f2d92 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 import java.security.PrivilegedExceptionAction
@@ -91,30 +91,52 @@ private[spark] class Client(
    * available in the alpha API.
    */
   def submitApplication(): ApplicationId = {
-    // Setup the credentials before doing anything else, so we have don't have issues at any point.
-    setupCredentials()
-    yarnClient.init(yarnConf)
-    yarnClient.start()
+    var appId: ApplicationId = null
+    try {
+      // Set up the credentials before doing anything else,
+      // so we don't have issues at any point.
+      setupCredentials()
+      yarnClient.init(yarnConf)
+      yarnClient.start()
 
-    logInfo("Requesting a new application from cluster with %d NodeManagers"
-      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
+      logInfo("Requesting a new application from cluster with %d NodeManagers"
+        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
 
-    // Get a new application from our RM
-    val newApp = yarnClient.createApplication()
-    val newAppResponse = newApp.getNewApplicationResponse()
-    val appId = newAppResponse.getApplicationId()
+      // Get a new application from our RM
+      val newApp = yarnClient.createApplication()
+      val newAppResponse = newApp.getNewApplicationResponse()
+      appId = newAppResponse.getApplicationId()
 
-    // Verify whether the cluster has enough resources for our AM
-    verifyClusterResources(newAppResponse)
+      // Verify whether the cluster has enough resources for our AM
+      verifyClusterResources(newAppResponse)
 
-    // Set up the appropriate contexts to launch our AM
-    val containerContext = createContainerLaunchContext(newAppResponse)
-    val appContext = createApplicationSubmissionContext(newApp, containerContext)
+      // Set up the appropriate contexts to launch our AM
+      val containerContext = createContainerLaunchContext(newAppResponse)
+      val appContext = createApplicationSubmissionContext(newApp, containerContext)
 
-    // Finally, submit and monitor the application
-    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
-    yarnClient.submitApplication(appContext)
-    appId
+      // Finally, submit and monitor the application
+      logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+      yarnClient.submitApplication(appContext)
+      appId
+    } catch {
+      case e: Throwable =>
+        if (appId != null) {
+          val appStagingDir = getAppStagingDir(appId)
+          try {
+            val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
+            val stagingDirPath = new Path(appStagingDir)
+            val fs = FileSystem.get(hadoopConf)
+            if (!preserveFiles && fs.exists(stagingDirPath)) {
+              logInfo("Deleting staging directory " + stagingDirPath)
+              fs.delete(stagingDirPath, true)
+            }
+          } catch {
+            case ioe: IOException =>
+              logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
+          }
+        }
+        throw e
+    }
   }
 
   /**
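
For readers following the change: the new catch block best-effort deletes the YARN staging directory (where the client uploads the application jar, Spark jars, and config files) when submission fails after an application ID has been obtained, unless spark.yarn.preserve.staging.files is set. Below is a minimal standalone sketch of that cleanup logic; StagingDirCleanup and cleanupStagingDir are hypothetical names for illustration, not part of this patch, and System.err stands in for the client's logWarning from Spark's Logging trait.

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf

// Hypothetical helper, not in this patch: mirrors the cleanup logic that the
// patch inlines in submitApplication()'s catch block.
object StagingDirCleanup {
  def cleanupStagingDir(
      appStagingDir: String,
      sparkConf: SparkConf,
      hadoopConf: Configuration): Unit = {
    try {
      // Honor spark.yarn.preserve.staging.files so users can keep the
      // uploaded files around for debugging a failed submission.
      val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
      val stagingDirPath = new Path(appStagingDir)
      val fs = FileSystem.get(hadoopConf)
      if (!preserveFiles && fs.exists(stagingDirPath)) {
        // Recursive delete: the staging dir is a directory of uploaded files.
        fs.delete(stagingDirPath, true)
      }
    } catch {
      case ioe: IOException =>
        // Cleanup is best-effort: swallow and report rather than rethrow,
        // so an IOException here cannot mask the original failure.
        System.err.println(s"Failed to clean up staging dir $appStagingDir: $ioe")
    }
  }
}

In the patch this corresponds to calling the cleanup with getAppStagingDir(appId) before re-throwing: catching the IOException locally matters because letting it propagate would replace the original submission error (the throw e) with a less informative cleanup error.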