[SPARK-13063][YARN] Make the Spark YARN staging dir configurable

## What changes were proposed in this pull request?
Made the Spark YARN staging directory configurable via the `spark.yarn.stagingDir` configuration property.

## How was this patch tested?

I have verified it manually by running applications on YARN. If `spark.yarn.stagingDir` is configured, its value is used as the staging directory; otherwise the default is used, i.e. the file system's home directory for the user.
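
For illustration, a minimal sketch of setting the new property when building a `SparkConf`; the HDFS URI and app name below are hypothetical examples, not values from this patch:

```scala
import org.apache.spark.SparkConf

object StagingDirConfig {
  def main(args: Array[String]): Unit = {
    // Route YARN staging files to a shared location instead of the submitting
    // user's home directory; the HDFS path here is hypothetical.
    val conf = new SparkConf()
      .setAppName("staging-dir-example")
      .set("spark.yarn.stagingDir", "hdfs://ns1/tmp/spark-staging")
    println(conf.get("spark.yarn.stagingDir"))
  }
}
```

The same setting can be passed at submit time with `--conf spark.yarn.stagingDir=...`.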

Author: Devaraj K <devaraj@apache.org>

Closes #12082 from devaraj-kavali/SPARK-13063.
Commit: bc36df127d (parent 463bac0011)
Devaraj K authored on 2016-04-05 14:12:00 -05:00; committed by Tom Graves
3 changed files with 27 additions and 3 deletions

#### docs/running-on-yarn.md

@@ -159,6 +159,13 @@ If you need a reference to the proper location to put log files in the YARN so t
 HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
 </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.stagingDir</code></td>
+  <td>Current user's home directory in the filesystem</td>
+  <td>
+  Staging directory used while submitting applications.
+  </td>
+</tr>
 <tr>
 <td><code>spark.yarn.preserve.staging.files</code></td>
 <td><code>false</code></td>

#### yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

@@ -182,8 +182,8 @@ private[spark] class Client(
     val appStagingDir = getAppStagingDir(appId)
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-      val stagingDirPath = new Path(appStagingDir)
       val fs = FileSystem.get(hadoopConf)
+      val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir)
       if (!preserveFiles && fs.exists(stagingDirPath)) {
         logInfo("Deleting staging directory " + stagingDirPath)
         fs.delete(stagingDirPath, true)
@@ -357,7 +357,7 @@ private[spark] class Client(
     // Upload Spark and the application JAR to the remote file system if necessary,
     // and add them as local resources to the application master.
     val fs = FileSystem.get(hadoopConf)
-    val dst = new Path(fs.getHomeDirectory(), appStagingDir)
+    val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir)
     val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
     YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
@@ -668,7 +668,7 @@ private[spark] class Client(
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
     if (loginFromKeytab) {
       val remoteFs = FileSystem.get(hadoopConf)
-      val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
+      val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir)
       val credentialsFile = "credentials-" + UUID.randomUUID().toString
       sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
       logInfo(s"Credentials file set to: $credentialsFile")
@@ -1438,4 +1438,16 @@ private object Client extends Logging {
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
+
+  /**
+   * Returns the app staging dir based on the STAGING_DIR configuration if configured,
+   * otherwise based on the user's home directory.
+   */
+  private def getAppStagingDirPath(
+      conf: SparkConf,
+      fs: FileSystem,
+      appStagingDir: String): Path = {
+    val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory())
+    new Path(baseDir, appStagingDir)
+  }
 }
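
To make the fallback behavior concrete, here is a self-contained sketch of the resolution logic using plain Scala stand-ins (no Hadoop dependencies; the paths and names are illustrative only, not part of the patch):

```scala
object StagingDirResolution {
  // Mirrors conf.get(STAGING_DIR).map(new Path(_)).getOrElse(fs.getHomeDirectory()):
  // use the configured base dir when present, else the user's home directory.
  def resolve(configuredBase: Option[String], homeDir: String, appStagingDir: String): String =
    configuredBase.getOrElse(homeDir) + "/" + appStagingDir

  def main(args: Array[String]): Unit = {
    // spark.yarn.stagingDir set: staging files land under the configured base.
    println(resolve(Some("hdfs://ns1/tmp/spark-staging"), "hdfs://ns1/user/alice", ".sparkStaging/app_0001"))
    // Unset: falls back to the user's home directory, the pre-patch behavior.
    println(resolve(None, "hdfs://ns1/user/alice", ".sparkStaging/app_0001"))
  }
}
```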

#### yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

@@ -108,6 +108,11 @@ package object config {
     .intConf
     .optional
+
+  private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
+    .doc("Staging directory used while submitting applications.")
+    .stringConf
+    .optional
 
   /* Cluster-mode launcher configuration. */
   private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
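
For context, the entry above follows Spark's internal optional-config pattern: reading an optional entry yields an `Option[String]`, which is what lets `getAppStagingDirPath` fall back cleanly when the key is unset. A simplified stand-in for that pattern (not Spark's actual `ConfigBuilder`) might look like:

```scala
// Hypothetical, simplified analog of an optional config entry.
final case class OptionalEntry(key: String, doc: String)

object MiniConf {
  // Look the entry up in a plain settings map; None means "not configured".
  def get(settings: Map[String, String], entry: OptionalEntry): Option[String] =
    settings.get(entry.key)

  def main(args: Array[String]): Unit = {
    val stagingDir = OptionalEntry(
      "spark.yarn.stagingDir",
      "Staging directory used while submitting applications.")
    println(get(Map("spark.yarn.stagingDir" -> "hdfs://ns1/tmp/spark-staging"), stagingDir)) // Some(...)
    println(get(Map.empty, stagingDir)) // None
  }
}
```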