[SPARK-26772][YARN] Delete ServiceCredentialProvider and make HadoopDelegationTokenProvider a developer API

## What changes were proposed in this pull request?

`HadoopDelegationTokenProvider` provides essentially the same functionality as `ServiceCredentialProvider`, so the two interfaces can be merged.

`YARNHadoopDelegationTokenManager` loads `ServiceCredentialProvider`s in a single step; the drawback is that if one provider fails to load, none of the others are loaded. `HadoopDelegationTokenManager` loads `HadoopDelegationTokenProvider`s independently, which gives more robust behaviour.
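
To illustrate the difference, here is a minimal sketch of the independent loading pattern, assuming a hypothetical `loadProviders` helper (this is not the actual `HadoopDelegationTokenManager` code):

```scala
import java.util.ServiceLoader

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.security.HadoopDelegationTokenProvider

object ProviderLoadingSketch {
  // Hypothetical helper: advance the ServiceLoader iterator one element at a
  // time so a provider that fails to construct does not abort the others.
  def loadProviders(loader: ClassLoader): List[HadoopDelegationTokenProvider] = {
    val serviceLoader =
      ServiceLoader.load(classOf[HadoopDelegationTokenProvider], loader)
    val providers = mutable.ListBuffer.empty[HadoopDelegationTokenProvider]
    val iterator = serviceLoader.iterator()
    while (iterator.hasNext) {
      try {
        providers += iterator.next()
      } catch {
        case NonFatal(e) =>
          // Skip only the failing provider instead of failing the whole load.
          Console.err.println(s"Failed to load a delegation token provider: $e")
      }
    }
    providers.toList
  }
}
```

By contrast, loading everything in one step (e.g. `ServiceLoader.load(...).asScala.toList`, as in the deleted YARN manager below) throws on the first bad provider and discards the rest.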

In this PR I've made the following changes:
* Deleted `YARNHadoopDelegationTokenManager` and `ServiceCredentialProvider`
* Made `HadoopDelegationTokenProvider` a `DeveloperApi` (a usage sketch follows the list)
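
As a rough example, a third-party implementation of the now-public trait could look like the following (the package, class, and service names are hypothetical, and the token logic is a no-op placeholder):

```scala
package com.example

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf
import org.apache.spark.security.HadoopDelegationTokenProvider

// Hypothetical provider, discovered through the Java Services mechanism
// (see the META-INF/services example further down).
class MyServiceDelegationTokenProvider extends HadoopDelegationTokenProvider {

  // Must be unique across all loaded providers.
  override def serviceName: String = "my-service"

  // Only request tokens when Hadoop security (Kerberos) is enabled.
  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = {
    UserGroupInformation.isSecurityEnabled
  }

  // Add tokens for the external service to `creds` and return the next
  // renewal time, or None if the obtained tokens are not renewable.
  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // A real implementation would contact the secure service here.
    None
  }
}
```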

## How was this patch tested?

Existing unit tests.

Closes #23686 from gaborgsomogyi/SPARK-26772.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Gabor Somogyi 2019-02-15 14:43:13 -08:00 committed by Marcelo Vanzin
parent b6c6875571
commit 28ced387b9
19 changed files with 21 additions and 208 deletions


@@ -23,12 +23,12 @@ import scala.reflect.runtime.universe
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.security.HadoopDelegationTokenProvider
import org.apache.spark.util.Utils
private[security] class HBaseDelegationTokenProvider


@@ -26,7 +26,6 @@ import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
@@ -35,6 +34,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.security.HadoopDelegationTokenProvider
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.{ThreadUtils, Utils}


@@ -30,6 +30,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.security.HadoopDelegationTokenProvider
private[deploy] class HadoopFSDelegationTokenProvider
extends HadoopDelegationTokenProvider with Logging {


@@ -15,18 +15,20 @@
* limitations under the License.
*/
package org.apache.spark.deploy.security
package org.apache.spark.security
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
/**
* ::DeveloperApi::
* Hadoop delegation token provider.
*/
private[spark] trait HadoopDelegationTokenProvider {
@DeveloperApi
trait HadoopDelegationTokenProvider {
/**
* Name of the service to provide delegation tokens. This name should be unique. Spark will


@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.security.HadoopDelegationTokenProvider
private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider {
ExceptionThrowingDelegationTokenProvider.constructed = true


@@ -538,13 +538,6 @@ for:
filesystem if `spark.yarn.stagingDir` is not set);
- if Hadoop federation is enabled, all the federated filesystems in the configuration.
The YARN integration also supports custom delegation token providers using the Java Services
mechanism (see `java.util.ServiceLoader`). Implementations of
`org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` can be made available to Spark
by listing their names in the corresponding file in the jar's `META-INF/services` directory. These
providers can be disabled individually by setting `spark.security.credentials.{service}.enabled` to
`false`, where `{service}` is the name of the credential provider.
## YARN-specific Kerberos Configuration
<table class="table">


@@ -756,6 +756,11 @@ If an application needs to interact with other secure Hadoop filesystems, their
explicitly provided to Spark at launch time. This is done by listing them in the
`spark.kerberos.access.hadoopFileSystems` property, described in the configuration section below.
Spark also supports custom delegation token providers using the Java Services
mechanism (see `java.util.ServiceLoader`). Implementations of
`org.apache.spark.security.HadoopDelegationTokenProvider` can be made available to Spark
by listing their names in the corresponding file in the jar's `META-INF/services` directory.
Delegation token support is currently only supported in YARN and Mesos modes. Consult the
deployment-specific page for more information.
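
Concretely, registering the hypothetical provider sketched earlier would mean shipping a resource file named `META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider` in the jar, containing the implementation's fully qualified class name:

```
com.example.MyServiceDelegationTokenProvider
```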


@@ -25,9 +25,9 @@ import org.apache.hadoop.security.Credentials
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopDelegationTokenProvider
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kafka
import org.apache.spark.security.HadoopDelegationTokenProvider
private[spark] class KafkaDelegationTokenProvider
extends HadoopDelegationTokenProvider with Logging {


@@ -21,7 +21,6 @@ import java.io.{FileSystem => _, _}
import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.PrivilegedExceptionAction
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -34,9 +33,9 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.{DataOutputBuffer, Text}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -50,8 +49,8 @@ import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
@@ -315,7 +314,7 @@ private[spark] class Client(
val credentials = currentUser.getCredentials()
if (isClusterMode) {
val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
val credentialManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null)
credentialManager.obtainDelegationTokens(credentials)
}


@@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn.security
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
/**
* A credential provider for a service. User must implement this if they need to access a
* secure service from Spark.
*/
trait ServiceCredentialProvider {
/**
* Name of the service to provide credentials. This name should be unique; Spark internally
* will use this name to differentiate credential providers.
*/
def serviceName: String
/**
* Returns true if credentials are required by this service. By default, it is based on whether
* Hadoop security is enabled.
*/
def credentialsRequired(hadoopConf: Configuration): Boolean = {
UserGroupInformation.isSecurityEnabled
}
/**
* Obtain credentials for this service and get the time of the next renewal.
*
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param sparkConf Spark configuration.
* @param creds Credentials to add tokens and security keys to.
* @return If this Credential is renewable and can be renewed, return the time of the next
* renewal, otherwise None should be returned.
*/
def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long]
}


@@ -1,75 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn.security
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
/**
* This class loads delegation token providers registered under the YARN-specific
* [[ServiceCredentialProvider]] interface, as well as the builtin providers defined
* in [[HadoopDelegationTokenManager]].
*/
private[spark] class YARNHadoopDelegationTokenManager(
_sparkConf: SparkConf,
_hadoopConf: Configuration,
_schedulerRef: RpcEndpointRef)
extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf, _schedulerRef) {
private val credentialProviders = {
ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
.asScala
.toList
.filter { p => isServiceEnabled(p.serviceName) }
.map { p => (p.serviceName, p) }
.toMap
}
if (credentialProviders.nonEmpty) {
logDebug("Using the following YARN-specific credential providers: " +
s"${credentialProviders.keys.mkString(", ")}.")
}
override def obtainDelegationTokens(creds: Credentials): Long = {
val superInterval = super.obtainDelegationTokens(creds)
credentialProviders.values.flatMap { provider =>
if (provider.credentialsRequired(hadoopConf)) {
provider.obtainCredentials(hadoopConf, sparkConf, creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
None
}
}.foldLeft(superInterval)(math.min)
}
// For testing.
override def isProviderLoaded(serviceName: String): Boolean = {
credentialProviders.contains(serviceName) || super.isProviderLoaded(serviceName)
}
}


@@ -31,14 +31,12 @@ import org.eclipse.jetty.servlet.{FilterHolder, FilterMapping}
import org.apache.spark.SparkContext
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config.UI._
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.ui.JettyUtils
import org.apache.spark.util.{RpcUtils, ThreadUtils}
/**
@@ -223,7 +221,7 @@
}
override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint))
Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint))
}
/**


@@ -1 +0,0 @@
org.apache.spark.deploy.yarn.security.YARNTestCredentialProvider


@@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn.security
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.{SparkConf, SparkFunSuite}
class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite {
private var credentialManager: YARNHadoopDelegationTokenManager = null
private var sparkConf: SparkConf = null
private var hadoopConf: Configuration = null
override def beforeAll(): Unit = {
super.beforeAll()
sparkConf = new SparkConf()
hadoopConf = new Configuration()
}
test("Correctly loads credential providers") {
credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
assert(credentialManager.isProviderLoaded("yarn-test"))
}
}
class YARNTestCredentialProvider extends ServiceCredentialProvider {
override def serviceName: String = "yarn-test"
override def credentialsRequired(conf: Configuration): Boolean = true
override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = None
}


@@ -23,7 +23,6 @@ import java.security.PrivilegedExceptionAction
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive
@@ -33,9 +32,9 @@ import org.apache.hadoop.security.token.Token
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenProvider
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.KEYTAB
import org.apache.spark.security.HadoopDelegationTokenProvider
import org.apache.spark.util.Utils
private[spark] class HiveDelegationTokenProvider