[SPARK-28928][SS] Use Kafka delegation token protocol on sources/sinks

### What changes were proposed in this pull request?
At the moment there are 3 places where the communication protocol with a Kafka cluster has to be set when a delegation token is used:
* On the delegation token
* On the source
* On the sink

Most of the time users use the same protocol in all these places (within one Kafka cluster), so it would be better to declare it in one place (on the delegation token side) and let Kafka sources/sinks take over this config.

In this PR I've modified the code so that Kafka sources/sinks take over the delegation token side `security.protocol` configuration when the token and the source/sink match in their `bootstrap.servers` configuration. This default can be overridden on each source/sink independently by setting the `kafka.security.protocol` configuration, as sketched below.
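For illustration, a minimal sketch of the intended usage (the cluster id `cluster1`, broker address, and topic are made up for the example, and `spark` is assumed to be an active `SparkSession`):

```scala
// Delegation token side: declare the protocol once per cluster
// (static configuration, e.g. passed via spark-submit --conf):
//   spark.kafka.clusters.cluster1.auth.bootstrap.servers=broker1:9093
//   spark.kafka.clusters.cluster1.security.protocol=SASL_SSL

// Source side: kafka.security.protocol is no longer needed when
// kafka.bootstrap.servers matches the cluster's target regex.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9093")
  .option("subscribe", "topic1")
  // A per-source override is still possible:
  // .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .load()
```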

### Why are the changes needed?
The current default behavior covers only a minority of the use-cases and is inconvenient.

### Does this PR introduce any user-facing change?
Yes, with this change users need to provide fewer configuration parameters by default.

### How was this patch tested?
Existing + additional unit tests.

Closes #25631 from gaborgsomogyi/SPARK-28928.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
5 changed files with 32 additions and 5 deletions

docs/structured-streaming-kafka-integration.md

@@ -825,7 +825,9 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
   <td><code>spark.kafka.clusters.${cluster}.security.protocol</code></td>
   <td>SASL_SSL</td>
   <td>
-    Protocol used to communicate with brokers. For further details please see Kafka documentation. Only used to obtain delegation token.
+    Protocol used to communicate with brokers. For further details please see Kafka documentation. Protocol is applied on all the sources and sinks as default where
+    <code>bootstrap.servers</code> config matches (for further details please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code>),
+    and can be overridden by setting <code>kafka.security.protocol</code> on the source or sink.
   </td>
 </tr>
 <tr>
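To make the matching rule above concrete, here is a hedged sketch of the static cluster configuration (the cluster id `cluster1` and the broker address are invented for illustration):

```scala
import org.apache.spark.SparkConf

// The token is obtained from auth.bootstrap.servers;
// target.bootstrap.servers.regex (default ".*") decides which
// sources/sinks "match" and therefore inherit security.protocol.
val conf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "broker1:9093")
  .set("spark.kafka.clusters.cluster1.target.bootstrap.servers.regex", ".*broker1.*")
  .set("spark.kafka.clusters.cluster1.security.protocol", "SASL_SSL")
```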

KafkaDelegationTokenSuite.scala

@@ -82,7 +82,6 @@ class KafkaDelegationTokenSuite extends StreamTest with SharedSparkSession with
         .format("kafka")
         .option("checkpointLocation", checkpointDir.getCanonicalPath)
         .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-        .option("kafka.security.protocol", SASL_PLAINTEXT.name)
         .option("topic", topic)
         .start()
@@ -99,7 +98,6 @@ class KafkaDelegationTokenSuite extends StreamTest with SharedSparkSession with
       val streamingDf = spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-        .option("kafka.security.protocol", SASL_PLAINTEXT.name)
         .option("startingOffsets", s"earliest")
         .option("subscribe", topic)
         .load()

KafkaConfigUpdater.scala

@@ -70,6 +70,7 @@ private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[St
         map.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG).asInstanceOf[String])
       clusterConfig.foreach { clusterConf =>
         logDebug("Delegation token detected, using it for login.")
+        setIfUnset(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clusterConf.securityProtocol)
         val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
         set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
         require(clusterConf.tokenMechanism.startsWith("SCRAM"),
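The `setIfUnset` call above is what makes the takeover a default rather than a hard override. A simplified sketch of the semantics (a hypothetical helper, not the actual `KafkaConfigUpdater` API):

```scala
// The token cluster's protocol applies only when the source/sink
// did not set security.protocol itself.
def effectiveSecurityProtocol(
    userParams: Map[String, String],
    clusterProtocol: String): String =
  userParams.getOrElse("security.protocol", clusterProtocol)

effectiveSecurityProtocol(Map.empty, "SASL_SSL")
// => "SASL_SSL" (taken over from the delegation token config)
effectiveSecurityProtocol(Map("security.protocol" -> "SASL_PLAINTEXT"), "SASL_SSL")
// => "SASL_PLAINTEXT" (per-source/sink setting wins)
```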

KafkaTokenSparkConf.scala

@@ -57,6 +57,7 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
   val CLUSTERS_CONFIG_PREFIX = "spark.kafka.clusters."
   val DEFAULT_TARGET_SERVERS_REGEX = ".*"
   val DEFAULT_SASL_KERBEROS_SERVICE_NAME = "kafka"
+  val DEFAULT_SECURITY_PROTOCOL_CONFIG = SASL_SSL.name
   val DEFAULT_SASL_TOKEN_MECHANISM = "SCRAM-SHA-512"

   def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf = {
@@ -72,7 +73,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
         s"${configPrefix}auth.${CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG}")),
       sparkClusterConf.getOrElse(s"target.${CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG}.regex",
         KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX),
-      sparkClusterConf.getOrElse(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_SSL.name),
+      sparkClusterConf.getOrElse(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+        DEFAULT_SECURITY_PROTOCOL_CONFIG),
       sparkClusterConf.getOrElse(SaslConfigs.SASL_KERBEROS_SERVICE_NAME,
         KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME),
       sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),

KafkaConfigUpdaterSuite.scala

@@ -17,8 +17,11 @@
 package org.apache.spark.kafka010

 import java.{util => ju}

 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL}

 import org.apache.spark.SparkFunSuite
@@ -76,6 +79,26 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes
     val params = Map(
       CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootStrapServers
     )
+    testWithTokenSetValues(params) { updatedParams =>
+      assert(updatedParams.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) ===
+        KafkaTokenSparkConf.DEFAULT_SECURITY_PROTOCOL_CONFIG)
+    }
+  }
+
+  test("setAuthenticationConfigIfNeeded with token should not override user-defined protocol") {
+    val overrideProtocolName = SASL_PLAINTEXT.name
+    val params = Map(
+      CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootStrapServers,
+      CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> overrideProtocolName
+    )
+    testWithTokenSetValues(params) { updatedParams =>
+      assert(updatedParams.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) ===
+        overrideProtocolName)
+    }
+  }
+
+  def testWithTokenSetValues(params: Map[String, String])
+      (validate: (ju.Map[String, Object]) => Unit) {
     setSparkEnv(
       Map(
         s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers
@@ -87,11 +110,12 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes
       .setAuthenticationConfigIfNeeded()
       .build()

-    assert(updatedParams.size() === 3)
+    assert(updatedParams.size() === 4)
     assert(updatedParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers)
     assert(updatedParams.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
     assert(updatedParams.get(SaslConfigs.SASL_MECHANISM) ===
       KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+    validate(updatedParams)
   }

   test("setAuthenticationConfigIfNeeded with invalid mechanism should throw exception") {