[SPARK-27022][DSTREAMS] Add kafka delegation token support.

## What changes were proposed in this pull request?

This PR adds Kafka delegation token support for DStreams. Please be aware that, since no Kafka native sink is available for DStreams, this PR covers delegation token usage on the consumer side only.

What this PR contains:
* Usage of the token through dynamic JAAS configuration (see the consumer-side sketch after this list)
* `KafkaConfigUpdater` moved to `kafka-0-10-token-provider`
* `KafkaSecurityHelper` functionality moved into `KafkaTokenUtil`
* Documentation
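
To illustrate the consumer-side usage, here is a minimal DStreams sketch (the broker address, topic and group id are hypothetical). Assuming the application is submitted with a principal/keytab or a valid TGT so that Spark can obtain a Kafka delegation token, no `sasl.jaas.config` or external JAAS file is needed: `KafkaConfigUpdater` generates the SCRAM JAAS parameters from the token before the consumer is created.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SecuredKafkaDStreamExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("SecuredKafkaDStreamExample"), Seconds(10))

    // No sasl.jaas.config here: when a delegation token is present in the
    // current user's credentials, KafkaConfigUpdater injects the SCRAM JAAS
    // parameters dynamically before the consumer is created.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker1:9093",  // hypothetical broker
      ConsumerConfig.GROUP_ID_CONFIG -> "example-group",          // hypothetical group id
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      "security.protocol" -> "SASL_SSL"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams))

    stream.map(_.value).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```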

## How was this patch tested?

Existing unit tests + tests on a cluster.

Long-running Kafka-to-file tests on a 4-node cluster with randomly thrown artificial exceptions.

Test scenario:

* 4 node cluster
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512

Kafka broker settings:

* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)

Every 7.5 minutes a new delegation token is obtained from the Kafka broker (10 min * 0.75).
When a token expired after 10 minutes (Spark obtains a new one and does not renew the old one), the broker's expiry thread, which runs every 5 minutes, invalidated the expired tokens, and an artificial exception was thrown inside the Spark application (in which case Spark closes the connection); the latest delegation token was then picked up correctly.
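
For reference, a tiny sketch of the timing in the scenario above (the 0.75 ratio is the one mentioned above; the variable names are only illustrative):

```scala
// delegation.token.expiry.time.ms on the broker
val expiryMs = 600000L                        // 10 min
// Spark obtains the next token well before expiry (expiry * 0.75)
val nextObtainMs = (expiryMs * 0.75).toLong   // 450000 ms = 7.5 min
// The broker's expiry checker runs every delegation.token.expiry.check.interval.ms
val expiryCheckMs = 300000L                   // 5 min
```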

The documentation change was verified with a manual site build:

* `cd docs/`
* `SKIP_API=1 jekyll build`
* Manual webpage check

Closes #23929 from gaborgsomogyi/SPARK-27022.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Committed by Marcelo Vanzin on 2019-03-07 11:36:37 -08:00 (commit 98a8725e66, parent ddc2052ebd); 16 changed files with 101 additions and 147 deletions.

@@ -315,3 +315,10 @@ As with any Spark applications, `spark-submit` is used to launch your application
For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
### Security
See [Structured Streaming Security](structured-streaming-kafka-integration.html#security).
##### Additional Caveats
- Kafka native sink is not available so delegation tokens are used only on the consumer side.

@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
private[kafka010] object CachedKafkaProducer extends Logging {

@@ -25,6 +25,8 @@ import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.apache.spark.kafka010.KafkaConfigUpdater
/**
* Subscribe allows you to subscribe to a fixed collection of topics.
* SubscribePattern allows you to use a regex to specify topics of interest.

@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread

@@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.common.security.scram.ScramLoginModule
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.kafka010.KafkaTokenUtil
private[kafka010] object KafkaSecurityHelper extends Logging {
def isTokenAvailable(): Boolean = {
UserGroupInformation.getCurrentUser().getCredentials.getToken(
KafkaTokenUtil.TOKEN_SERVICE) != null
}
def getTokenJaasParams(sparkConf: SparkConf): String = {
val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
KafkaTokenUtil.TOKEN_SERVICE)
val username = new String(token.getIdentifier)
val password = new String(token.getPassword)
val loginModuleName = classOf[ScramLoginModule].getName
val params =
s"""
|$loginModuleName required
| tokenauth=true
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
| username="$username"
| password="$password";
""".stripMargin.replace("\n", "")
logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
params
}
}

@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._

@@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import org.apache.spark.{SparkConf, SparkFunSuite}
class KafkaSecurityHelperSuite extends SparkFunSuite with KafkaDelegationTokenTest {
test("isTokenAvailable without token should return false") {
assert(!KafkaSecurityHelper.isTokenAvailable())
}
test("isTokenAvailable with token should return true") {
addTokenToUGI()
assert(KafkaSecurityHelper.isTokenAvailable())
}
test("getTokenJaasParams with token should return scram module") {
addTokenToUGI()
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(new SparkConf())
assert(jaasParams.contains("ScramLoginModule required"))
assert(jaasParams.contains("tokenauth=true"))
assert(jaasParams.contains(tokenId))
assert(jaasParams.contains(tokenPassword))
}
}

@@ -52,6 +52,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>

@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
package org.apache.spark.kafka010
import java.{util => ju}
@@ -26,12 +26,11 @@ import org.apache.kafka.common.config.SaslConfigs
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kafka
import org.apache.spark.kafka010.KafkaTokenUtil
/**
* Class to conveniently update Kafka config params, while logging the changes
*/
private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
extends Logging {
private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
@@ -58,9 +57,9 @@ private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map
// configuration.
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
logDebug("JVM global security configuration detected, using it for login.")
} else if (KafkaSecurityHelper.isTokenAvailable()) {
} else if (KafkaTokenUtil.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
val jaasParams = KafkaTokenUtil.getTokenJaasParams(SparkEnv.get.conf)
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),

@@ -31,6 +31,7 @@ import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
import org.apache.kafka.common.security.scram.ScramLoginModule
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.spark.SparkConf
@@ -154,7 +155,7 @@ private[spark] object KafkaTokenUtil extends Logging {
}
}
private[kafka010] def getKeytabJaasParams(sparkConf: SparkConf): String = {
private def getKeytabJaasParams(sparkConf: SparkConf): String = {
val params =
s"""
|${getKrb5LoginModuleName} required
@@ -167,7 +168,7 @@ private[spark] object KafkaTokenUtil extends Logging {
params
}
def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
private def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
require(serviceName.nonEmpty, "Kerberos service name must be defined")
@@ -208,4 +209,29 @@ private[spark] object KafkaTokenUtil extends Logging {
dateFormat.format(tokenInfo.maxTimestamp)))
}
}
def isTokenAvailable(): Boolean = {
UserGroupInformation.getCurrentUser().getCredentials.getToken(
KafkaTokenUtil.TOKEN_SERVICE) != null
}
def getTokenJaasParams(sparkConf: SparkConf): String = {
val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
KafkaTokenUtil.TOKEN_SERVICE)
val username = new String(token.getIdentifier)
val password = new String(token.getPassword)
val loginModuleName = classOf[ScramLoginModule].getName
val params =
s"""
|$loginModuleName required
| tokenauth=true
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
| username="$username"
| password="$password";
""".stripMargin.replace("\n", "")
logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
params
}
}

@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
package org.apache.spark.kafka010
import org.apache.kafka.common.config.SaslConfigs

@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
package org.apache.spark.kafka010
import java.{util => ju}
import javax.security.auth.login.{AppConfigurationEntry, Configuration}
@@ -26,7 +26,6 @@ import org.mockito.Mockito.mock
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.kafka010.KafkaTokenUtil
import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier
/**

@@ -17,20 +17,17 @@
package org.apache.spark.kafka010
import java.{util => ju}
import java.security.PrivilegedExceptionAction
import javax.security.auth.login.{AppConfigurationEntry, Configuration}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
private val bootStrapServers = "127.0.0.1:0"
private val trustStoreLocation = "/path/to/trustStore"
private val trustStorePassword = "trustStoreSecret"
@@ -42,44 +39,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
private var sparkConf: SparkConf = null
private class KafkaJaasConfiguration extends Configuration {
val entry =
new AppConfigurationEntry(
"DummyModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
ju.Collections.emptyMap[String, Object]()
)
override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
if (name.equals("KafkaClient")) {
Array(entry)
} else {
null
}
}
}
override def beforeEach(): Unit = {
super.beforeEach()
sparkConf = new SparkConf()
}
override def afterEach(): Unit = {
try {
resetGlobalConfig()
} finally {
super.afterEach()
}
}
private def setGlobalKafkaClientConfig(): Unit = {
Configuration.setConfiguration(new KafkaJaasConfiguration)
}
private def resetGlobalConfig(): Unit = {
Configuration.setConfiguration(null)
}
test("checkProxyUser with proxy current user should throw exception") {
val realUser = UserGroupInformation.createUserForTesting("realUser", Array())
UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs(
@@ -229,4 +193,25 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided)
}
test("isTokenAvailable without token should return false") {
assert(!KafkaTokenUtil.isTokenAvailable())
}
test("isTokenAvailable with token should return true") {
addTokenToUGI()
assert(KafkaTokenUtil.isTokenAvailable())
}
test("getTokenJaasParams with token should return scram module") {
addTokenToUGI()
val jaasParams = KafkaTokenUtil.getTokenJaasParams(new SparkConf())
assert(jaasParams.contains("ScramLoginModule required"))
assert(jaasParams.contains("tokenauth=true"))
assert(jaasParams.contains(tokenId))
assert(jaasParams.contains(tokenPassword))
}
}

@@ -34,6 +34,11 @@
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-token-provider-kafka-0-10_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>

@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
/**
* Choice of how to create and configure underlying Kafka Consumers on driver and executors.
@@ -54,6 +55,15 @@ abstract class ConsumerStrategy[K, V] {
* checkpoint.
*/
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
/**
* Updates the parameters with security if needed.
 * Added a function to hide internals and reduce code duplication because all strategies use it.
*/
protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) =
KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
.setAuthenticationConfigIfNeeded()
.build()
}
/**
@@ -78,7 +88,8 @@ private case class Subscribe[K, V](
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
consumer.subscribe(topics)
val toSeek = if (currentOffsets.isEmpty) {
offsets
@@ -134,7 +145,8 @@ private case class SubscribePattern[K, V](
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
val toSeek = if (currentOffsets.isEmpty) {
offsets
@@ -186,7 +198,8 @@ private case class Assign[K, V](
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
consumer.assign(topicPartitions)
val toSeek = if (currentOffsets.isEmpty) {
offsets

@@ -19,11 +19,14 @@ package org.apache.spark.streaming.kafka010
import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
private[kafka010] sealed trait KafkaDataConsumer[K, V] {
/**
@@ -109,7 +112,10 @@ private[kafka010] class InternalKafkaConsumer[K, V](
/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[K, V] = {
val c = new KafkaConsumer[K, V](kafkaParams)
val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
.setAuthenticationConfigIfNeeded()
.build()
val c = new KafkaConsumer[K, V](updatedKafkaParams)
val topics = ju.Arrays.asList(topicPartition)
c.assign(topics)
c