[SPARK-34295][CORE] Exclude filesystems from token renewal at YARN
### What changes were proposed in this pull request?

This patch adds a config `spark.yarn.kerberos.renewal.excludeHadoopFileSystems` which lists the filesystems to be excluded from delegation token renewal at YARN.

### Why are the changes needed?

MapReduce jobs can instruct YARN to skip renewal of tokens obtained from certain hosts by listing the hosts in the configuration `mapreduce.job.hdfs-servers.token-renewal.exclude=<host1>,<host2>,..,<hostN>`. Spark lacks a similar option, so job submission fails if YARN fails to renew the delegation token for any of the remote HDFS clusters. Delegation token renewal can fail for many reasons, for example when the remote HDFS does not trust the Kerberos identity of YARN. We have a customer facing such an issue.

### Does this PR introduce _any_ user-facing change?

No, if the config is not set. Yes, as users can use this config to instruct YARN not to renew delegation tokens from certain filesystems.

### How was this patch tested?

This is hard to cover with a unit test. We verified that the fix works in the customer's production environment.

Closes #31761 from viirya/SPARK-34295.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in: commit 95c61df0fa (parent 2298cebcf8)
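For illustration, here is a minimal sketch of how a user could set the new config when building a session. The cluster hostnames are hypothetical placeholders, not values from this patch:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example: still obtain delegation tokens for a remote HDFS
// (nn2.example.com), but instruct YARN RM not to renew them.
val spark = SparkSession.builder()
  .appName("renewal-exclusion-example")
  .config("spark.yarn.kerberos.renewal.excludeHadoopFileSystems",
    "hdfs://nn2.example.com:8020")
  .getOrCreate()
```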
core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala

```diff
@@ -48,7 +48,12 @@ private[deploy] class HadoopFSDelegationTokenProvider
       creds: Credentials): Option[Long] = {
     try {
       val fileSystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
-      val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds)
+      // The hosts on which the file systems to be excluded from token renewal
+      val fsToExclude = sparkConf.get(YARN_KERBEROS_FILESYSTEM_RENEWAL_EXCLUDE)
+        .map(new Path(_).getFileSystem(hadoopConf).getUri.getHost)
+        .toSet
+      val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds,
+        fsToExclude)
 
       // Get the token renewal interval if it is not set. It will only be called once.
       if (tokenRenewalInterval == null) {
```
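Note that the exclusion list is matched by host, not by full URL: each configured URL is resolved to a `FileSystem` and reduced to its URI host. A rough standalone approximation of that step follows; the URL and host below are hypothetical, and resolving `hdfs://` URIs requires the HDFS client on the classpath plus reachable cluster configuration:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object HostExtractionSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // Hypothetical URL; in the patch this comes from the new config entry.
    val url = "hdfs://nn2.example.com:8020"
    // Resolve the URL to a FileSystem and keep only its host, mirroring
    // what the fsToExclude computation in the hunk above does per entry.
    val host = new Path(url).getFileSystem(hadoopConf).getUri.getHost
    println(host) // expected: nn2.example.com
  }
}
```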
core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala

```diff
@@ -99,11 +104,18 @@ private[deploy] class HadoopFSDelegationTokenProvider
   private def fetchDelegationTokens(
       renewer: String,
       filesystems: Set[FileSystem],
-      creds: Credentials): Credentials = {
+      creds: Credentials,
+      fsToExclude: Set[String]): Credentials = {
 
     filesystems.foreach { fs =>
-      logInfo(s"getting token for: $fs with renewer $renewer")
-      fs.addDelegationTokens(renewer, creds)
+      if (fsToExclude.contains(fs.getUri.getHost)) {
+        // YARN RM skips renewing token with empty renewer
+        logInfo(s"getting token for: $fs with empty renewer to skip renewal")
+        fs.addDelegationTokens("", creds)
+      } else {
+        logInfo(s"getting token for: $fs with renewer $renewer")
+        fs.addDelegationTokens(renewer, creds)
+      }
     }
 
     creds
```
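The key mechanism in this hunk is that excluded filesystems still get delegation tokens; only the renewer changes. A token issued with an empty renewer is one YARN RM will not attempt (and thus cannot fail) to renew. A condensed standalone sketch of that branch, with hypothetical helper naming:

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

// Condensed view of the branch above: tokens are always fetched, but an
// excluded filesystem's token carries an empty renewer so the resource
// manager skips renewal for it. fsToExclude holds hosts, not full URLs.
def addTokensFor(
    fs: FileSystem,
    renewer: String,
    fsToExclude: Set[String],
    creds: Credentials): Unit = {
  val effectiveRenewer =
    if (fsToExclude.contains(fs.getUri.getHost)) "" else renewer
  fs.addDelegationTokens(effectiveRenewer, creds)
}
```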
core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala

```diff
@@ -119,7 +131,7 @@ private[deploy] class HadoopFSDelegationTokenProvider
     val renewer = UserGroupInformation.getCurrentUser().getUserName()
 
     val creds = new Credentials()
-    fetchDelegationTokens(renewer, filesystems, creds)
+    fetchDelegationTokens(renewer, filesystems, creds, Set.empty)
 
     val renewIntervals = creds.getAllTokens.asScala.filter {
       _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
```
core/src/main/scala/org/apache/spark/internal/config/package.scala

```diff
@@ -716,6 +716,18 @@ package object config {
       .toSequence
       .createWithDefault(Nil)
 
+  private[spark] val YARN_KERBEROS_FILESYSTEM_RENEWAL_EXCLUDE =
+    ConfigBuilder("spark.yarn.kerberos.renewal.excludeHadoopFileSystems")
+      .doc("The list of Hadoop filesystem URLs whose hosts will be excluded from " +
+        "delegation token renewal at resource scheduler. Currently this is known to " +
+        "work under YARN, so YARN Resource Manager won't renew tokens for the application. " +
+        "Note that as resource scheduler does not renew token, so any application running " +
+        "longer than the original token expiration that tries to use that token will likely fail.")
+      .version("3.2.0")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .version("1.0.0")
     .intConf
```
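Since the entry is declared with `stringConf.toSequence`, the typed getter yields a `Seq[String]`, so a comma-separated value expands to multiple URLs. A sketch of how code inside Spark would consume it (the typed `SparkConf.get(entry)` accessor is `private[spark]`, so this only compiles within Spark itself; hostnames are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val sparkConf = new SparkConf()
  .set("spark.yarn.kerberos.renewal.excludeHadoopFileSystems",
    "hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020")

// .toSequence splits on commas; the default is Nil when the key is unset.
val excluded: Seq[String] = sparkConf.get(YARN_KERBEROS_FILESYSTEM_RENEWAL_EXCLUDE)
// excluded == Seq("hdfs://nn1.example.com:8020", "hdfs://nn2.example.com:8020")
```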
docs/running-on-yarn.md

```diff
@@ -699,6 +699,18 @@ staging directory of the Spark application.
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.yarn.kerberos.renewal.excludeHadoopFileSystems</code></td>
+  <td>(none)</td>
+  <td>
+  A comma-separated list of Hadoop filesystems whose hosts will be excluded from delegation
+  token renewal at resource scheduler. For example, <code>spark.yarn.kerberos.renewal.excludeHadoopFileSystems=hdfs://nn1.com:8032,
+  hdfs://nn2.com:8032</code>. This is known to work under YARN for now, so YARN Resource Manager won't renew tokens for the application.
+  Note that because the resource scheduler does not renew tokens, any application running longer than the original token expiration that tries
+  to use that token will likely fail.
+  </td>
+  <td>3.2.0</td>
+</tr>
 </table>
 
 ## Troubleshooting Kerberos
```
docs/security.md

```diff
@@ -840,6 +840,9 @@ The following options provides finer-grained control for this feature:
   </tr>
 </table>
 
+Users can exclude Kerberos delegation token renewal at resource scheduler. Currently it is only supported
+on YARN. The configuration is covered in the [Running Spark on YARN](running-on-yarn.html#yarn-specific-kerberos-configuration) page.
+
 ## Long-Running Applications
 
 Long-running applications may run into issues if their run time exceeds the maximum delegation
```