[SPARK-24149][YARN][FOLLOW-UP] Only get the delegation tokens of the filesystem explicitly specified by the user

## What changes were proposed in this pull request?

Our HDFS cluster configured 5 nameservices: `nameservices1`, `nameservices2`, `nameservices3`, `nameservices-dev1` and `nameservices4`, but `nameservices-dev1` unstable. So sometimes an error occurred and causing the entire job failed since [SPARK-24149](https://issues.apache.org/jira/browse/SPARK-24149):

![image](https://user-images.githubusercontent.com/5399861/42434779-f10c48fc-8386-11e8-98b0-4d9786014744.png)

I think it's best to add a switch here.

## How was this patch tested?

manual tests

Closes #21734 from wangyum/SPARK-24149.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Yuming Wang 2018-08-27 13:26:55 -07:00 committed by Marcelo Vanzin
parent 810d59ce44
commit c3f285c939

View file

@ -27,11 +27,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.util.Utils
@ -193,8 +190,7 @@ object YarnSparkHadoopUtil {
sparkConf: SparkConf,
hadoopConf: Configuration): Set[FileSystem] = {
val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS)
.map(new Path(_).getFileSystem(hadoopConf))
.toSet
val requestAllDelegationTokens = filesystemsToAccess.isEmpty
val stagingFS = sparkConf.get(STAGING_DIR)
.map(new Path(_).getFileSystem(hadoopConf))
@ -203,8 +199,8 @@ object YarnSparkHadoopUtil {
// Add the list of available namenodes for all namespaces in HDFS federation.
// If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
// namespaces.
val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
Set.empty
val hadoopFilesystems = if (!requestAllDelegationTokens || stagingFS.getScheme == "viewfs") {
filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet
} else {
val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
// Retrieving the filesystem for the nameservices where HA is not enabled
@ -222,7 +218,7 @@ object YarnSparkHadoopUtil {
(filesystemsWithoutHA ++ filesystemsWithHA).toSet
}
filesystemsToAccess ++ hadoopFilesystems + stagingFS
hadoopFilesystems + stagingFS
}
}