[SPARK-28760][SS][TESTS] Add Kafka delegation token end-to-end test with mini KDC

### What changes were proposed in this pull request?
At the moment no end-to-end Kafka delegation token test exists, mainly because an embedded KDC was missing. Since a KDC is missing from the test infrastructure in general, I've explored the available options. The most obvious choice is the MiniKDC from the Hadoop library, which runs Apache Kerby in the background. What this PR contains:
* Added MiniKDC from Hadoop as a test dependency
* Added `maven-bundle-plugin` because a couple of dependencies come in bundle format
* Added a secure mode to `KafkaTestUtils`, namely: start KDC -> start Zookeeper in secure mode -> start Kafka in secure mode (a usage sketch follows this list)
* Added a roundtrip test (writes data to Kafka and reads it back)
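
For reference, a minimal sketch of how a suite picks up the new secure mode (distilled from the `KafkaDelegationTokenSuite` added below; `sparkConf` here stands in for the suite's base configuration):

```scala
import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_PLAINTEXT
import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL}

// Secure mode starts MiniKDC first, then Zookeeper and Kafka with SASL enabled.
val testUtils = new KafkaTestUtils(Map.empty, secure = true)
testUtils.setup()

// Point Spark's delegation token machinery at the embedded secure cluster.
val conf = sparkConf
  .set(KEYTAB, testUtils.clientKeytab)
  .set(PRINCIPAL, testUtils.clientPrincipal)
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", testUtils.brokerAddress)
  .set("spark.kafka.clusters.cluster1.security.protocol", SASL_PLAINTEXT.name)
```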

### Why are the changes needed?
No such test exists, and security testing with a KDC is completely missing.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing and additional unit tests.
I ran the additional test in a loop; it took ~10 seconds on average.

Closes #25477 from gaborgsomogyi/SPARK-28760.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
parent fb1053d14a
commit 7d72c073dd
5 changed files with 314 additions and 17 deletions


@@ -92,6 +92,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
</dependency>
<!-- Kafka embedded server uses Zookeeper 3.4.7 API -->
<dependency>
<groupId>org.apache.zookeeper</groupId>


@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010

import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_PLAINTEXT

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
import org.apache.spark.sql.test.SharedSparkSession

class KafkaDelegationTokenSuite extends StreamTest with SharedSparkSession with KafkaTest {

  import testImplicits._

  protected var testUtils: KafkaTestUtils = _

  protected override def sparkConf = super.sparkConf
    .set("spark.security.credentials.hadoopfs.enabled", "false")
    .set("spark.security.credentials.hbase.enabled", "false")
    .set(KEYTAB, testUtils.clientKeytab)
    .set(PRINCIPAL, testUtils.clientPrincipal)
    .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", testUtils.brokerAddress)
    .set("spark.kafka.clusters.cluster1.security.protocol", SASL_PLAINTEXT.name)

  override def beforeAll(): Unit = {
    testUtils = new KafkaTestUtils(Map.empty, true)
    testUtils.setup()
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    try {
      if (testUtils != null) {
        testUtils.teardown()
        testUtils = null
      }
      UserGroupInformation.reset()
    } finally {
      super.afterAll()
    }
  }

  test("Roundtrip") {
    val hadoopConf = new Configuration()
    val manager = new HadoopDelegationTokenManager(spark.sparkContext.conf, hadoopConf, null)
    val credentials = new Credentials()
    manager.obtainDelegationTokens(credentials)
    val serializedCredentials = SparkHadoopUtil.get.serialize(credentials)
    SparkHadoopUtil.get.addDelegationTokens(serializedCredentials, spark.sparkContext.conf)

    val topic = "topic-" + UUID.randomUUID().toString
    testUtils.createTopic(topic, partitions = 5)

    withTempDir { checkpointDir =>
      val input = MemoryStream[String]

      val df = input.toDF()
      val writer = df.writeStream
        .outputMode(OutputMode.Append)
        .format("kafka")
        .option("checkpointLocation", checkpointDir.getCanonicalPath)
        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
        .option("kafka.security.protocol", SASL_PLAINTEXT.name)
        .option("topic", topic)
        .start()

      try {
        input.addData("1", "2", "3", "4", "5")
        failAfter(streamingTimeout) {
          writer.processAllAvailable()
        }
      } finally {
        writer.stop()
      }
    }

    val streamingDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("kafka.security.protocol", SASL_PLAINTEXT.name)
      .option("startingOffsets", s"earliest")
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .map(kv => kv._2.toInt + 1)

    testStream(streamingDf)(
      StartStream(),
      AssertOnQuery { q =>
        q.processAllAvailable()
        true
      },
      CheckAnswer(2, 3, 4, 5, 6),
      StopStream
    )
  }
}


@@ -20,29 +20,38 @@ package org.apache.spark.sql.kafka010
import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import java.util.{Collections, Map => JMap, Properties, UUID}
import java.util.concurrent.TimeUnit
import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
import scala.util.Random
import com.google.common.io.Files
import kafka.api.Request
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
import org.apache.hadoop.minikdc.MiniKdc
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.apache.zookeeper.server.auth.SASLAuthenticationProvider
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaTokenUtil
import org.apache.spark.util.{ShutdownHookManager, Utils}
/**
@@ -51,31 +60,41 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
*
* The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
*/
class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
class KafkaTestUtils(
withBrokerProps: Map[String, Object] = Map.empty,
secure: Boolean = false) extends Logging {
private val JAVA_AUTH_CONFIG = "java.security.auth.login.config"
private var kdc: MiniKdc = _
// Zookeeper related configurations
private val zkHost = "127.0.0.1"
private val zkHost = "localhost"
private var zkPort: Int = 0
private val zkConnectionTimeout = 60000
private val zkSessionTimeout = 10000
private var zookeeper: EmbeddedZookeeper = _
private var zkUtils: ZkUtils = _
private var adminClient: AdminClient = null
// Kafka broker related configurations
private val brokerHost = "127.0.0.1"
private val brokerHost = "localhost"
private var brokerPort = 0
private var brokerConf: KafkaConfig = _
private val brokerServiceName = "kafka"
private val clientUser = "client/localhost"
private var clientKeytabFile: File = _
// Kafka broker server
private var server: KafkaServer = _
private var adminClient: AdminClient = _
// Kafka producer
private var producer: Producer[String, String] = _
// Flag to test whether the system is correctly started
private var kdcReady = false
private var zkReady = false
private var brokerReady = false
private var leakDetector: AnyRef = null
@@ -96,6 +115,84 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
throw new IllegalStateException("Zookeeper client is not yet initialized"))
}
def clientPrincipal: String = {
assert(kdcReady, "KDC should be set up beforehand")
clientUser + "@" + kdc.getRealm()
}
def clientKeytab: String = {
assert(kdcReady, "KDC should be set up beforehand")
clientKeytabFile.getAbsolutePath()
}
private def setUpMiniKdc(): Unit = {
val kdcDir = Utils.createTempDir()
val kdcConf = MiniKdc.createConf()
kdc = new MiniKdc(kdcConf, kdcDir)
kdc.start()
kdcReady = true
}
private def createKeytabsAndJaasConfigFile(): String = {
assert(kdcReady, "KDC should be set up beforehand")
val baseDir = Utils.createTempDir()
val zkServerUser = "zookeeper/localhost"
val zkServerKeytabFile = new File(baseDir, "zookeeper.keytab")
kdc.createPrincipal(zkServerKeytabFile, zkServerUser)
logDebug(s"Created keytab file: ${zkServerKeytabFile.getAbsolutePath()}")
val zkClientUser = "zkclient/localhost"
val zkClientKeytabFile = new File(baseDir, "zkclient.keytab")
kdc.createPrincipal(zkClientKeytabFile, zkClientUser)
logDebug(s"Created keytab file: ${zkClientKeytabFile.getAbsolutePath()}")
val kafkaServerUser = "kafka/localhost"
val kafkaServerKeytabFile = new File(baseDir, "kafka.keytab")
kdc.createPrincipal(kafkaServerKeytabFile, kafkaServerUser)
logDebug(s"Created keytab file: ${kafkaServerKeytabFile.getAbsolutePath()}")
clientKeytabFile = new File(baseDir, "client.keytab")
kdc.createPrincipal(clientKeytabFile, clientUser)
logDebug(s"Created keytab file: ${clientKeytabFile.getAbsolutePath()}")
val file = new File(baseDir, "jaas.conf");
val realm = kdc.getRealm()
val content =
s"""
|Server {
| ${KafkaTokenUtil.getKrb5LoginModuleName} required
| useKeyTab=true
| storeKey=true
| useTicketCache=false
| keyTab="${zkServerKeytabFile.getAbsolutePath()}"
| principal="$zkServerUser@$realm";
|};
|
|Client {
| ${KafkaTokenUtil.getKrb5LoginModuleName} required
| useKeyTab=true
| storeKey=true
| useTicketCache=false
| keyTab="${zkClientKeytabFile.getAbsolutePath()}"
| principal="$zkClientUser@$realm";
|};
|
|KafkaServer {
| ${KafkaTokenUtil.getKrb5LoginModuleName} required
| serviceName="$brokerServiceName"
| useKeyTab=true
| storeKey=true
| keyTab="${kafkaServerKeytabFile.getAbsolutePath()}"
| principal="$kafkaServerUser@$realm";
|};
""".stripMargin.trim
Files.write(content, file, StandardCharsets.UTF_8)
logDebug(s"Created JAAS file: ${file.getPath}")
logDebug(s"JAAS file content: $content")
file.getAbsolutePath()
}
// Set up the Embedded Zookeeper server and get the proper Zookeeper port
private def setupEmbeddedZookeeper(): Unit = {
// Zookeeper server startup
@@ -110,20 +207,20 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
private def setupEmbeddedKafkaServer(): Unit = {
assert(zkReady, "Zookeeper should be set up beforehand")
val protocolName = if (!secure) PLAINTEXT.name else SASL_PLAINTEXT.name
// Kafka broker startup
Utils.startServiceOnPort(brokerPort, port => {
brokerPort = port
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
brokerPort = server.boundPort(new ListenerName("PLAINTEXT"))
brokerPort = server.boundPort(new ListenerName(protocolName))
(server, brokerPort)
}, new SparkConf(), "KafkaBroker")
adminClient = AdminClient.create(adminClientConfiguration)
brokerReady = true
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
adminClient = AdminClient.create(props)
}
/** setup the whole embedded servers, including Zookeeper and Kafka brokers */
@@ -135,6 +232,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
logError("Found a leak KafkaTestUtils.", exception)
}
if (secure) {
setUpMiniKdc()
val jaasConfigFile = createKeytabsAndJaasConfigFile()
System.setProperty(JAVA_AUTH_CONFIG, jaasConfigFile)
Configuration.getConfiguration.refresh()
} else {
System.clearProperty(JAVA_AUTH_CONFIG)
}
setupEmbeddedZookeeper()
setupEmbeddedKafkaServer()
eventually(timeout(1.minute)) {
@@ -186,6 +291,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
zookeeper.shutdown()
zookeeper = null
}
System.clearProperty(JAVA_AUTH_CONFIG)
Configuration.getConfiguration.refresh()
if (kdc != null) {
kdc.stop()
}
UserGroupInformation.reset()
}
/** Create a Kafka topic and wait until it is propagated to the whole cluster */
@@ -334,12 +446,27 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
props.put("transaction.state.log.replication.factor", "1")
props.put("transaction.state.log.min.isr", "1")
if (secure) {
props.put("listeners", "SASL_PLAINTEXT://127.0.0.1:0")
props.put("advertised.listeners", "SASL_PLAINTEXT://127.0.0.1:0")
props.put("inter.broker.listener.name", "SASL_PLAINTEXT")
props.put("delegation.token.master.key", UUID.randomUUID().toString)
props.put("sasl.enabled.mechanisms", "GSSAPI,SCRAM-SHA-512")
}
// Can not use properties.putAll(propsMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
withBrokerProps.foreach { case (k, v) => props.put(k, v) }
props
}
private def adminClientConfiguration: Properties = {
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
setAuthenticationConfigIfNeeded(props)
props
}
private def producerConfiguration: Properties = {
val props = new Properties()
props.put("bootstrap.servers", brokerAddress)
@@ -347,6 +474,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
props.put("key.serializer", classOf[StringSerializer].getName)
// wait for all in-sync replicas to ack sends
props.put("acks", "all")
setAuthenticationConfigIfNeeded(props)
props
}
@@ -370,9 +498,19 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
props.put("value.deserializer", classOf[StringDeserializer].getName)
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("enable.auto.commit", "false")
setAuthenticationConfigIfNeeded(props)
props
}
private def setAuthenticationConfigIfNeeded(props: Properties): Unit = {
if (secure) {
val jaasParams = KafkaTokenUtil.getKeytabJaasParams(
clientKeytabFile.getAbsolutePath, clientPrincipal, brokerServiceName)
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT.name)
}
}
/** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
private def verifyTopicDeletion(
topic: String,
@@ -454,9 +592,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
}
private class EmbeddedZookeeper(val zkConnect: String) {
private val ZOOKEEPER_AUTH_PROVIDER = "zookeeper.authProvider.1"
val snapshotDir = Utils.createTempDir()
val logDir = Utils.createTempDir()
if (secure) {
System.setProperty(ZOOKEEPER_AUTH_PROVIDER, classOf[SASLAuthenticationProvider].getName)
} else {
System.clearProperty(ZOOKEEPER_AUTH_PROVIDER)
}
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
val (ip, port) = {
val splits = zkConnect.split(":")
@@ -485,6 +630,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
case e: IOException if Utils.isWindows =>
logWarning(e.getMessage)
}
System.clearProperty(ZOOKEEPER_AUTH_PROVIDER)
}
}
}


@@ -125,7 +125,9 @@ private[spark] object KafkaTokenUtil extends Logging {
adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
if (sparkConf.contains(KEYTAB)) {
logDebug("Keytab detected, using it for login.")
val jaasParams = getKeytabJaasParams(sparkConf, clusterConf)
val keyTab = sparkConf.get(KEYTAB).get
val principal = sparkConf.get(PRINCIPAL).get
val jaasParams = getKeytabJaasParams(keyTab, principal, clusterConf.kerberosServiceName)
adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
} else {
logDebug("Using ticket cache for login.")
@@ -181,17 +183,18 @@ private[spark] object KafkaTokenUtil extends Logging {
}
}
private def getKeytabJaasParams(
sparkConf: SparkConf,
clusterConf: KafkaTokenClusterConf): String = {
def getKeytabJaasParams(
keyTab: String,
principal: String,
kerberosServiceName: String): String = {
val params =
s"""
|${getKrb5LoginModuleName} required
| debug=${isGlobalKrbDebugEnabled()}
| useKeyTab=true
| serviceName="${clusterConf.kerberosServiceName}"
| keyTab="${sparkConf.get(KEYTAB).get}"
| principal="${sparkConf.get(PRINCIPAL).get}";
| serviceName="$kerberosServiceName"
| keyTab="$keyTab"
| principal="$principal";
""".stripMargin.replace("\n", "")
logDebug(s"Krb keytab JAAS params: $params")
params
@@ -213,7 +216,7 @@ private[spark] object KafkaTokenUtil extends Logging {
* Krb5LoginModule package vary in different JVMs.
* Please see Hadoop UserGroupInformation for further details.
*/
private def getKrb5LoginModuleName(): String = {
def getKrb5LoginModuleName(): String = {
if (System.getProperty("java.vendor").contains("IBM")) {
"com.ibm.security.auth.module.Krb5LoginModule"
} else {

pom.xml

@@ -1026,6 +1026,18 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.directory.api</groupId>
<artifactId>api-ldap-schema-data</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
@@ -2883,6 +2895,18 @@
</execution>
</executions>
</plugin>
<!--
A couple of dependencies come in bundle format (a bundle is just a normal jar which
contains OSGi metadata in the manifest). If one doesn't use OSGi, a bundle works like
any other jar. Since Maven doesn't have native bundle support, it needs an external
plugin to handle it. If the plugin is not added, the build can't resolve bundle dependencies.
-->
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>4.2.0</version>
<extensions>true</extensions>
</plugin>
</plugins>
</build>