[SPARK-34032][SS] Add truststore and keystore type config possibility for Kafka delegation token

### What changes were proposed in this pull request?
Kafka delegation token is obtained with `AdminClient` where security settings can be set. Keystore and trustrore type however can't be set. In this PR I've added these new configurations. This can be useful when the type is different. A good example is to make Spark FIPS compliant where the default JKS is not accepted.

### Why are the changes needed?
Missing configurations.

### Does this PR introduce _any_ user-facing change?
Yes, adding 2 additional config parameters.

### How was this patch tested?
Existing + modified unit tests + simple Kafka to Kafka app on cluster.

Closes #31070 from gaborgsomogyi/SPARK-34032.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
This commit is contained in:
Gabor Somogyi 2021-01-08 20:04:56 +09:00 committed by Jungtaek Lim (HeartSaVioR)
parent 0f8e5dd445
commit 71d261ab8f
6 changed files with 49 additions and 0 deletions

View file

@ -1004,6 +1004,14 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.ssl.truststore.type</code></td>
<td>None</td>
<td>
The file format of the trust store file. For further details please see Kafka documentation. Only used to obtain delegation token.
</td>
<td>3.2.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code></td>
<td>None</td>
@ -1021,6 +1029,15 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.ssl.keystore.type</code></td>
<td>None</td>
<td>
The file format of the key store file. This is optional for client.
For further details please see Kafka documentation. Only used to obtain delegation token.
</td>
<td>3.2.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code></td>
<td>None</td>

View file

@ -31,8 +31,10 @@ private[spark] case class KafkaTokenClusterConf(
targetServersRegex: String,
securityProtocol: String,
kerberosServiceName: String,
trustStoreType: Option[String],
trustStoreLocation: Option[String],
trustStorePassword: Option[String],
keyStoreType: Option[String],
keyStoreLocation: Option[String],
keyStorePassword: Option[String],
keyPassword: Option[String],
@ -44,8 +46,10 @@ private[spark] case class KafkaTokenClusterConf(
s"targetServersRegex=$targetServersRegex, " +
s"securityProtocol=$securityProtocol, " +
s"kerberosServiceName=$kerberosServiceName, " +
s"trustStoreType=$trustStoreType, " +
s"trustStoreLocation=$trustStoreLocation, " +
s"trustStorePassword=${trustStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
s"keyStoreType=$keyStoreType, " +
s"keyStoreLocation=$keyStoreLocation, " +
s"keyStorePassword=${keyStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
s"keyPassword=${keyPassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
@ -77,8 +81,10 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
DEFAULT_SECURITY_PROTOCOL_CONFIG),
sparkClusterConf.getOrElse(SaslConfigs.SASL_KERBEROS_SERVICE_NAME,
KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME),
sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG),
sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG),

View file

@ -162,6 +162,9 @@ private[spark] object KafkaTokenUtil extends Logging {
private def setTrustStoreProperties(
clusterConf: KafkaTokenClusterConf,
properties: ju.Properties): Unit = {
clusterConf.trustStoreType.foreach { truststoreType =>
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreType)
}
clusterConf.trustStoreLocation.foreach { truststoreLocation =>
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation)
}
@ -173,6 +176,9 @@ private[spark] object KafkaTokenUtil extends Logging {
private def setKeyStoreProperties(
clusterConf: KafkaTokenClusterConf,
properties: ju.Properties): Unit = {
clusterConf.keyStoreType.foreach { keystoreType =>
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, keystoreType)
}
clusterConf.keyStoreLocation.foreach { keystoreLocation =>
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation)
}

View file

@ -51,8 +51,10 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
protected val bootStrapServers = "127.0.0.1:0"
protected val matchingTargetServersRegex = "127.0.0.*:0"
protected val nonMatchingTargetServersRegex = "127.0.intentionally_non_matching.*:0"
protected val trustStoreType = "customTrustStoreType"
protected val trustStoreLocation = "/path/to/trustStore"
protected val trustStorePassword = "trustStoreSecret"
protected val keyStoreType = "customKeyStoreType"
protected val keyStoreLocation = "/path/to/keyStore"
protected val keyStorePassword = "keyStoreSecret"
protected val keyPassword = "keySecret"
@ -124,8 +126,10 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX,
securityProtocol,
KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME,
Some(trustStoreType),
Some(trustStoreLocation),
Some(trustStorePassword),
Some(keyStoreType),
Some(keyStoreLocation),
Some(keyStorePassword),
Some(keyPassword),

View file

@ -29,8 +29,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
private val targetServersRegex = "127.0.0.*:0"
private val securityProtocol = SSL.name
private val kerberosServiceName = "kafka1"
private val trustStoreType = "customTrustStoreType"
private val trustStoreLocation = "/path/to/trustStore"
private val trustStorePassword = "trustStoreSecret"
private val keyStoreType = "customKeyStoreType"
private val keyStoreLocation = "/path/to/keyStore"
private val keyStorePassword = "keyStoreSecret"
private val keyPassword = "keySecret"
@ -60,8 +62,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(clusterConfig.securityProtocol === SASL_SSL.name)
assert(clusterConfig.kerberosServiceName ===
KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME)
assert(clusterConfig.trustStoreType === None)
assert(clusterConfig.trustStoreLocation === None)
assert(clusterConfig.trustStorePassword === None)
assert(clusterConfig.keyStoreType === None)
assert(clusterConfig.keyStoreLocation === None)
assert(clusterConfig.keyStorePassword === None)
assert(clusterConfig.keyPassword === None)
@ -75,8 +79,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
sparkConf.set(s"spark.kafka.clusters.$identifier1.security.protocol", securityProtocol)
sparkConf.set(s"spark.kafka.clusters.$identifier1.sasl.kerberos.service.name",
kerberosServiceName)
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.type", trustStoreType)
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.location", trustStoreLocation)
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.password", trustStorePassword)
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.type", keyStoreType)
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.location", keyStoreLocation)
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.password", keyStorePassword)
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.key.password", keyPassword)
@ -88,8 +94,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(clusterConfig.targetServersRegex === targetServersRegex)
assert(clusterConfig.securityProtocol === securityProtocol)
assert(clusterConfig.kerberosServiceName === kerberosServiceName)
assert(clusterConfig.trustStoreType === Some(trustStoreType))
assert(clusterConfig.trustStoreLocation === Some(trustStoreLocation))
assert(clusterConfig.trustStorePassword === Some(trustStorePassword))
assert(clusterConfig.keyStoreType === Some(keyStoreType))
assert(clusterConfig.keyStoreLocation === Some(keyStoreLocation))
assert(clusterConfig.keyStorePassword === Some(keyStorePassword))
assert(clusterConfig.keyPassword === Some(keyPassword))
@ -127,8 +135,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(clusterConfig.securityProtocol === SASL_SSL.name)
assert(clusterConfig.kerberosServiceName ===
KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME)
assert(clusterConfig.trustStoreType === None)
assert(clusterConfig.trustStoreLocation === None)
assert(clusterConfig.trustStorePassword === None)
assert(clusterConfig.keyStoreType === None)
assert(clusterConfig.keyStoreLocation === None)
assert(clusterConfig.keyStorePassword === None)
assert(clusterConfig.keyPassword === None)

View file

@ -64,8 +64,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
=== bootStrapServers)
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
=== SASL_PLAINTEXT.name)
assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))
assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
@ -80,10 +82,12 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
=== bootStrapServers)
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
=== SASL_SSL.name)
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG) === trustStoreType)
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
=== trustStoreLocation)
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
=== trustStorePassword)
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
@ -99,10 +103,12 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
=== bootStrapServers)
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
=== SSL.name)
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG) === trustStoreType)
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
=== trustStoreLocation)
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
=== trustStorePassword)
assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG) === keyStoreType)
assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) === keyStoreLocation)
assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === keyStorePassword)
assert(adminClientProperties.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === keyPassword)