diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 827ceb89a0..234ae15cd5 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -92,6 +92,10 @@
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala
new file mode 100644
index 0000000000..9850a91f34
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_PLAINTEXT
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class KafkaDelegationTokenSuite extends StreamTest with SharedSparkSession with KafkaTest {
+
+ import testImplicits._
+
+ protected var testUtils: KafkaTestUtils = _
+
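+ // Disable the other built-in token providers and register a single secured Kafka cluster
+ // ("cluster1") so only Kafka delegation tokens are obtained in this suite.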
+ protected override def sparkConf = super.sparkConf
+ .set("spark.security.credentials.hadoopfs.enabled", "false")
+ .set("spark.security.credentials.hbase.enabled", "false")
+ .set(KEYTAB, testUtils.clientKeytab)
+ .set(PRINCIPAL, testUtils.clientPrincipal)
+ .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", testUtils.brokerAddress)
+ .set("spark.kafka.clusters.cluster1.security.protocol", SASL_PLAINTEXT.name)
+
+ override def beforeAll(): Unit = {
+ testUtils = new KafkaTestUtils(Map.empty, true)
+ testUtils.setup()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ UserGroupInformation.reset()
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ test("Roundtrip") {
+ val hadoopConf = new Configuration()
+ val manager = new HadoopDelegationTokenManager(spark.sparkContext.conf, hadoopConf, null)
+ val credentials = new Credentials()
+ manager.obtainDelegationTokens(credentials)
+ val serializedCredentials = SparkHadoopUtil.get.serialize(credentials)
+ SparkHadoopUtil.get.addDelegationTokens(serializedCredentials, spark.sparkContext.conf)
+
+ val topic = "topic-" + UUID.randomUUID().toString
+ testUtils.createTopic(topic, partitions = 5)
+
+ withTempDir { checkpointDir =>
+ val input = MemoryStream[String]
+
+ val df = input.toDF()
+ val writer = df.writeStream
+ .outputMode(OutputMode.Append)
+ .format("kafka")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.security.protocol", SASL_PLAINTEXT.name)
+ .option("topic", topic)
+ .start()
+
+ try {
+ input.addData("1", "2", "3", "4", "5")
+ failAfter(streamingTimeout) {
+ writer.processAllAvailable()
+ }
+ } finally {
+ writer.stop()
+ }
+ }
+
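+ // Read the topic back over SASL and check that the written values survived the round trip.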
+ val streamingDf = spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.security.protocol", SASL_PLAINTEXT.name)
+ .option("startingOffsets", s"earliest")
+ .option("subscribe", topic)
+ .load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ .map(kv => kv._2.toInt + 1)
+
+ testStream(streamingDf)(
+ StartStream(),
+ AssertOnQuery { q =>
+ q.processAllAvailable()
+ true
+ },
+ CheckAnswer(2, 3, 4, 5, 6),
+ StopStream
+ )
+ }
+}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index f2e4ee7145..d7cb30f530 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -20,29 +20,38 @@ package org.apache.spark.sql.kafka010
import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
+import java.nio.charset.StandardCharsets
import java.util.{Collections, Map => JMap, Properties, UUID}
import java.util.concurrent.TimeUnit
+import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
import scala.util.Random
+import com.google.common.io.Files
import kafka.api.Request
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
+import org.apache.hadoop.minikdc.MiniKdc
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+import org.apache.zookeeper.server.auth.SASLAuthenticationProvider
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaTokenUtil
import org.apache.spark.util.{ShutdownHookManager, Utils}
/**
@@ -51,31 +60,41 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
*
* This Kafka test utility class lives in src rather than test so that Python-related Kafka APIs can be tested too.
*/
-class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
+class KafkaTestUtils(
+ withBrokerProps: Map[String, Object] = Map.empty,
+ secure: Boolean = false) extends Logging {
+
+ private val JAVA_AUTH_CONFIG = "java.security.auth.login.config"
+
+ private var kdc: MiniKdc = _
// Zookeeper related configurations
- private val zkHost = "127.0.0.1"
+ private val zkHost = "localhost"
private var zkPort: Int = 0
private val zkConnectionTimeout = 60000
private val zkSessionTimeout = 10000
private var zookeeper: EmbeddedZookeeper = _
-
private var zkUtils: ZkUtils = _
- private var adminClient: AdminClient = null
// Kafka broker related configurations
- private val brokerHost = "127.0.0.1"
+ private val brokerHost = "localhost"
private var brokerPort = 0
private var brokerConf: KafkaConfig = _
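+ // Must match the primary of the Kafka broker principal created below ("kafka/localhost").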
+ private val brokerServiceName = "kafka"
+ private val clientUser = "client/localhost"
+ private var clientKeytabFile: File = _
+
// Kafka broker server
private var server: KafkaServer = _
+ private var adminClient: AdminClient = _
// Kafka producer
private var producer: Producer[String, String] = _
// Flag to test whether the system is correctly started
+ private var kdcReady = false
private var zkReady = false
private var brokerReady = false
private var leakDetector: AnyRef = null
@@ -96,6 +115,84 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
throw new IllegalStateException("Zookeeper client is not yet initialized"))
}
+ def clientPrincipal: String = {
+ assert(kdcReady, "KDC should be set up beforehand")
+ clientUser + "@" + kdc.getRealm()
+ }
+
+ def clientKeytab: String = {
+ assert(kdcReady, "KDC should be set up beforehand")
+ clientKeytabFile.getAbsolutePath()
+ }
+
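+ // Starts an embedded KDC (Hadoop MiniKdc) which issues every Kerberos principal the tests use.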
+ private def setUpMiniKdc(): Unit = {
+ val kdcDir = Utils.createTempDir()
+ val kdcConf = MiniKdc.createConf()
+ kdc = new MiniKdc(kdcConf, kdcDir)
+ kdc.start()
+ kdcReady = true
+ }
+
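+ // Creates keytabs for the ZooKeeper server/client, the Kafka broker and the test client,
+ // and writes a JAAS file containing Server, Client and KafkaServer login sections.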
+ private def createKeytabsAndJaasConfigFile(): String = {
+ assert(kdcReady, "KDC should be set up beforehand")
+ val baseDir = Utils.createTempDir()
+
+ val zkServerUser = "zookeeper/localhost"
+ val zkServerKeytabFile = new File(baseDir, "zookeeper.keytab")
+ kdc.createPrincipal(zkServerKeytabFile, zkServerUser)
+ logDebug(s"Created keytab file: ${zkServerKeytabFile.getAbsolutePath()}")
+
+ val zkClientUser = "zkclient/localhost"
+ val zkClientKeytabFile = new File(baseDir, "zkclient.keytab")
+ kdc.createPrincipal(zkClientKeytabFile, zkClientUser)
+ logDebug(s"Created keytab file: ${zkClientKeytabFile.getAbsolutePath()}")
+
+ val kafkaServerUser = "kafka/localhost"
+ val kafkaServerKeytabFile = new File(baseDir, "kafka.keytab")
+ kdc.createPrincipal(kafkaServerKeytabFile, kafkaServerUser)
+ logDebug(s"Created keytab file: ${kafkaServerKeytabFile.getAbsolutePath()}")
+
+ clientKeytabFile = new File(baseDir, "client.keytab")
+ kdc.createPrincipal(clientKeytabFile, clientUser)
+ logDebug(s"Created keytab file: ${clientKeytabFile.getAbsolutePath()}")
+
+ val file = new File(baseDir, "jaas.conf")
+ val realm = kdc.getRealm()
+ val content =
+ s"""
+ |Server {
+ | ${KafkaTokenUtil.getKrb5LoginModuleName} required
+ | useKeyTab=true
+ | storeKey=true
+ | useTicketCache=false
+ | keyTab="${zkServerKeytabFile.getAbsolutePath()}"
+ | principal="$zkServerUser@$realm";
+ |};
+ |
+ |Client {
+ | ${KafkaTokenUtil.getKrb5LoginModuleName} required
+ | useKeyTab=true
+ | storeKey=true
+ | useTicketCache=false
+ | keyTab="${zkClientKeytabFile.getAbsolutePath()}"
+ | principal="$zkClientUser@$realm";
+ |};
+ |
+ |KafkaServer {
+ | ${KafkaTokenUtil.getKrb5LoginModuleName} required
+ | serviceName="$brokerServiceName"
+ | useKeyTab=true
+ | storeKey=true
+ | keyTab="${kafkaServerKeytabFile.getAbsolutePath()}"
+ | principal="$kafkaServerUser@$realm";
+ |};
+ """.stripMargin.trim
+ Files.write(content, file, StandardCharsets.UTF_8)
+ logDebug(s"Created JAAS file: ${file.getPath}")
+ logDebug(s"JAAS file content: $content")
+ file.getAbsolutePath()
+ }
+
// Set up the Embedded Zookeeper server and get the proper Zookeeper port
private def setupEmbeddedZookeeper(): Unit = {
// Zookeeper server startup
@@ -110,20 +207,20 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private def setupEmbeddedKafkaServer(): Unit = {
assert(zkReady, "Zookeeper should be set up beforehand")
+ val protocolName = if (!secure) PLAINTEXT.name else SASL_PLAINTEXT.name
+
// Kafka broker startup
Utils.startServiceOnPort(brokerPort, port => {
brokerPort = port
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
- brokerPort = server.boundPort(new ListenerName("PLAINTEXT"))
+ brokerPort = server.boundPort(new ListenerName(protocolName))
(server, brokerPort)
}, new SparkConf(), "KafkaBroker")
+ adminClient = AdminClient.create(adminClientConfiguration)
brokerReady = true
- val props = new Properties()
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
- adminClient = AdminClient.create(props)
}
/** setup the whole embedded servers, including Zookeeper and Kafka brokers */
@@ -135,6 +232,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
logError("Found a leak KafkaTestUtils.", exception)
}
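+ // Point the JVM at the generated JAAS config and refresh so login contexts pick it up.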
+ if (secure) {
+ setUpMiniKdc()
+ val jaasConfigFile = createKeytabsAndJaasConfigFile()
+ System.setProperty(JAVA_AUTH_CONFIG, jaasConfigFile)
+ Configuration.getConfiguration.refresh()
+ } else {
+ System.clearProperty(JAVA_AUTH_CONFIG)
+ }
setupEmbeddedZookeeper()
setupEmbeddedKafkaServer()
eventually(timeout(1.minute)) {
@@ -186,6 +291,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
zookeeper.shutdown()
zookeeper = null
}
+
+ System.clearProperty(JAVA_AUTH_CONFIG)
+ Configuration.getConfiguration.refresh()
+ if (kdc != null) {
+ kdc.stop()
+ }
+ UserGroupInformation.reset()
}
/** Create a Kafka topic and wait until it is propagated to the whole cluster */
@@ -334,12 +446,27 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
props.put("transaction.state.log.replication.factor", "1")
props.put("transaction.state.log.min.isr", "1")
+ if (secure) {
+ props.put("listeners", "SASL_PLAINTEXT://127.0.0.1:0")
+ props.put("advertised.listeners", "SASL_PLAINTEXT://127.0.0.1:0")
+ props.put("inter.broker.listener.name", "SASL_PLAINTEXT")
+ props.put("delegation.token.master.key", UUID.randomUUID().toString)
+ props.put("sasl.enabled.mechanisms", "GSSAPI,SCRAM-SHA-512")
+ }
+
// Can not use properties.putAll(propsMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
withBrokerProps.foreach { case (k, v) => props.put(k, v) }
props
}
+ private def adminClientConfiguration: Properties = {
+ val props = new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
+ setAuthenticationConfigIfNeeded(props)
+ props
+ }
+
private def producerConfiguration: Properties = {
val props = new Properties()
props.put("bootstrap.servers", brokerAddress)
@@ -347,6 +474,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
props.put("key.serializer", classOf[StringSerializer].getName)
// wait for all in-sync replicas to ack sends
props.put("acks", "all")
+ setAuthenticationConfigIfNeeded(props)
props
}
@@ -370,9 +498,19 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
props.put("value.deserializer", classOf[StringDeserializer].getName)
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("enable.auto.commit", "false")
+ setAuthenticationConfigIfNeeded(props)
props
}
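+ // Client-side counterpart of the broker setup: keytab-based GSSAPI JAAS login over SASL_PLAINTEXT.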
+ private def setAuthenticationConfigIfNeeded(props: Properties): Unit = {
+ if (secure) {
+ val jaasParams = KafkaTokenUtil.getKeytabJaasParams(
+ clientKeytabFile.getAbsolutePath, clientPrincipal, brokerServiceName)
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT.name)
+ }
+ }
+
/** Verify topic is deleted in all places, e.g., brokers and ZooKeeper. */
private def verifyTopicDeletion(
topic: String,
@@ -454,9 +592,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
}
private class EmbeddedZookeeper(val zkConnect: String) {
+ private val ZOOKEEPER_AUTH_PROVIDER = "zookeeper.authProvider.1"
+
val snapshotDir = Utils.createTempDir()
val logDir = Utils.createTempDir()
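+ // Register ZooKeeper's SASL authentication provider so the embedded server accepts Kerberos clients.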
+ if (secure) {
+ System.setProperty(ZOOKEEPER_AUTH_PROVIDER, classOf[SASLAuthenticationProvider].getName)
+ } else {
+ System.clearProperty(ZOOKEEPER_AUTH_PROVIDER)
+ }
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
val (ip, port) = {
val splits = zkConnect.split(":")
@@ -485,6 +630,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
case e: IOException if Utils.isWindows =>
logWarning(e.getMessage)
}
+ System.clearProperty(ZOOKEEPER_AUTH_PROVIDER)
}
}
}
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index 950df867e1..39e3ac74a9 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -125,7 +125,9 @@ private[spark] object KafkaTokenUtil extends Logging {
adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
if (sparkConf.contains(KEYTAB)) {
logDebug("Keytab detected, using it for login.")
- val jaasParams = getKeytabJaasParams(sparkConf, clusterConf)
+ val keyTab = sparkConf.get(KEYTAB).get
+ val principal = sparkConf.get(PRINCIPAL).get
+ val jaasParams = getKeytabJaasParams(keyTab, principal, clusterConf.kerberosServiceName)
adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
} else {
logDebug("Using ticket cache for login.")
@@ -181,17 +183,18 @@ private[spark] object KafkaTokenUtil extends Logging {
}
}
- private def getKeytabJaasParams(
- sparkConf: SparkConf,
- clusterConf: KafkaTokenClusterConf): String = {
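+ // Widened from private so tests (e.g. KafkaTestUtils) can build keytab-based JAAS configs directly.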
+ def getKeytabJaasParams(
+ keyTab: String,
+ principal: String,
+ kerberosServiceName: String): String = {
val params =
s"""
|${getKrb5LoginModuleName} required
| debug=${isGlobalKrbDebugEnabled()}
| useKeyTab=true
- | serviceName="${clusterConf.kerberosServiceName}"
- | keyTab="${sparkConf.get(KEYTAB).get}"
- | principal="${sparkConf.get(PRINCIPAL).get}";
+ | serviceName="$kerberosServiceName"
+ | keyTab="$keyTab"
+ | principal="$principal";
""".stripMargin.replace("\n", "")
logDebug(s"Krb keytab JAAS params: $params")
params
@@ -213,7 +216,7 @@ private[spark] object KafkaTokenUtil extends Logging {
* The Krb5LoginModule package varies across JVMs.
* Please see Hadoop UserGroupInformation for further details.
*/
- private def getKrb5LoginModuleName(): String = {
+ def getKrb5LoginModuleName(): String = {
if (System.getProperty("java.vendor").contains("IBM")) {
"com.ibm.security.auth.module.Krb5LoginModule"
} else {
diff --git a/pom.xml b/pom.xml
index d655e9d9df..0288c6f97d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1026,6 +1026,18 @@
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-minikdc</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.directory.api</groupId>
+            <artifactId>api-ldap-schema-data</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
      <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
@@ -2883,6 +2895,18 @@
+
+        <plugin>
+          <groupId>org.apache.felix</groupId>
+          <artifactId>maven-bundle-plugin</artifactId>
+          <version>4.2.0</version>
+          <extensions>true</extensions>
+