[SPARK-18840][YARN] Avoid throw exception when getting token renewal interval in non HDFS security environment
## What changes were proposed in this pull request? Fix `java.util.NoSuchElementException` when running Spark in non-hdfs security environment. In the current code, we assume `HDFS_DELEGATION_KIND` token will be found in Credentials. But in some cloud environments, HDFS is not required, so we should avoid this exception. ## How was this patch tested? Manually verified in local environment. Author: jerryshao <sshao@hortonworks.com> Closes #16265 from jerryshao/SPARK-18840.
This commit is contained in:
parent
5572ccf86b
commit
43298d157d
|
@ -72,21 +72,22 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
|
|||
// We cannot use the tokens generated with renewer yarn. Trying to renew
|
||||
// those will fail with an access control issue. So create new tokens with the logged in
|
||||
// user as renewer.
|
||||
sparkConf.get(PRINCIPAL).map { renewer =>
|
||||
sparkConf.get(PRINCIPAL).flatMap { renewer =>
|
||||
val creds = new Credentials()
|
||||
nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
|
||||
val dstFs = dst.getFileSystem(hadoopConf)
|
||||
dstFs.addDelegationTokens(renewer, creds)
|
||||
}
|
||||
val t = creds.getAllTokens.asScala
|
||||
.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
|
||||
.head
|
||||
val newExpiration = t.renew(hadoopConf)
|
||||
val identifier = new DelegationTokenIdentifier()
|
||||
identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
|
||||
val interval = newExpiration - identifier.getIssueDate
|
||||
logInfo(s"Renewal Interval is $interval")
|
||||
interval
|
||||
val hdfsToken = creds.getAllTokens.asScala
|
||||
.find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
|
||||
hdfsToken.map { t =>
|
||||
val newExpiration = t.renew(hadoopConf)
|
||||
val identifier = new DelegationTokenIdentifier()
|
||||
identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
|
||||
val interval = newExpiration - identifier.getIssueDate
|
||||
logInfo(s"Renewal Interval is $interval")
|
||||
interval
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue