From 43298d157d58d5d03ffab818f8cdfc6eac783c55 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 13 Dec 2016 10:37:45 -0800 Subject: [PATCH] [SPARK-18840][YARN] Avoid throw exception when getting token renewal interval in non HDFS security environment ## What changes were proposed in this pull request? Fix `java.util.NoSuchElementException` when running Spark in non-hdfs security environment. In the current code, we assume `HDFS_DELEGATION_KIND` token will be found in Credentials. But in some cloud environments, HDFS is not required, so we should avoid this exception. ## How was this patch tested? Manually verified in local environment. Author: jerryshao Closes #16265 from jerryshao/SPARK-18840. --- .../security/HDFSCredentialProvider.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala index 8d06d735ba..ebb176bc95 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala @@ -72,21 +72,22 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider // We cannot use the tokens generated with renewer yarn. Trying to renew // those will fail with an access control issue. So create new tokens with the logged in // user as renewer. - sparkConf.get(PRINCIPAL).map { renewer => + sparkConf.get(PRINCIPAL).flatMap { renewer => val creds = new Credentials() nnsToAccess(hadoopConf, sparkConf).foreach { dst => val dstFs = dst.getFileSystem(hadoopConf) dstFs.addDelegationTokens(renewer, creds) } - val t = creds.getAllTokens.asScala - .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) - .head - val newExpiration = t.renew(hadoopConf) - val identifier = new DelegationTokenIdentifier() - identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) - val interval = newExpiration - identifier.getIssueDate - logInfo(s"Renewal Interval is $interval") - interval + val hdfsToken = creds.getAllTokens.asScala + .find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) + hdfsToken.map { t => + val newExpiration = t.renew(hadoopConf) + val identifier = new DelegationTokenIdentifier() + identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) + val interval = newExpiration - identifier.getIssueDate + logInfo(s"Renewal Interval is $interval") + interval + } } }